Files
msyrs/src/download/jpmaqsdownload.rs

409 lines
14 KiB
Rust

use crate::download::helpers::DQTimeSeriesResponse;
use crate::download::helpers::DQTimeseriesRequestArgs;
use crate::download::helpers::JPMaQSIndicator;
use crate::download::oauth_client::OAuthClient;
use crate::download::requester::DQRequester;
// use polars::prelude::*;
use std::error::Error;
/// Metrics requested for each ticker when the caller does not supply an explicit metric list.
const DEFAULT_JPMAQS_METRICS: [&str; 4] = ["value", "grading", "eop_lag", "mop_lag"];
/// Builds one DataQuery expression of the form `DB(JPMAQS,<ticker>,<metric>)` for each entry
/// in `metrics`, preserving the order of `metrics`.
fn ticker_to_expressions(ticker: &str, metrics: Vec<&str>) -> Vec<String> {
    let mut expressions = Vec::with_capacity(metrics.len());
    for metric in metrics {
        expressions.push(format!("DB(JPMAQS,{},{})", ticker, metric));
    }
    expressions
}
/// Builds the DataQuery expressions for every (ticker, metric) pair.
///
/// Expressions are grouped by ticker in the order of `tickers`; within one ticker they follow
/// the order of `metrics`.
fn construct_expressions(tickers: Vec<String>, metrics: Vec<String>) -> Vec<String> {
    // Borrow the metric strings once; only this cheap vector of `&str` is duplicated per
    // ticker, instead of deep-cloning every metric `String` for every ticker.
    let metric_refs: Vec<&str> = metrics.iter().map(String::as_str).collect();
    tickers
        .iter()
        .flat_map(|ticker| ticker_to_expressions(ticker, metric_refs.clone()))
        .collect::<Vec<String>>()
}
/// Splits a JPMaQS expression `DB(JPMAQS,<cid>_<xcat>,<metric>)` into `(cid, xcat, metric)`.
///
/// The ticker is split at its first underscore, so `USD_EQXR_NSA` yields cid `USD` and xcat
/// `EQXR_NSA`. The returned metric does not include the closing parenthesis.
///
/// If `expression` is not of that shape (wrong prefix/suffix, not exactly three
/// comma-separated fields, or a ticker without an underscore), the whole expression is
/// returned unchanged in all three positions.
fn deconstruct_expression(expression: &str) -> (String, String, String) {
    let parsed = expression
        .strip_prefix("DB(JPMAQS,")
        .and_then(|rest| rest.strip_suffix(')'))
        .and_then(|inner| inner.split_once(','))
        // Exactly two fields may remain inside the parentheses: ticker and metric.
        .filter(|(_, metric)| !metric.contains(','))
        .and_then(|(ticker, metric)| {
            // A ticker without `_` cannot be split into cid/xcat; fall back instead of panicking.
            ticker
                .split_once('_')
                .map(|(cid, xcat)| (cid, xcat, metric))
        });

    match parsed {
        Some((cid, xcat, metric)) => (cid.to_string(), xcat.to_string(), metric.to_string()),
        None => (
            expression.to_string(),
            expression.to_string(),
            expression.to_string(),
        ),
    }
}
/// Returns `true` when `expression` has the shape `DB(JPMAQS,<ticker>,<metric>)`: it starts
/// with `DB(JPMAQS,`, ends with `)`, and consists of exactly three comma-separated fields.
fn is_jpmaq_expression(expression: &str) -> bool {
    if !expression.starts_with("DB(JPMAQS,") || !expression.ends_with(")") {
        return false;
    }
    let fields: Vec<&str> = expression.split(',').collect();
    match fields.as_slice() {
        [head, _, tail] => *head == "DB(JPMAQS" && tail.ends_with(")"),
        _ => false,
    }
}
/// Returns `true` only if every entry of `expressions` passes `is_jpmaq_expression`
/// (and therefore `true` for an empty list).
fn all_jpmaq_expressions(expressions: Vec<String>) -> bool {
    for candidate in &expressions {
        if !is_jpmaq_expression(candidate) {
            return false;
        }
    }
    true
}
/// Arguments passed to `JPMaQSDownload` when downloading indicators.
///
/// Example Usage:
///
/// ```rust
/// use msyrs::download::jpmaqsdownload::JPMaQSDownloadGetIndicatorArgs;
/// use msyrs::download::jpmaqsdownload::JPMaQSDownload;
///
/// let download_args = JPMaQSDownloadGetIndicatorArgs {
///     tickers: vec!["USD_EQXR_NSA".to_string(), "GBP_EQXR_NSA".to_string()],
///     metrics: vec!["value".to_string(), "grading".to_string()],
///     start_date: "2024-11-11".to_string(),
///     end_date: "2024-11-11".to_string(),
/// };
///
/// let mut jpamqs_download = JPMaQSDownload::default();
/// let res_df = jpamqs_download.get_indicators_qdf(download_args).unwrap();
/// ```
#[derive(Debug, Clone)]
pub struct JPMaQSDownloadGetIndicatorArgs {
/// Tickers to download, e.g. `USD_EQXR_NSA`. Each ticker is combined with every metric
/// to form one `DB(JPMAQS,<ticker>,<metric>)` expression.
pub tickers: Vec<String>,
/// Metrics to request for each ticker, e.g. `value` or `grading`.
pub metrics: Vec<String>,
/// Start of the requested date range, e.g. `2024-11-11`. The default is `1990-01-01`.
pub start_date: String,
/// End of the requested date range, e.g. `2024-11-11`. The default is `TODAY+2D`.
pub end_date: String,
}
/// Default download arguments: no tickers, the metrics listed in `DEFAULT_JPMAQS_METRICS`
/// (`value`, `grading`, `eop_lag`, `mop_lag`), a start date of `1990-01-01` and an end date
/// of `TODAY+2D`.
///
/// Callers typically override only `tickers` and `start_date`, using struct-update syntax
/// for the remaining fields.
impl Default for JPMaQSDownloadGetIndicatorArgs {
    fn default() -> Self {
        let metrics: Vec<String> = DEFAULT_JPMAQS_METRICS
            .iter()
            .map(|metric| String::from(*metric))
            .collect();
        Self {
            tickers: vec![],
            metrics,
            start_date: String::from("1990-01-01"),
            end_date: String::from("TODAY+2D"),
        }
    }
}
/// Client for downloading JPMaQS data from the JPMorgan DataQuery API.
///
/// ## Example Usage
/// ```ignore
/// use msyrs::download::jpmaqsdownload::JPMaQSDownload;
/// use msyrs::download::jpmaqsdownload::JPMaQSDownloadGetIndicatorArgs;
/// use polars::prelude::*;
/// // if your client ID & secret are already set in the environment variables
/// // you can create a new JPMaQSDownload instance with the default constructor
///
/// let mut jpamqs_download = JPMaQSDownload::default();
///
/// // or alternatively,
/// let mut jpamqs_download = JPMaQSDownload::new(
///     "your_client_id".to_string(),
///     "your_client_secret".to_string(),
/// );
///
/// match jpamqs_download.check_connection() {
///     Ok(_) => println!("Connection to DataQuery API successful"),
///     Err(e) => println!("Error connecting to DataQuery API: {:?}", e),
/// }
///
/// let tickers: Vec<String> = jpamqs_download.get_catalogue().unwrap();
///
/// let some_tickers = tickers.iter().take(100).map(|s| s.to_string()).collect();
/// // get a dataframe containing the indicators for the first 100 tickers
///
/// let download_args = JPMaQSDownloadGetIndicatorArgs {
///     tickers: some_tickers,
///     start_date: "2024-11-11".to_string(),
///     ..Default::default()
/// };
/// // `get_indicators_qdf` takes the arguments by value, so clone them to reuse them below
/// let res_df: DataFrame = jpamqs_download
///     .get_indicators_qdf(download_args.clone())
///     .unwrap();
///
/// // save some tickers to disk as CSVs
///
/// match jpamqs_download.save_indicators_as_csv(download_args, "./data/") {
///     Ok(_) => println!("Saved indicators to disk"),
///     Err(e) => println!("Error saving indicators: {:?}", e),
/// }
/// ```
#[derive(Debug, Clone)]
pub struct JPMaQSDownload {
/// Requester through which all catalogue and time-series calls of this struct are made.
requester: DQRequester,
}
/// Builds a downloader around `DQRequester::default()`.
impl Default for JPMaQSDownload {
    fn default() -> Self {
        Self {
            requester: DQRequester::default(),
        }
    }
}
impl JPMaQSDownload {
/// Create a new JPMaQSDownload instance with the provided client ID and client secret.
///
/// The credentials are handed to a new `OAuthClient`, which is wrapped in the `DQRequester`
/// used for all subsequent calls.
pub fn new(client_id: String, client_secret: String) -> Self {
    // Both strings are owned and not used again here, so they are moved rather than cloned.
    let oauth_client = OAuthClient::new(client_id, client_secret);
    let requester = DQRequester::new(oauth_client);
    JPMaQSDownload { requester }
}
/// Check the connection to the DataQuery API.
///
/// Delegates to the requester's `check_connection` and returns its result unchanged.
pub fn check_connection(&mut self) -> Result<(), Box<dyn Error>> {
self.requester.check_connection()
}
/// Get the catalogue of tickers available in the JPMaQS data.
///
/// Prints a progress message, requests the `JPMAQS` catalogue from the requester and returns
/// its `all_instruments` list. Any requester error is propagated.
pub fn get_catalogue(&mut self) -> Result<Vec<String>, Box<dyn Error>> {
    println!("Getting JPMaQS catalogue ...");
    Ok(self.requester.get_catalogue("JPMAQS", 1000)?.all_instruments)
}
/// Keep only the expressions whose ticker (`<cid>_<xcat>`) appears in the JPMaQS catalogue.
///
/// The catalogue is fetched via `get_catalogue` on every call; the relative order of the
/// surviving expressions is preserved.
fn filter_expressions(
    &mut self,
    expressions: Vec<String>,
) -> Result<Vec<String>, Box<dyn Error>> {
    // filter out expressions that are not in the catalogue
    let dq_catalogue = self.get_catalogue()?;
    println!("Filtering expressions based on the JPMaQS catalogue ...");
    // Hash the catalogue once so each membership test is O(1) instead of a linear scan of
    // the whole catalogue for every expression.
    let known_tickers: std::collections::HashSet<&str> =
        dq_catalogue.iter().map(String::as_str).collect();
    // The input vector is owned, so surviving strings are moved out rather than re-cloned.
    let filtered_expressions = expressions
        .into_iter()
        .filter(|expression| {
            let (cid, xcat, _) = deconstruct_expression(expression);
            known_tickers.contains(format!("{}_{}", cid, xcat).as_str())
        })
        .collect::<Vec<String>>();
    Ok(filtered_expressions)
}
/// Get the time series data for the provided expressions.
///
/// All request arguments other than `expressions` are left at the
/// `DQTimeseriesRequestArgs` defaults.
///
/// # Errors
///
/// Returns the requester's error if the time-series request fails.
pub fn get_expressions(
    &mut self,
    expressions: Vec<String>,
) -> Result<Vec<DQTimeSeriesResponse>, Box<dyn Error>> {
    let request_args = DQTimeseriesRequestArgs {
        expressions,
        ..Default::default()
    };
    // Propagate a failed request to the caller instead of panicking on it.
    let dqts_vec = self.requester.get_timeseries(request_args)?;
    Ok(dqts_vec)
}
/// Get the indicators for the provided tickers and metrics.
///
/// Builds one expression per (ticker, metric) pair, drops the expressions whose ticker is not
/// in the JPMaQS catalogue, and requests the remaining ones for the given date range.
///
/// # Errors
///
/// Returns an error if no tickers are provided, if a ticker or metric produces a malformed
/// expression (for example because it contains a comma), or if the catalogue lookup or the
/// download itself fails.
pub fn get_indicators_list(
    &mut self,
    download_args: JPMaQSDownloadGetIndicatorArgs,
) -> Result<Vec<JPMaQSIndicator>, Box<dyn Error>> {
    if download_args.tickers.is_empty() {
        return Err("No tickers provided".into());
    }
    let expressions = construct_expressions(download_args.tickers, download_args.metrics);
    // Tickers and metrics are caller input, so a malformed one is reported as an error
    // rather than aborting the process through a failed assertion.
    if !all_jpmaq_expressions(expressions.clone()) {
        return Err(
            "Invalid ticker or metric: could not build a valid JPMaQS expression".into(),
        );
    }
    let expressions = self.filter_expressions(expressions)?;
    let dq_download_args = DQTimeseriesRequestArgs {
        expressions,
        start_date: download_args.start_date,
        end_date: download_args.end_date,
        ..Default::default()
    };
    self.requester
        .get_timeseries_as_jpmaqs_indicators(dq_download_args)
}
/// Get the indicators for the provided tickers and metrics as a single DataFrame.
///
/// Each retrieved indicator is converted with `as_qdf`. When more than one indicator is
/// retrieved, the frames are stacked vertically and the result is sorted by `cid`, `xcat`
/// and `real_date`; a single indicator's frame is returned exactly as `as_qdf` produced it.
///
/// # Errors
///
/// Returns an error if the download fails, if no indicators are retrieved, or if converting,
/// stacking or sorting the frames fails.
pub fn get_indicators_qdf(
    &mut self,
    download_args: JPMaQSDownloadGetIndicatorArgs,
) -> Result<polars::prelude::DataFrame, Box<dyn Error>> {
    let mut indicators: Vec<JPMaQSIndicator> = self.get_indicators_list(download_args)?;
    let mut df_main = match indicators.pop() {
        Some(indicator) => indicator.as_qdf()?,
        None => return Err("No indicators retrieved".into()),
    };
    if indicators.is_empty() {
        return Ok(df_main);
    }
    // Conversion and stacking failures are returned to the caller instead of panicking.
    while let Some(indicator) = indicators.pop() {
        let df = indicator.as_qdf()?;
        df_main = df_main.vstack(&df)?;
    }
    // sort by cid, xcat, real_date in that order; a failed sort is reported, not ignored
    df_main.sort_in_place(
        [
            "cid".to_string(),
            "xcat".to_string(),
            "real_date".to_string(),
        ],
        polars::chunked_array::ops::SortMultipleOptions::default(),
    )?;
    Ok(df_main)
}
/// Save the indicators for the provided tickers and metrics as CSV files.///
/// The CSV files will be saved in the provided folder path in a subfolder named "JPMaQSData".
/// Any data in the folder path will be deleted before saving the new data.
/// The function downloads the indicators in batches of 500 tickers at a time.
///
/// The saved results would be in the format: `<folder_path>/JPMaQSData/<xcat>/<ticker>.csv` .
///
/// Usage:
///
/// ```ignore
/// use msyrs::download::jpmaqsdownload::JPMaQSDownload;
/// use msyrs::download::jpmaqsdownload::JPMaQSDownloadGetIndicatorArgs;
/// let mut jpamqs_download = JPMaQSDownload::default();
/// let tickers: Vec<String> = jpamqs_download.get_catalogue().unwrap();
/// let some_tickers = tickers.iter().take(100).map(|s| s.to_string()).collect();
/// let res = jpamqs_download.save_indicators_as_csv(
/// JPMaQSDownloadGetIndicatorArgs {
/// tickers: some_tickers,
/// start_date: "2024-11-11".to_string(),
/// ..Default::default()
/// },
/// "./data/",
/// );
/// match res {
/// Ok(_) => println!("Saved indicators to disk"),
/// Err(e) => println!("Error saving indicators: {:?}", e),
/// }
/// ```
pub fn save_indicators_as_csv(
&mut self,
download_args: JPMaQSDownloadGetIndicatorArgs,
folder_path: &str,
) -> Result<(), Box<dyn Error>> {
// if the folder path does not exist, create it
let save_path = format!("{}/JPMaQSData/", folder_path);
let save_path = std::path::Path::new(&save_path)
.components()
.collect::<std::path::PathBuf>()
.to_string_lossy()
.to_string();
let _ = std::fs::remove_dir_all(save_path.clone());
std::fs::create_dir_all(save_path.clone())?;
// get ticker count, and split into chunks of batch_size
let ticker_batches = download_args.tickers.chunks(500);
let batch_download_args = ticker_batches
.map(|ticker_batch| {
let mut new_args = download_args.clone();
new_args.tickers = ticker_batch.to_vec();
new_args
})
.collect::<Vec<JPMaQSDownloadGetIndicatorArgs>>();
// print total number of download sessions
println!(
"Total number of download sessions: {}",
batch_download_args.len()
);
// Start timer
let start_time = std::time::Instant::now();
let total_batches = batch_download_args.len();
// download each batch and save as csv
for (index, batch_args) in batch_download_args.iter().enumerate() {
let df_list = self.get_indicators_list(batch_args.clone())?;
let total_indicators = df_list.len();
save_indicators_list_as_csvs(df_list, save_path.clone())?;
println!("Saved {} indicators to {}", total_indicators, save_path);
// Calculate and print estimated remaining time
let elapsed = start_time.elapsed();
let average_time_per_batch = elapsed / (index as u32 + 1);
let remaining_batches = total_batches - index - 1;
let estimated_remaining = average_time_per_batch * remaining_batches as u32;
println!(
"Progress: {}/{}. Estimated time remaining: {:.2?}",
index + 1,
total_batches,
estimated_remaining
);
}
Ok(())
}
}
/// Writes each indicator to `<folder_path>/<xcat>/<ticker>.csv`, creating the per-xcat folder
/// when needed. Stops at, and returns, the first error encountered.
fn save_indicators_list_as_csvs(
    indicators: Vec<JPMaQSIndicator>,
    folder_path: String,
) -> Result<(), Box<dyn Error>> {
    indicators
        .into_iter()
        .try_for_each(|indicator| -> Result<(), Box<dyn Error>> {
            let xcat_dir = format!("{}/{}", folder_path, indicator.get_xcat());
            std::fs::create_dir_all(&xcat_dir)?;
            let csv_path = format!("{}/{}.csv", xcat_dir, indicator.ticker);
            indicator.save_as_csv(&csv_path)?;
            Ok(())
        })
}
/// Function to download the JPMaQS tickers as a DataFrame. Solely exists to allow for a simpler
/// Python wrapper for downloading JPMaQS data.
pub fn download_jpmaqs_indicators_as_df(
client_id: String,
client_secret: String,
tickers: Vec<String>,
metrics: Option<Vec<String>>,
start_date: Option<String>,
end_date: Option<String>,
) -> Result<polars::prelude::DataFrame, Box<dyn Error>> {
let metrics = metrics.unwrap_or_else(|| {
DEFAULT_JPMAQS_METRICS
.iter()
.map(|s| s.to_string())
.collect()
});
// if metrics== "ALL", then use the default metrics
let metrics = if metrics.len() == 1 && metrics[0] == "ALL" {
DEFAULT_JPMAQS_METRICS
.iter()
.map(|s| s.to_string())
.collect()
} else {
metrics
};
let mut jpamqs_download = JPMaQSDownload::new(client_id, client_secret);
let download_args = JPMaQSDownloadGetIndicatorArgs {
tickers: tickers,
metrics: metrics,
start_date: start_date.unwrap_or("1990-01-01".to_string()),
end_date: end_date.unwrap_or("TODAY+2D".to_string()),
..Default::default()
};
jpamqs_download.get_indicators_qdf(download_args)
}