diff --git a/Cargo.lock b/Cargo.lock index ae33e74..92d82c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1248,6 +1248,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "threadpool", ] [[package]] @@ -1378,6 +1379,16 @@ dependencies = [ "libm", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "object" version = "0.36.5" @@ -2995,6 +3006,15 @@ dependencies = [ "syn 2.0.86", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + [[package]] name = "tinyvec" version = "1.8.0" diff --git a/Cargo.toml b/Cargo.toml index 11b7d0e..8e9fdc8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,4 @@ serde = { version = "1.0", features = ["derive"] } polars = { version = "0.44.2", features = ["temporal", "lazy"] } anyhow = "1.0" rand = "0.8" +threadpool = "1.8.1" diff --git a/src/main.rs b/src/main.rs index 384ec96..1214faa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,60 +2,14 @@ use macrosynergy_dataquery::oauth_client::OAuthClient; use macrosynergy_dataquery::requester; use macrosynergy_dataquery::requester::*; use macrosynergy_dataquery::timeseries::*; -use rand::seq::SliceRandom; // Import the trait for `shuffle` -use std::env; +use rand::seq::SliceRandom; fn main() { - // let client_id = env::var("DQ_CLIENT_ID").unwrap(); - // let client_secret = env::var("DQ_CLIENT_SECRET").unwrap(); - - // let mut oauth_client = OAuthClient::new(client_id, client_secret); - // oauth_client.fetch_token().unwrap(); - - // let mut requester = DQRequester::new(oauth_client); let mut requester = requester::DQRequester::new(OAuthClient::default()); requester.check_connection().unwrap(); - // let expressions = vec![ - // "DB(JPMAQS,USD_EQXR_NSA,value)", - // "DB(JPMAQS,USD_EQXR_NSA,grading)", - // "DB(JPMAQS,USD_EQXR_NSA,eop_lag)", - // "DB(JPMAQS,USD_EQXR_NSA,mop_lag)", - // "DB(JPMAQS,GBP_EQXR_NSA,value)", - // "DB(JPMAQS,GBP_EQXR_NSA,grading)", - // "DB(JPMAQS,GBP_EQXR_NSA,eop_lag)", - // "DB(JPMAQS,GBP_EQXR_NSA,mop_lag)", - // ]; - - // expressions_b.len(); - - // let response = requester - // .get_timeseries_with_defaults(expressions_a) - // .unwrap(); - - // let json_data = response.text().unwrap(); - // let dqresponse: DQResponse = serde_json::from_str(&json_data).unwrap(); - // for ts_group in dqresponse.get_timeseries_by_ticker() { - // let jpmi = JPMaQSIndicator::new(ts_group).unwrap(); - // println!("{:?}", jpmi.as_qdf().unwrap()); - // } - - // get catalog let responses = requester.get_catalogue("JPMAQS", 1000).unwrap(); - // let json_data = response.text().unwrap(); - // let json_data = response[0].to_string(); - // println!("{}", json_data); - - // // try to pull into DQResponse - // let response: DQCatalogueResponse = serde_json::from_str(&json_data).unwrap(); - - // println!("{:?}", response); - - // // print all instrument names - // for instrument in response.instruments { - // println!("{}", instrument.instrument_name); - // } let mut tickers_vec = vec![]; for rsp in responses { @@ -91,14 +45,17 @@ fn main() { expressions: expressions_vec[0..20].to_vec(), ..Default::default() }; - // on the first 20 expressions - let response = requester.get_timeseries_batch(rqargs).unwrap(); + let rsp = requester.get_timeseries(DQTimeseriesRequestArgs { + expressions: expressions_vec[0..100].to_vec(), + ..Default::default() + }); - let dq_response: DQTimeSeriesResponse = - serde_json::from_str(&response.text().unwrap()).unwrap(); - - for ts_group in dq_response.get_timeseries_by_ticker() { - let jpmi = JPMaQSIndicator::new(ts_group).unwrap(); - println!("{:?}", jpmi.as_qdf().unwrap()); + for response in rsp { + let dq_response: DQTimeSeriesResponse = + serde_json::from_str(&response.text().unwrap()).unwrap(); + for ts_group in dq_response.get_timeseries_by_ticker() { + let jpmi = JPMaQSIndicator::new(ts_group).unwrap(); + println!("{:?}", jpmi.as_qdf().unwrap()); + } } } diff --git a/src/oauth_client.rs b/src/oauth_client.rs index 4e415ce..f71ebfd 100644 --- a/src/oauth_client.rs +++ b/src/oauth_client.rs @@ -8,7 +8,7 @@ use std::time::{Duration, SystemTime}; pub const OAUTH_TOKEN_URL: &str = "https://authe.jpmchase.com/as/token.oauth2"; pub const OAUTH_RESOURCE_ID: &str = "JPMC:URI:RS-06785-DataQueryExternalApi-PROD"; - +#[derive(Debug, Clone)] pub struct OAuthClient { client_id: String, client_secret: String, diff --git a/src/requester.rs b/src/requester.rs index 4d5e9de..379712f 100644 --- a/src/requester.rs +++ b/src/requester.rs @@ -1,20 +1,21 @@ +use crate::oauth_client::OAuthClient; use reqwest::blocking::Client; use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use std::error::Error; -use crate::oauth_client::OAuthClient; - const API_BASE_URL: &str = "https://api-developer.jpmorgan.com/research/dataquery-authe/api/v2"; const HEARTBEAT_ENDPOINT: &str = "/services/heartbeat"; const TIMESERIES_ENDPOINT: &str = "/expressions/time-series"; const CATALOGUE_ENDPOINT: &str = "/group/instruments"; const JPMAQS_CATALOGUE_GROUP: &str = "JPMAQS"; +#[derive(Debug, Clone)] pub struct DQRequester { oauth_client: OAuthClient, rqclient: Client, } +#[derive(Debug, Clone)] pub struct DQTimeseriesRequestArgs { pub start_date: String, pub end_date: String, @@ -166,7 +167,7 @@ impl DQRequester { Ok(responses) } - pub fn get_timeseries_batch( + pub fn get_single_timeseries_batch( &mut self, args: DQTimeseriesRequestArgs, ) -> Result> { @@ -181,17 +182,46 @@ impl DQRequester { Ok(response) } - pub fn get_timeseries_with_defaults( + pub fn get_single_timeseries_with_defaults( &mut self, expressions: Vec<&str>, ) -> Result> { - // replace just the expressions let args = DQTimeseriesRequestArgs { expressions: expressions.into_iter().map(|s| s.to_string()).collect(), ..Default::default() }; - self.get_timeseries_batch(args) + self.get_single_timeseries_batch(args) + } + + pub fn get_timeseries( + &mut self, + args: DQTimeseriesRequestArgs, + ) -> Vec { + let expressions_chunks: Vec> = args + .expressions + .chunks(20) + .map(|chunk| chunk.to_vec()) + .collect(); + + let mut responses: Vec>> = Vec::new(); + + for expressions in expressions_chunks { + let mut args = args.clone(); + args.update_expressions(expressions); + println!( + "Requesting timeseries for expressions: {:?}", + args.expressions + ); + let response = self.get_single_timeseries_batch(args); + responses.push(response); + // just sleep for a bit not threadsleep + println!("Sleeping for 500ms"); + std::thread::sleep(std::time::Duration::from_millis(500)); + + } + + responses.into_iter().map(|r| r.unwrap()).collect() } }