use crate::download::helpers::DQTimeSeriesResponse; use crate::download::helpers::DQTimeseriesRequestArgs; use crate::download::helpers::JPMaQSIndicator; use crate::download::oauth_client::OAuthClient; use crate::download::requester::DQRequester; // use polars::prelude::*; use std::error::Error; const DEFAULT_JPMAQS_METRICS: [&str; 4] = ["value", "grading", "eop_lag", "mop_lag"]; fn ticker_to_expressions(ticker: &str, metrics: Vec<&str>) -> Vec { metrics .iter() .map(|metric| format!("DB(JPMAQS,{},{})", ticker, metric)) .collect::>() } fn construct_expressions(tickers: Vec, metrics: Vec) -> Vec { tickers .iter() .flat_map(|ticker| { ticker_to_expressions(ticker, metrics.clone().iter().map(|s| s.as_str()).collect()) }) .collect::>() } fn deconstruct_expression(expression: &str) -> (String, String, String) { if !is_jpmaq_expression(expression) { return ( expression.to_string(), expression.to_string(), expression.to_string(), ); } let parts = expression.split(',').collect::>(); let ticker = parts[1].to_string(); let metric = parts[2].to_string(); let ticker_parts = ticker.split_once('_').unwrap(); let cid = ticker_parts.0.to_string(); let xcat = ticker_parts.1.to_string(); (cid, xcat, metric) } fn is_jpmaq_expression(expression: &str) -> bool { expression.starts_with("DB(JPMAQS,") && expression.ends_with(")") && expression.split(',').count() == 3 && expression.split(',').nth(0).unwrap() == "DB(JPMAQS" && expression.split(',').nth(2).unwrap().ends_with(")") } fn all_jpmaq_expressions(expressions: Vec) -> bool { expressions .iter() .all(|expression| is_jpmaq_expression(expression)) } /// Struct for the arguments passed to JPMaQSDownload for downloading indicators. /// /// Example Usage: /// /// ```ignore /// use msyrs::download::jpmaqsdownload::JPMaQSDownloadGetIndicatorArgs; /// use msyrs::download::jpmaqsdownload::JPMaQSDownload; /// /// let download_args = JPMaQSDownloadGetIndicatorArgs { /// tickers: vec!["USD_EQXR_NSA".to_string(), "GBP_EQXR_NSA".to_string()], /// metrics: vec!["value".to_string(), "grading".to_string()], /// start_date: "2024-11-11".to_string(), /// end_date: "2024-11-11".to_string(), /// }; /// /// let mut jpamqs_download = JPMaQSDownload::default(); /// let res_df = jpamqs_download.get_indicators_qdf(download_args).unwrap(); /// ``` #[derive(Debug, Clone)] pub struct JPMaQSDownloadGetIndicatorArgs { pub tickers: Vec, pub metrics: Vec, pub start_date: String, pub end_date: String, } /// Default implementation for JPMaQSDownloadGetIndicatorArgs. /// Gets the indicators for the tickers from the start of the data till 'today + 2 days'. /// Typically, a user only needs to update `tickers` and `start_date` fields. /// /// The default metrics are: ["value", "grading", "eop_lag", "mop_lag"] impl Default for JPMaQSDownloadGetIndicatorArgs { fn default() -> Self { JPMaQSDownloadGetIndicatorArgs { tickers: Vec::new(), metrics: DEFAULT_JPMAQS_METRICS .iter() .map(|s| s.to_string()) .collect(), start_date: "1990-01-01".to_string(), end_date: "TODAY+2D".to_string(), } } } /// Struct for downloading data from the JPMaQS data from JPMorgan DataQuery API. /// /// ## Example Usage /// ```ignore /// use msyrs::download::jpmaqsdownload::JPMaQSDownload; /// use msyrs::download::jpmaqsdownload::JPMaQSDownloadGetIndicatorArgs; /// use polars::prelude::*; /// // if your client ID & secret are already set in the environment variables /// // you can create a new JPMaQSDownload instance with the default constructor /// /// let mut jpamqs_download = JPMaQSDownload::default(); /// /// // or alternatively, /// let mut jpamqs_download = JPMaQSDownload::new( /// "your_client_id".to_string(), /// "your_client_secret".to_string(), /// ); /// /// match jpamqs_download.check_connection() { /// Ok(_) => println!("Connection to DataQuery API successful"), /// Err(e) => println!("Error connecting to DataQuery API: {:?}", e), /// } /// /// let tickers: Vec = jpamqs_download.get_catalogue().unwrap(); /// /// let some_tickers = tickers.iter().take(100).map(|s| s.to_string()).collect(); /// // get a dataframe containing the indicators for the first 100 tickers /// /// let download_args = JPMaQSDownloadGetIndicatorArgs { /// tickers: some_tickers, /// start_date: "2024-11-11".to_string(), /// ..Default::default() /// }; /// let res_df: DataFrame = jpamqs_download.get_indicators_qdf(download_args).unwrap(); /// /// // save some tickers to disk as CSVs /// /// match jpamqs_download.save_indicators_as_csv(download_args, "./data/") { /// Ok(_) => println!("Saved indicators to disk"), /// Err(e) => println!("Error saving indicators: {:?}", e), /// } /// #[derive(Debug, Clone)] pub struct JPMaQSDownload { requester: DQRequester, } impl Default for JPMaQSDownload { fn default() -> Self { let requester = DQRequester::default(); JPMaQSDownload { requester } } } impl JPMaQSDownload { /// Create a new JPMaQSDownload instance with the provided client ID and client secret. pub fn new(client_id: String, client_secret: String) -> Self { let oauth_client = OAuthClient::new(client_id.clone(), client_secret.clone()); let requester = DQRequester::new(oauth_client); JPMaQSDownload { requester } } /// Check the connection to the DataQuery API. pub fn check_connection(&mut self) -> Result<(), Box> { self.requester.check_connection() } /// Get the catalogue of tickers available in the JPMaQS data. pub fn get_catalogue(&mut self) -> Result, Box> { println!("Getting JPMaQS catalogue ..."); let dq_catalogue = self.requester.get_catalogue("JPMAQS", 1000)?; Ok(dq_catalogue.all_instruments) } fn filter_expressions( &mut self, expressions: Vec, ) -> Result, Box> { // filter out expressions that are not in the catalogue let dq_catalogue = self.get_catalogue()?; println!("Filtering expressions based on the JPMaQS catalogue ..."); let filtered_expressions = expressions .iter() .filter(|expression| { let (cid, xcat, _) = deconstruct_expression(expression); dq_catalogue.contains(&format!("{}_{}", cid, xcat)) }) .map(|s| s.to_string()) .collect::>(); Ok(filtered_expressions) } /// Get the time series data for the provided expressions. pub fn get_expressions( &mut self, expressions: Vec, ) -> Result, Box> { let dqts_vec = self .requester .get_timeseries(DQTimeseriesRequestArgs { expressions: expressions, ..Default::default() }) .unwrap(); Ok(dqts_vec) } /// Get the indicators for the provided tickers and metrics. pub fn get_indicators_list( &mut self, download_args: JPMaQSDownloadGetIndicatorArgs, ) -> Result, Box> { if download_args.tickers.is_empty() { return Err("No tickers provided".into()); } let expressions = construct_expressions(download_args.tickers, download_args.metrics); assert!(all_jpmaq_expressions(expressions.clone())); let expressions = self.filter_expressions(expressions)?; let dq_download_args = DQTimeseriesRequestArgs { expressions: expressions, start_date: download_args.start_date, end_date: download_args.end_date, ..Default::default() }; let result = self .requester .get_timeseries_as_jpmaqs_indicators(dq_download_args); match result { Ok(indicators) => Ok(indicators), Err(e) => Err(e), } } pub fn get_indicators_qdf( &mut self, download_args: JPMaQSDownloadGetIndicatorArgs, ) -> Result> { let mut indicators: Vec = self.get_indicators_list(download_args)?; if indicators.is_empty() { return Err("No indicators retrieved".into()); } if indicators.len() == 1 { return indicators.pop().unwrap().as_qdf(); } assert!(indicators.len() > 1); let mut df_main = indicators.pop().unwrap().as_qdf().unwrap(); while !indicators.is_empty() { let df = indicators.pop().unwrap().as_qdf().unwrap(); df_main = df_main.vstack(&df).unwrap(); } // sort by cid, xcat, real_date in that order let _ = df_main.sort_in_place( [ "cid".to_string(), "xcat".to_string(), "real_date".to_string(), ], polars::chunked_array::ops::SortMultipleOptions::default(), ); Ok(df_main) } /// Save the indicators for the provided tickers and metrics as CSV files./// /// The CSV files will be saved in the provided folder path in a subfolder named "JPMaQSData". /// Any data in the folder path will be deleted before saving the new data. /// The function downloads the indicators in batches of 500 tickers at a time. /// /// The saved results would be in the format: `/JPMaQSData//.csv` . /// /// Usage: /// /// ```ignore /// use msyrs::download::jpmaqsdownload::JPMaQSDownload; /// use msyrs::download::jpmaqsdownload::JPMaQSDownloadGetIndicatorArgs; /// let mut jpamqs_download = JPMaQSDownload::default(); /// let tickers: Vec = jpamqs_download.get_catalogue().unwrap(); /// let some_tickers = tickers.iter().take(100).map(|s| s.to_string()).collect(); /// let res = jpamqs_download.save_indicators_as_csv( /// JPMaQSDownloadGetIndicatorArgs { /// tickers: some_tickers, /// start_date: "2024-11-11".to_string(), /// ..Default::default() /// }, /// "./data/", /// ); /// match res { /// Ok(_) => println!("Saved indicators to disk"), /// Err(e) => println!("Error saving indicators: {:?}", e), /// } /// ``` pub fn save_indicators_as_csv( &mut self, download_args: JPMaQSDownloadGetIndicatorArgs, folder_path: &str, ) -> Result<(), Box> { // if the folder path does not exist, create it let save_path = format!("{}/JPMaQSData/", folder_path); let save_path = std::path::Path::new(&save_path) .components() .collect::() .to_string_lossy() .to_string(); let _ = std::fs::remove_dir_all(save_path.clone()); std::fs::create_dir_all(save_path.clone())?; // get ticker count, and split into chunks of batch_size let ticker_batches = download_args.tickers.chunks(500); let batch_download_args = ticker_batches .map(|ticker_batch| { let mut new_args = download_args.clone(); new_args.tickers = ticker_batch.to_vec(); new_args }) .collect::>(); // print total number of download sessions println!( "Total number of download sessions: {}", batch_download_args.len() ); // Start timer let start_time = std::time::Instant::now(); let total_batches = batch_download_args.len(); // download each batch and save as csv for (index, batch_args) in batch_download_args.iter().enumerate() { let df_list = self.get_indicators_list(batch_args.clone())?; let total_indicators = df_list.len(); save_indicators_list_as_csvs(df_list, save_path.clone())?; println!("Saved {} indicators to {}", total_indicators, save_path); // Calculate and print estimated remaining time let elapsed = start_time.elapsed(); let average_time_per_batch = elapsed / (index as u32 + 1); let remaining_batches = total_batches - index - 1; let estimated_remaining = average_time_per_batch * remaining_batches as u32; println!( "Progress: {}/{}. Estimated time remaining: {:.2?}", index + 1, total_batches, estimated_remaining ); } Ok(()) } } fn save_indicators_list_as_csvs( indicators: Vec, folder_path: String, ) -> Result<(), Box> { for indicator in indicators { let ticker_folder = format!("{}/{}", folder_path, indicator.get_xcat()); std::fs::create_dir_all(&ticker_folder)?; let file_path = format!("{}/{}.csv", ticker_folder, indicator.ticker); indicator.save_as_csv(&file_path)?; } Ok(()) } /// Function to download the JPMaQS tickers as a DataFrame. Solely exists to allow for a simpler /// Python wrapper for downloading JPMaQS data. pub fn download_jpmaqs_indicators_as_df( client_id: String, client_secret: String, tickers: Vec, metrics: Option>, start_date: Option, end_date: Option, ) -> Result> { let metrics = metrics.unwrap_or_else(|| { DEFAULT_JPMAQS_METRICS .iter() .map(|s| s.to_string()) .collect() }); // if metrics== "ALL", then use the default metrics let metrics = if metrics.len() == 1 && metrics[0] == "ALL" { DEFAULT_JPMAQS_METRICS .iter() .map(|s| s.to_string()) .collect() } else { metrics }; let mut jpamqs_download = JPMaQSDownload::new(client_id, client_secret); let download_args = JPMaQSDownloadGetIndicatorArgs { tickers: tickers, metrics: metrics, start_date: start_date.unwrap_or("1990-01-01".to_string()), end_date: end_date.unwrap_or("TODAY+2D".to_string()), ..Default::default() }; jpamqs_download.get_indicators_qdf(download_args) }