diff --git a/src/download/jpmaqsdownload.rs b/src/download/jpmaqsdownload.rs index 47672f5..0fd8b47 100644 --- a/src/download/jpmaqsdownload.rs +++ b/src/download/jpmaqsdownload.rs @@ -24,6 +24,22 @@ fn construct_expressions(tickers: Vec, metrics: Vec) -> Vec>() } +fn deconstruct_expression(expression: &str) -> (String, String, String) { + if !is_jpmaq_expression(expression) { + return ( + expression.to_string(), + expression.to_string(), + expression.to_string(), + ); + } + let parts = expression.split(',').collect::>(); + let ticker = parts[1].to_string(); + let metric = parts[2].to_string(); + let ticker_parts = ticker.split_once('_').unwrap(); + let cid = ticker_parts.0.to_string(); + let xcat = ticker_parts.1.to_string(); + (cid, xcat, metric) +} fn is_jpmaq_expression(expression: &str) -> bool { expression.starts_with("DB(JPMAQS,") && expression.ends_with(")") @@ -152,10 +168,30 @@ impl JPMaQSDownload { /// Get the catalogue of tickers available in the JPMaQS data. pub fn get_catalogue(&mut self) -> Result, Box> { + println!("Getting JPMaQS catalogue ..."); let dq_catalogue = self.requester.get_catalogue("JPMAQS", 1000)?; Ok(dq_catalogue.all_instruments) } + fn filter_expressions( + &mut self, + expressions: Vec, + ) -> Result, Box> { + // filter out expressions that are not in the catalogue + let dq_catalogue = self.get_catalogue()?; + println!("Filtering expressions based on the JPMaQS catalogue ..."); + let filtered_expressions = expressions + .iter() + .filter(|expression| { + let (cid, xcat, _) = deconstruct_expression(expression); + dq_catalogue.contains(&format!("{}_{}", cid, xcat)) + }) + .map(|s| s.to_string()) + .collect::>(); + + Ok(filtered_expressions) + } + /// Get the time series data for the provided expressions. pub fn get_expressions( &mut self, @@ -184,6 +220,8 @@ impl JPMaQSDownload { let expressions = construct_expressions(download_args.tickers, download_args.metrics); assert!(all_jpmaq_expressions(expressions.clone())); + let expressions = self.filter_expressions(expressions)?; + let dq_download_args = DQTimeseriesRequestArgs { expressions: expressions, start_date: download_args.start_date, diff --git a/src/download/mod.rs b/src/download/mod.rs index 16a6d40..7facc6b 100644 --- a/src/download/mod.rs +++ b/src/download/mod.rs @@ -1,5 +1,8 @@ -pub mod helpers; -pub mod jpmaqsdownload; +mod helpers; mod oauth_client; mod parreq; mod requester; + +pub mod jpmaqsdownload; + +pub use jpmaqsdownload::*; \ No newline at end of file diff --git a/src/download/parreq.rs b/src/download/parreq.rs index cae9caf..9ca6806 100644 --- a/src/download/parreq.rs +++ b/src/download/parreq.rs @@ -77,6 +77,11 @@ impl ParallelRequester { }) .collect(); + let prog_batches = match expression_batches.len() { + 0..=250 => 25, + _ => 100, + }; + let okay_response_texts = Arc::new(Mutex::new(Vec::new())); let failed_batches = Arc::new(Mutex::new(Vec::new())); let self_arc = Arc::new(self.clone()); @@ -97,7 +102,7 @@ impl ParallelRequester { let ep = format!("{}?{}", TIMESERIES_ENDPOINT, args_clone.as_query_string()); let permit = semaphore.clone().acquire_owned().await.unwrap(); - if curr_batch % 100 == 0 { + if curr_batch % prog_batches == 0 { println!("Requesting batch {} of {}", curr_batch, total_batches); } diff --git a/src/main.rs b/src/main.rs index efbeace..c2ac20b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,11 @@ -use msyrs::download::jpmaqsdownload::{JPMaQSDownload, JPMaQSDownloadGetIndicatorArgs}; +use msyrs::download::{JPMaQSDownload, JPMaQSDownloadGetIndicatorArgs}; +// use msyrs::download::jpmaqsdownload::*; // use msyrs::utils::qdf::load::*; // use msyrs::utils::qdf::dftools::*; // use msyrs::utils::qdf::core::*; -use msyrs::utils::qdf::*; - +use msyrs::utils as msyrs_utils; +use msyrs::utils::qdf as msyrs_qdf; +use polars::frame::DataFrame; #[allow(dead_code)] fn download_stuff() { @@ -46,32 +48,22 @@ fn download_stuff() { start.elapsed() ); - if !is_quantamental_dataframe(&res_df) { + if !msyrs_qdf::is_quantamental_dataframe(&res_df) { println!("DataFrame is not a quantamental DataFrame"); } else { println!("DataFrame is a quantamental DataFrame"); } } -fn main() { - // E:\Work\ruzt\msyrs\data\JPMaQSData\ALLIFCDSGDP\AUD_ALLIFCDSGDP_NSA.csv - // let pth = "E:/Work/ruzt/msyrs/data/JPMaQSData/ALLIFCDSGDP/AUD_ALLIFCDSGDP_NSA.csv"; - // let df = msyrs_dftools::load_quantamental_dataframe(pth).unwrap(); - // println!("{:?}", df); - // load_quantamental_dataframe_from_download_bank - // let st_pth = "E:/Work/ruzt/msyrs/data/JPMaQSData/"; - +#[allow(dead_code)] +fn load_mega_df() -> DataFrame { let start = std::time::Instant::now(); let st_pth = "E:\\Work\\jpmaqs-data\\data"; - let mega_df = load_quantamental_dataframe_from_download_bank( + let mega_df = msyrs_qdf::load::load_qdf_from_download_bank( st_pth, - // Some(vec!["AUD", "USD", "GBP", "JPY"]), - // Some(vec!["RIR_NSA", "EQXR_NSA"]), None, None, - // Some(vec!["EQXR_NSA", "RIR_NSA"]), - // None Some(vec![ "AUD_EQXR_NSA", "USD_EQXR_NSA", @@ -83,87 +75,124 @@ fn main() { "JPY_RIR_NSA", ]), ) - .unwrap(); + .unwrap(); let end = start.elapsed(); println!("Loaded Mega DataFrame in {:?}", end); + mega_df +} - let start = std::time::Instant::now(); - let df_new = reduce_dataframe( - mega_df.clone(), +fn main() { + nb(); +} + +#[allow(dead_code)] +fn nb() { + let cids_dm = vec![ + "AUD", "CAD", "CHF", "EUR", "GBP", "JPY", "NOK", "NZD", "SEK", "USD", + ]; + let cids_em = vec![ + "CLP", "COP", "CZK", "HUF", "IDR", "ILS", "INR", "KRW", "MXN", "PLN", "THB", "TRY", "TWD", + "ZAR", + ]; + + // cids = cids_dm + cids_em + let cids: Vec = cids_dm + .iter() + .chain(cids_em.iter()) + .map(|s| s.to_string()) + .collect(); + + let non_dux = ["IDR", "NZD"]; + #[allow(unused_variables)] + let cids_dux: Vec = cids + .iter() + .filter(|s| !non_dux.contains(&s.as_str())) + .map(|s| s.to_string()) + .collect(); + let ecos = vec![ + "CPIC_SA_P1M1ML12", + "CPIC_SJA_P3M3ML3AR", + "CPIC_SJA_P6M6ML6AR", + "CPIH_SA_P1M1ML12", + "CPIH_SJA_P3M3ML3AR", + "CPIH_SJA_P6M6ML6AR", + "INFTEFF_NSA", + "INTRGDP_NSA_P1M1ML12_3MMA", + "INTRGDPv5Y_NSA_P1M1ML12_3MMA", + "PCREDITGDP_SJA_D1M1ML12", + "RGDP_SA_P1Q1QL4_20QMA", + "RYLDIRS02Y_NSA", + "RYLDIRS05Y_NSA", + "PCREDITBN_SJA_P1M1ML12", + ]; + let mkts = vec![ + "DU02YXR_NSA", + "DU05YXR_NSA", + "DU02YXR_VT10", + "DU05YXR_VT10", + "EQXR_NSA", + "EQXR_VT10", + "FXXR_NSA", + "FXXR_VT10", + "FXCRR_NSA", + "FXTARGETED_NSA", + "FXUNTRADABLE_NSA", + ]; + + let xcats: Vec = ecos + .iter() + .chain(mkts.iter()) + .map(|s| s.to_string()) + .collect(); + + let cids_str: Vec<&str> = cids.iter().map(AsRef::as_ref).collect(); + let xcats_str: Vec<&str> = xcats.iter().map(AsRef::as_ref).collect(); + let download_tickers = msyrs_utils::misc::create_interesecting_tickers(&cids_str, &xcats_str); + + let mut jpmaqs_client = JPMaQSDownload::default(); + let downloaded_df = jpmaqs_client + .get_indicators_qdf(JPMaQSDownloadGetIndicatorArgs { + tickers: download_tickers, + ..Default::default() + }) + .unwrap(); + + let eq_df = msyrs_qdf::reduce_dataframe( + downloaded_df.clone(), + None, + Some(vec!["EQXR_NSA", "EQXR_VT10"]), + None, + // None, + None, + None, + false, + ) + .unwrap(); + + println!("{:?}", eq_df.head(Some(10))); + + let fx_df = msyrs_qdf::reduce_dataframe( + downloaded_df, + None, Some(vec![ - "GBP".to_string(), - "AUD".to_string(), - "USD".to_string(), + "FXXR_NSA", + "FXXR_VT10", + "FXCRR_NSA", + "FXTARGETED_NSA", + "FXUNTRADABLE_NSA", ]), - Some(vec!["RIR_NSA".to_string(), "EQXR_NSA".to_string()]), None, - Some("2010-01-20"), + // None, None, - false, - ) - .unwrap(); - let end = start.elapsed(); - println!("Reduced Mega DataFrame in {:?}", end); - - // FOUND TICKERs - let start = std::time::Instant::now(); - let found_tickers = get_unique_tickers(&df_new); - let end = start.elapsed(); - println!( - "Found {:?} unique tickers in df_new", - found_tickers.unwrap() - ); - println!("Found unique tickers in {:?}", end); - - let end = start.elapsed(); - println!("Loaded DataFrame in {:?}", end); - let start = std::time::Instant::now(); - let df_gbp = reduce_dataframe( - df_new.clone(), - Some(vec!["GBP".to_string()]), - Some(vec!["RIR_NSA".to_string()]), - None, - Some("2024-11-12"), - None, - false, - ) - .unwrap(); - let end = start.elapsed(); - println!("Reduced DataFrame in {:?}", end); - // println!("{:?}", df_gbp.head(Some(10))); - - // FOUND TICKERs - let start = std::time::Instant::now(); - let found_tickers = get_unique_tickers(&mega_df); - let end = start.elapsed(); - println!( - "Found {:?} unique tickers in Mega DataFrame", - found_tickers.unwrap() - ); - println!("Found unique tickers in {:?}", end); - - let start = std::time::Instant::now(); - let df_aud = reduce_dataframe( - df_new.clone(), - Some(vec!["USD".to_string()]), - // Some(vec!["EQXR_NSA".to_string(), "RIR_NSA".to_string()]), - Some(vec!["EQXR_NSA".to_string()]), - None, - Some("2024-11-13"), None, true, ) - .unwrap(); - let end = start.elapsed(); - println!("Reduced DataFrame in {:?}", end); - // dimenstions reduced from to - println!("{:?} from {:?}", df_aud.shape(), df_new.shape()); - // println!("{:?}", df_aud.head(Some(10))); + .unwrap(); - let start = std::time::Instant::now(); - let up_df = update_dataframe(&df_gbp, &df_aud).unwrap(); - let end = start.elapsed(); - println!("Updated DataFrame in {:?}", end); - println!("{:?}", up_df.head(Some(10))); + println!("{:?}", fx_df.head(Some(10))); + + let custom_df = msyrs_qdf::update_dataframe(&fx_df, &eq_df).unwrap(); + + println!("{:?}", custom_df.head(Some(10))); } diff --git a/src/utils/qdf/load.rs b/src/utils/qdf/load.rs index 3005367..62c1165 100644 --- a/src/utils/qdf/load.rs +++ b/src/utils/qdf/load.rs @@ -106,7 +106,7 @@ fn _load_qdf_thread_safe(file_path: &str) -> Result>, xcats: Option>, diff --git a/src/utils/qdf/reduce_df.rs b/src/utils/qdf/reduce_df.rs index ab4d920..a3270bd 100644 --- a/src/utils/qdf/reduce_df.rs +++ b/src/utils/qdf/reduce_df.rs @@ -17,8 +17,8 @@ const QDF_INDEX_COLUMNS: [&str; 3] = ["real_date", "cid", "xcat"]; /// If no filters are provided, the original DataFrame is returned. pub fn reduce_dataframe( df: DataFrame, - cids: Option>, - xcats: Option>, + cids: Option>, + xcats: Option>, metrics: Option>, start: Option<&str>, end: Option<&str>, @@ -36,8 +36,10 @@ pub fn reduce_dataframe( let u_xcats: Vec = get_unique_xcats(&new_df)?; let u_tickers: Vec = _get_unique_strs_from_str_column_object(&ticker_col)?; - let specified_cids: Vec = cids.unwrap_or_else(|| u_cids.clone()); - let specified_xcats: Vec = xcats.unwrap_or_else(|| u_xcats.clone()); + let specified_cids: Vec<&str> = + cids.unwrap_or_else(|| u_cids.iter().map(AsRef::as_ref).collect()); + let specified_xcats: Vec<&str> = + xcats.unwrap_or_else(|| u_xcats.iter().map(AsRef::as_ref).collect()); let non_idx_cols: Vec = new_df .get_column_names() @@ -63,8 +65,17 @@ pub fn reduce_dataframe( let keep_tickers: Vec = match intersect { // true => get_intersecting_cids_str_func(&specified_cids, &specified_xcats, &u_tickers), true => { - let int_cids = - get_intersecting_cids_str_func(&specified_cids, &specified_xcats, &u_tickers); + let int_cids = get_intersecting_cids_str_func( + &specified_cids + .iter() + .map(|&s| s.to_string()) + .collect::>(), + &specified_xcats + .iter() + .map(|&s| s.to_string()) + .collect::>(), + &u_tickers, + ); create_interesecting_tickers( &int_cids.iter().map(AsRef::as_ref).collect::>(), &specified_xcats