diff --git a/Cargo.lock b/Cargo.lock index 563c195..bc4d96d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2559,18 +2559,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.214" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5" +checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.214" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766" +checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 645f0c1..936a73f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" reqwest = { version = "0.12.9", features = ["blocking", "json"] } serde_json = "1.0" serde_urlencoded = "0.7" -serde = { version = "1.0", features = ["derive"] } +serde = { version = "1.0.215", features = ["derive"] } polars = { version = "0.44.2", features = ["lazy"] } rand = "0.8" threadpool = "1.8.1" diff --git a/README.md b/README.md new file mode 100644 index 0000000..f5e94af --- /dev/null +++ b/README.md @@ -0,0 +1,35 @@ +# msyrs + +A Rust implementation of the [Macrosynergy Python Package](https://github.com/macrosynergy/macrosynergy). + +## Status + +- Docs + - [x] Building + - [ ] written? + +- Download + - [x] OAuth workflow + - [x] Heartbeat + - [x] Get Catalogue + - [x] Get Generic DQ Time Series + - [x] Get JPMaQS Indicators (formatted as array) + - [x] Get JPMaQS Indicators as Polars DataFrame + - [x] Add QDF option + - [ ] Save to disk functionality + +- Utils + - [ ] Reduce DF + - [ ] Reduce DF by ticker + - [ ] Apply Blacklist + - [ ] Get Blacklist + - [ ] Apply Slip + + +- Panel + - [ ] Historic Volatility + - [ ] Linear Composites + - [ ] Make Relative Value + - [ ] Imputers + - [ ] + diff --git a/docs/Cargo.toml b/docs/Cargo.toml new file mode 100644 index 0000000..1e72233 --- /dev/null +++ b/docs/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "docs" +version = "0.1.0" +edition = "2021" + +[dependencies] diff --git a/docs/src/lib.rs b/docs/src/lib.rs new file mode 100644 index 0000000..b93cf3f --- /dev/null +++ b/docs/src/lib.rs @@ -0,0 +1,14 @@ +pub fn add(left: u64, right: u64) -> u64 { + left + right +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_works() { + let result = add(2, 2); + assert_eq!(result, 4); + } +} diff --git a/src/download/jpmaqsdownload.rs b/src/download/jpmaqsdownload.rs index 9d11fda..e889e8e 100644 --- a/src/download/jpmaqsdownload.rs +++ b/src/download/jpmaqsdownload.rs @@ -1,10 +1,8 @@ use crate::download::oauth_client::OAuthClient; use crate::download::requester::DQRequester; -use crate::download::timeseries::DQTimeSeries; use crate::download::timeseries::DQTimeSeriesResponse; use crate::download::timeseries::DQTimeseriesRequestArgs; use crate::download::timeseries::JPMaQSIndicator; -use rayon::prelude::*; use std::error::Error; const DEFAULT_JPMAQS_METRICS: [&str; 4] = ["value", "grading", "eop_lag", "mop_lag"]; @@ -114,53 +112,21 @@ impl JPMaQSDownload { let expressions = construct_expressions(download_args.tickers, download_args.metrics); assert!(all_jpmaq_expressions(expressions.clone())); - let dqts_vec = self.get_expressions(expressions)?; - // println!("Retrieved {} time series", -- sum[dqts_vec.iter().map(|dqts| dqts.len())]); - println!("Retrieved all time series",); - // say pausing for 30 seconds - println!("Pausing for 10 seconds"); - std::thread::sleep(std::time::Duration::from_secs(10)); - println!("Resuming"); - let start = std::time::Instant::now(); + let dq_download_args = DQTimeseriesRequestArgs { + expressions: expressions, + start_date: download_args.start_date, + end_date: download_args.end_date, + ..Default::default() + }; - let indicators = dq_response_vec_to_jpmaqs_indicators(dqts_vec); + let result = self + .requester + .get_timeseries_as_jpmaqs_indicators(dq_download_args); - println!( - "Converted time series to indicators in {:?}", - start.elapsed() - ); - println!("Pausing for 10 seconds"); - std::thread::sleep(std::time::Duration::from_secs(10)); - println!("Resuming"); - Ok(indicators) + match result { + Ok(indicators) => Ok(indicators), + Err(e) => Err(e), + } } } - -// fn dq_response_vec_to_jpmaqs_indicators( -// dqts_vec: Vec, -// ) -> Vec { -// let mut indicators: Vec = Vec::new(); -// for dqts in dqts_vec { -// indicators.extend( -// dqts.consume_to_grouped_by_ticker() // moves the values to free up memory -// .into_iter() -// .filter_map(|tsv| JPMaQSIndicator::new(tsv).ok()), -// ); -// } - -// indicators -// } -fn dq_response_vec_to_jpmaqs_indicators( - dqts_vec: Vec, -) -> Vec { - dqts_vec - .into_par_iter() - .flat_map(|dqts| { - dqts.consume_to_grouped_by_ticker() - .into_iter() - .filter_map(|tsv| JPMaQSIndicator::new(tsv).ok()) - .collect::>() - }) - .collect() -} diff --git a/src/download/requester.rs b/src/download/requester.rs index ee9d26b..ea1fc7e 100644 --- a/src/download/requester.rs +++ b/src/download/requester.rs @@ -146,6 +146,24 @@ impl DQRequester { &mut self, args: DQTimeseriesRequestArgs, ) -> Result, Box> { + let max_retries = 5; + println!( + "Invoking ParallelRequester for {:?} expressions", + args.expressions.len() + ); + let mut pq = ParallelRequester::new(self.oauth_client.clone()); + let response_texts = match pq.request_expressions(args, max_retries) { + Ok(r) => r, + Err(e) => return Err(e), + }; + let dqts_vec: Vec = parse_response_texts(response_texts); + Ok(dqts_vec) + } + + pub fn get_timeseries_as_jpmaqs_indicators( + &mut self, + args: DQTimeseriesRequestArgs, + ) -> Result, Box> { let max_retries = 5; println!( "Invoking ParallelRequester for {:?} expressions", @@ -154,7 +172,7 @@ impl DQRequester { let mut pq = ParallelRequester::new(self.oauth_client.clone()); let start = std::time::Instant::now(); - let response_texts = match pq.request_expressions(args, max_retries) { + let mut response_texts = match pq.request_expressions(args, max_retries) { Ok(r) => r, Err(e) => return Err(e), }; @@ -163,13 +181,19 @@ impl DQRequester { start.elapsed() ); - let dqts_vec: Vec = parse_response_texts(response_texts); + // sleep for 10 seconds + println!("Pausing for 10 seconds"); + std::thread::sleep(std::time::Duration::from_secs(10)); + println!("Resuming - parsing response texts to JPMaQSIndicators"); + + let jpmaqs_indicators: Vec = + parse_response_texts_to_jpmaqs_indicators(&mut response_texts); // Sleep for 10 seconds println!("Pausing for 10 seconds"); std::thread::sleep(std::time::Duration::from_secs(10)); println!("Resuming"); - Ok(dqts_vec) + Ok(jpmaqs_indicators) } } @@ -188,6 +212,54 @@ fn parse_response_texts(response_texts: Vec) -> Vec, +) -> Vec { + // Create an empty hashmap of jpmaqs indicators + let mut jpmaqs_indicators_map: std::collections::HashMap = + std::collections::HashMap::new(); + + // Iterate over the response texts by taking and consuming each element + while let Some(response_text) = response_texts.pop() { + // Attempt to deserialize the response text + let json_res = serde_json::from_str::(&response_text); + + // Free the memory occupied by response_text immediately after parsing + drop(response_text); + + let dqts_res = match json_res { + Ok(dqts) => dqts, + Err(err) => { + eprintln!("Failed to deserialize response: {}", err); + continue; + } + }; + + let grouped_by_ticker = dqts_res.consume_to_grouped_by_ticker(); + + for ts_vec in grouped_by_ticker { + let curr_ticker = ts_vec[0].get_ticker().unwrap(); + if let Some(existing_jpmaqs_indicator) = jpmaqs_indicators_map.get_mut(&curr_ticker) { + for tsv in ts_vec { + match existing_jpmaqs_indicator.add_timeseries(tsv) { + Ok(_) => {} + Err(e) => { + eprintln!("Failed to add timeseries: {}", e); + } + } + } + } else { + let jpmaqs_indicator = JPMaQSIndicator::new(ts_vec); + jpmaqs_indicators_map.insert(curr_ticker.into(), jpmaqs_indicator.unwrap()); + } + } + } + + println!("Number of responses left: {}", response_texts.len()); + jpmaqs_indicators_map.into_iter().map(|(_, v)| v).collect() +} + #[allow(dead_code)] fn main() { let client_id = std::env::var("DQ_CLIENT_ID").unwrap(); diff --git a/src/download/timeseries.rs b/src/download/timeseries.rs index fb0746c..6835fa8 100644 --- a/src/download/timeseries.rs +++ b/src/download/timeseries.rs @@ -153,6 +153,7 @@ impl Attribute { } /// Get the metric from the expression + #[allow(dead_code)] pub fn get_metric(&self) -> Result> { if !self.expression.starts_with("DB(JPMAQS,") { return Err("Expression does not start with 'DB(JPMAQS,'".into()); @@ -330,6 +331,14 @@ impl JPMaQSIndicator { }) } + pub fn add_timeseries(&mut self, timeseries: DQTimeSeries) -> Result<(), Box> { + if self.ticker != timeseries.get_ticker()? { + return Err("Timeseries does not belong to the same ticker".into()); + } + add_timeseries_to_df(&mut self.df, timeseries)?; + Ok(()) + } + pub fn as_qdf(&self) -> Result> { let mut qdf = self.df.clone(); let (cid, xcat) = match self.ticker.split_once('_') { @@ -405,6 +414,23 @@ fn timeseries_list_to_dataframe( Ok(output_df) } +fn add_timeseries_to_df( + df: &mut DataFrame, + timeseries: DQTimeSeries, +) -> Result<(), Box> { + let mut new_df = timeseries.to_dataframe()?; + let curr_metric = timeseries.get_metric()?; + let column_name = new_df.get_column_names()[1].to_string(); + new_df + .rename(&column_name, curr_metric.into()) + .expect("Failed to rename column"); + + *df = df + .left_join(&new_df, ["real_date"], ["real_date"]) + .expect("Left join failed"); + Ok(()) +} + fn sort_qdf_columns(qdf: &mut DataFrame) -> Result<(), Box> { let index_columns = ["real_date", "cid", "xcat"]; let known_metrics = ["value", "grading", "eop_lag", "mop_lag"];