From 9c56839b9e2bd587747f7742955c1b397d5303e1 Mon Sep 17 00:00:00 2001
From: Palash Tyagi <23239946+Magnus167@users.noreply.github.com>
Date: Fri, 8 Nov 2024 09:32:45 +0000
Subject: [PATCH] working!

---
 .devcontainer/devcontainer.json |  34 +++++++++++
 .gitignore                      |   5 +-
 Cargo.lock                      |   7 ---
 Cargo.toml                      |   9 ++-
 src/main.rs                     |  73 +++++++++++-----------
 src/requester.rs                | 104 ++++++++++++++++++++++++--------
 src/timeseries.rs               |  67 ++++++++++++++++----
 7 files changed, 211 insertions(+), 88 deletions(-)
 create mode 100644 .devcontainer/devcontainer.json

diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json
new file mode 100644
index 0000000..24846c7
--- /dev/null
+++ b/.devcontainer/devcontainer.json
@@ -0,0 +1,34 @@
+// For format details, see https://aka.ms/devcontainer.json. For config options, see the
+// README at: https://github.com/devcontainers/templates/tree/main/src/rust
+{
+    "name": "Rust",
+    // Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
+    "image": "mcr.microsoft.com/devcontainers/rust:1-1-bullseye",
+    "features": {
+        "ghcr.io/devcontainers/features/rust:1": {}
+    }
+
+    // Use 'mounts' to make the cargo cache persistent in a Docker Volume.
+    // "mounts": [
+    //     {
+    //         "source": "devcontainer-cargo-cache-${devcontainerId}",
+    //         "target": "/usr/local/cargo",
+    //         "type": "volume"
+    //     }
+    // ]
+
+    // Features to add to the dev container. More info: https://containers.dev/features.
+    // "features": {},
+
+    // Use 'forwardPorts' to make a list of ports inside the container available locally.
+    // "forwardPorts": [],
+
+    // Use 'postCreateCommand' to run commands after the container is created.
+    // "postCreateCommand": "rustc --version",
+
+    // Configure tool-specific properties.
+    // "customizations": {},
+
+    // Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
+ // "remoteUser": "root" +} diff --git a/.gitignore b/.gitignore index ea8c4bf..b43cdd0 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ -/target +.env + + +/target \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 767bf88..2fe347a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -75,12 +75,6 @@ dependencies = [ "libc", ] -[[package]] -name = "anyhow" -version = "1.0.93" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" - [[package]] name = "argminmax" version = "0.6.2" @@ -1161,7 +1155,6 @@ dependencies = [ name = "macrosynergy_dataquery" version = "0.0.1" dependencies = [ - "anyhow", "log", "polars", "rand", diff --git a/Cargo.toml b/Cargo.toml index 4377fa1..05775e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,11 @@ reqwest = { version = "0.12.9", features = ["blocking", "json"] } serde_json = "1.0" serde_urlencoded = "0.7" serde = { version = "1.0", features = ["derive"] } -polars = { version = "0.44.2", features = ["temporal", "lazy"] } -anyhow = "1.0.92" +# polars = { version = "0.44.2", features = ["temporal", "lazy"] } +polars = { version = "0.44.2", features = ["lazy"] } + +# anyhow = "1.0.92" rand = "0.8" threadpool = "1.8.1" -log = "0.4.22" \ No newline at end of file +log = "0.4.22" +# dotenv = "0.15.0" diff --git a/src/main.rs b/src/main.rs index 64eaad2..24792ba 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,25 +1,22 @@ -use log; +// use log; use macrosynergy_dataquery::oauth_client::OAuthClient; use macrosynergy_dataquery::requester; use macrosynergy_dataquery::requester::*; -use macrosynergy_dataquery::timeseries::*; +use macrosynergy_dataquery::timeseries::JPMaQSIndicator; +// use macrosynergy_dataquery::timeseries::*; use rand::seq::SliceRandom; -use std::collections::HashSet; +// use std::collections::HashSet; +// use dotenv::dotenv; use std::time::Instant; fn main() { + // dotenv().ok(); let mut requester = requester::DQRequester::new(OAuthClient::default()); requester.check_connection().unwrap(); - let responses = requester.get_catalogue("JPMAQS", 1000).unwrap(); + let dq_catalogue = requester.get_catalogue("JPMAQS", 1000).unwrap(); - let mut tickers_vec = vec![]; - for rsp in responses { - let response: DQCatalogueResponse = serde_json::from_str(&rsp.to_string()).unwrap(); - for instrument in response.instruments { - tickers_vec.push(instrument.instrument_name); - } - } + let tickers_vec = dq_catalogue.all_instruments; for ticker in tickers_vec.clone().iter().take(5) { println!("{}", ticker); @@ -34,45 +31,43 @@ fn main() { } // make expressions for 10 ticker - let mut expressions_vec = vec![]; let mut shuffled_tickers = tickers_vec.clone(); shuffled_tickers.shuffle(&mut rand::thread_rng()); - for ticker in shuffled_tickers.clone() { - for expr in make_expression(&ticker) { - expressions_vec.push(expr); - } - } - // let rsp = requester.get_timeseries(DQTimeseriesRequestArgs { - // expressions: expressions_vec[0..40].to_vec(), - // ..Default::default() - // }); + let expressions_vec: Vec = shuffled_tickers + .iter() + .flat_map(|ticker| make_expression(ticker)) + .collect(); - // for response in rsp { - // let dq_response: DQTimeSeriesResponse = - // serde_json::from_str(&response.text().unwrap()).unwrap(); - // for ts_group in dq_response.get_timeseries_by_ticker() { - // let jpmi = JPMaQSIndicator::new(ts_group).unwrap(); - // println!("{:?}", jpmi.as_qdf().unwrap()); - // } - // } - - // fetch first 50 expressions let start = Instant::now(); 
diff --git a/src/requester.rs b/src/requester.rs
index d1790bc..4a98f9f 100644
--- a/src/requester.rs
+++ b/src/requester.rs
@@ -1,13 +1,20 @@
 use crate::oauth_client::OAuthClient;
+use crate::timeseries::DQCatalogueResponse;
+use crate::timeseries::DQCatalogueSingleResponse;
+use crate::timeseries::DQTimeSeriesResponse;
+use reqwest;
 use reqwest::blocking::Client;
 use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
 use std::error::Error;
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::time::Duration;
 
 const API_BASE_URL: &str = "https://api-developer.jpmorgan.com/research/dataquery-authe/api/v2";
 const HEARTBEAT_ENDPOINT: &str = "/services/heartbeat";
 const TIMESERIES_ENDPOINT: &str = "/expressions/time-series";
 const CATALOGUE_ENDPOINT: &str = "/group/instruments";
-const JPMAQS_CATALOGUE_GROUP: &str = "JPMAQS";
+// const JPMAQS_CATALOGUE_GROUP: &str = "JPMAQS";
 
 #[derive(Debug, Clone)]
 pub struct DQRequester {
@@ -131,8 +138,9 @@ impl DQRequester {
         &mut self,
         catalogue_group: &str,
         page_size: u32,
-    ) -> Result<Vec<serde_json::Value>, Box<dyn Error>> {
-        let mut responses: Vec<serde_json::Value> = Vec::new();
+    // ) -> Result<Vec<serde_json::Value>, Box<dyn Error>> {
+    ) -> Result<DQCatalogueResponse, Box<dyn Error>> {
+        let mut responses: Vec<DQCatalogueSingleResponse> = Vec::new();
 
         if page_size < 1 || page_size > 1000 {
             return Err("Page size must be between 1 and 1000".into());
@@ -152,7 +160,9 @@ impl DQRequester {
             }
 
             let json: serde_json::Value = response.json()?;
-            responses.push(json.clone());
+            let dq_catalogue_response: DQCatalogueSingleResponse =
+                serde_json::from_str(&json.to_string())?;
+            responses.push(dq_catalogue_response);
 
             let links = json.get("links");
             let links_array = links.and_then(|links| links.as_array());
@@ -164,7 +174,7 @@ impl DQRequester {
             next_page = next_link;
         }
 
-        Ok(responses)
+        Ok(DQCatalogueResponse::new(responses))
     }
 
     pub fn _fetch_single_timeseries_batch(
@@ -216,34 +226,78 @@ impl DQRequester {
         &mut self,
         args: DQTimeseriesRequestArgs,
         max_retries: u32,
-    ) -> Result<Vec<reqwest::blocking::Response>, Box<dyn Error>> {
+    // ) -> Result<Vec<reqwest::blocking::Response>, Box<dyn Error>> {
+    ) -> Result<Vec<DQTimeSeriesResponse>, Box<dyn Error>> {
         let expression_batches: Vec<Vec<String>> = args
             .expressions
             .chunks(20)
            .map(|chunk| chunk.to_vec())
            .collect();
 
-        let mut okay_responses: Vec<reqwest::blocking::Response> = Vec::new();
-        let mut failed_batches: Vec<Vec<String>> = Vec::new();
+        let okay_responses = Arc::new(Mutex::new(Vec::new()));
+        let failed_batches = Arc::new(Mutex::new(Vec::new()));
+        let client = Arc::new(Mutex::new(self.clone()));
+
+        let mut handles = vec![];
+        let total_batches = expression_batches.len();
+        let mut curr_batch = 0;
 
         for expr_batch in expression_batches {
+            curr_batch += 1;
             let mut args = args.clone();
             args.update_expressions(expr_batch.clone());
-            let response = self._fetch_single_timeseries_batch(args);
-            std::thread::sleep(std::time::Duration::from_millis(200));
-            match response {
-                Ok(r) => {
-                    okay_responses.push(r);
+            let okay_responses = Arc::clone(&okay_responses);
+            let failed_batches = Arc::clone(&failed_batches);
+            let client = Arc::clone(&client);
+            // if curr_batch mod 100 == 0 print progress
+            log::info!("Processed {} out of {} batches", curr_batch, total_batches);
+            thread::sleep(Duration::from_millis(200));
+            let handle = thread::spawn(move || {
+                let response = client.lock().unwrap()._fetch_single_timeseries_batch(args);
 
-                    log::info!("Got batch: {:?}", expr_batch);
+                match response {
+                    Ok(r) => {
+                        // Attempt to parse the response text
+                        match serde_json::from_str::<DQTimeSeriesResponse>(&r.text().unwrap()) {
+                            Ok(dq_response) => {
+                                okay_responses.lock().unwrap().push(dq_response);
+                                log::info!("Got batch: {:?}", expr_batch);
+                            }
+                            Err(e) => {
+                                // If parsing fails, treat this as a batch failure
+                                failed_batches.lock().unwrap().push(expr_batch.clone());
+                                log::error!(
+                                    "Failed to parse timeseries: {:?} : {:?}",
+                                    expr_batch,
+                                    e
+                                );
+                            }
+                        }
+                    }
+                    Err(e) => {
+                        // Handle _fetch_single_timeseries_batch error
+                        failed_batches.lock().unwrap().push(expr_batch.clone());
+                        log::error!("Failed to get batch: {:?} : {:?}", expr_batch, e);
+                    }
                 }
-                Err(e) => {
-                    failed_batches.push(expr_batch.clone());
-                    log::error!("Failed to get batch: {:?} : {:?}", expr_batch, e);
-                }
-            }
+            });
+
+            handles.push(handle);
         }
 
+        for handle in handles {
+            handle.join().unwrap();
+        }
+
+        let mut okay_responses = Arc::try_unwrap(okay_responses)
+            .unwrap()
+            .into_inner()
+            .unwrap();
+        let failed_batches = Arc::try_unwrap(failed_batches)
+            .unwrap()
+            .into_inner()
+            .unwrap();
+
         if !failed_batches.is_empty() && max_retries == 0 {
             return Err("Max retries reached".into());
         }
@@ -257,6 +311,7 @@ impl DQRequester {
             self._fetch_timeseries_recursive(new_args, max_retries - 1)?;
             okay_responses.append(&mut retry_responses);
         }
+        log::info!("Returning {} responses", okay_responses.len());
 
         Ok(okay_responses)
     }
@@ -264,7 +319,7 @@ impl DQRequester {
     pub fn get_timeseries(
         &mut self,
         args: DQTimeseriesRequestArgs,
-    ) -> Result<Vec<reqwest::blocking::Response>, Box<dyn Error>> {
+    ) -> Result<Vec<DQTimeSeriesResponse>, Box<dyn Error>> {
         let max_retries = 5;
         println!(
             "Invoking recursive function for {:?} expressions",
@@ -285,12 +340,11 @@ fn main() {
     let mut requester = DQRequester::new(oauth_client);
 
     requester.check_connection().unwrap();
 
-    let response = requester
-        .get_catalogue(JPMAQS_CATALOGUE_GROUP, 1000)
-        .unwrap();
+    // let response = requester
+    //     .get_catalogue(JPMAQS_CATALOGUE_GROUP, 1000)
+    //     .unwrap();
 
-    let json_data = response[0].to_string();
-    println!("{}", json_data);
+    // let json_data = response // try to pull into
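
Note: every thread spawned in _fetch_timeseries_recursive locks the single Mutex around the cloned client for the whole HTTP call, so the batches still run one at a time; the threads add overhead without adding concurrency. A sketch of an alternative, assuming only what this diff shows (DQRequester derives Clone, update_expressions and _fetch_single_timeseries_batch exist, and threadpool 1.8.1 is already in Cargo.toml); fetch_batches_in_parallel is a hypothetical helper and the retry bookkeeping is elided:

    use std::sync::{Arc, Mutex};
    use threadpool::ThreadPool;

    fn fetch_batches_in_parallel(
        requester: &DQRequester,
        args: &DQTimeseriesRequestArgs,
        expression_batches: Vec<Vec<String>>,
    ) -> Vec<reqwest::blocking::Response> {
        let pool = ThreadPool::new(4); // bounded concurrency instead of a thread per batch
        let okay_responses = Arc::new(Mutex::new(Vec::new()));

        for expr_batch in expression_batches {
            // Each task gets its own client, so no lock is held across a request.
            let mut client = requester.clone();
            let mut batch_args = args.clone();
            batch_args.update_expressions(expr_batch);
            let okay_responses = Arc::clone(&okay_responses);
            pool.execute(move || {
                // The mutex now guards only the push of the finished response.
                if let Ok(response) = client._fetch_single_timeseries_batch(batch_args) {
                    okay_responses.lock().unwrap().push(response);
                }
            });
        }

        pool.join(); // wait for every queued batch
        Arc::try_unwrap(okay_responses).unwrap().into_inner().unwrap()
    }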
diff --git a/src/timeseries.rs b/src/timeseries.rs
index e6492bb..ce78327 100644
--- a/src/timeseries.rs
+++ b/src/timeseries.rs
@@ -16,6 +16,33 @@ pub struct DQTimeSeriesResponse {
 
 #[derive(Deserialize, Debug)]
 pub struct DQCatalogueResponse {
+    pub items: u32,
+    pub catalogue_responses: Vec<DQCatalogueSingleResponse>,
+    pub all_instruments: Vec<String>,
+}
+
+impl DQCatalogueResponse {
+    pub fn new(catalogue_responses: Vec<DQCatalogueSingleResponse>) -> Self {
+        let all_instruments: Vec<String> = catalogue_responses
+            .iter()
+            .flat_map(|response| {
+                response
+                    .instruments
+                    .iter()
+                    .map(|instrument| instrument.instrument_name.clone())
+            })
+            .collect();
+
+        DQCatalogueResponse {
+            items: all_instruments.len() as u32,
+            catalogue_responses: catalogue_responses,
+            all_instruments: all_instruments,
+        }
+    }
+}
+
+#[derive(Deserialize, Debug)]
+pub struct DQCatalogueSingleResponse {
     pub links: Vec>>,
     pub items: u32,
     pub instruments: Vec,
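
Note: DQCatalogueResponse::new sets items to the flattened instrument count, so items always equals all_instruments.len(). From the caller's side this reads as below — a sketch mirroring the new src/main.rs, with error handling elided:

    // get_catalogue() now returns a single DQCatalogueResponse; all_instruments
    // flattens the instrument names across every fetched page.
    let dq_catalogue = requester.get_catalogue("JPMAQS", 1000).unwrap();
    assert_eq!(dq_catalogue.items as usize, dq_catalogue.all_instruments.len());
    for ticker in dq_catalogue.all_instruments.iter().take(5) {
        println!("{}", ticker);
    }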
@@ -108,6 +135,7 @@ impl DQTimeSeries {
 }
 
 impl DQTimeSeriesResponse {
+    /// Return a list of all expressions in the response
     pub fn list_expressions(&self) -> Vec<String> {
         self.instruments
             .iter()
@@ -135,21 +163,34 @@
 
     /// Return a list of lists of DQTimeSeries, where each list contains all the timeseries for each ticker
     pub fn get_timeseries_by_ticker(&self) -> Vec<Vec<DQTimeSeries>> {
-        // create a hashmap where the key is the ticker and the value is a list of timeseries
-        let mut timeseries_by_ticker: HashMap<String, Vec<DQTimeSeries>> = HashMap::new();
-        for instrument in &self.instruments {
-            for attribute in &instrument.attributes {
-                let ticker = attribute.expression.split(',').nth(1).unwrap().to_string();
-                let timeseries = DQTimeSeries {
+        // let mut timeseries_by_ticker: HashMap<String, Vec<DQTimeSeries>> = HashMap::new();
+        // for instrument in &self.instruments {
+        //     for attribute in &instrument.attributes {
+        //         let ticker = attribute.expression.split(',').nth(1).unwrap().to_string();
+        //         let timeseries = DQTimeSeries {
+        //             expression: attribute.expression.clone(),
+        //             time_series: attribute.time_series.clone(),
+        //         };
+        //         timeseries_by_ticker
+        //             .entry(ticker)
+        //             .or_insert_with(Vec::new)
+        //             .push(timeseries);
+        //     }
+        // }
+        let timeseries_by_ticker = self
+            .instruments
+            .iter()
+            .flat_map(|instrument| {
+                instrument.attributes.iter().map(|attribute| DQTimeSeries {
                     expression: attribute.expression.clone(),
                     time_series: attribute.time_series.clone(),
-                };
-                timeseries_by_ticker
-                    .entry(ticker)
-                    .or_insert_with(Vec::new)
-                    .push(timeseries);
-            }
-        }
+                })
+            })
+            .fold(HashMap::new(), |mut acc, ts| {
+                let ticker = ts.get_ticker().unwrap();
+                acc.entry(ticker).or_insert_with(Vec::new).push(ts);
+                acc
+            });
 
         timeseries_by_ticker.into_iter().map(|(_, v)| v).collect()
     }
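
Note: the fold above keys each series with ts.get_ticker(), which, per the inline logic it replaces, takes the second comma-separated field of the expression. A self-contained sketch of that convention — the sample expression string is illustrative, not taken from the API:

    // Mirrors the replaced inline logic: attribute.expression.split(',').nth(1)
    fn ticker_from_expression(expression: &str) -> Option<&str> {
        expression.split(',').nth(1)
    }

    fn main() {
        // Hypothetical JPMaQS-style expression of the form DB(JPMAQS,<ticker>,<metric>)
        assert_eq!(
            ticker_from_expression("DB(JPMAQS,USD_EQXR_NSA,value)"),
            Some("USD_EQXR_NSA")
        );
    }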