diff --git a/src/download/jpmaqsdownload.rs b/src/download/jpmaqsdownload.rs new file mode 100644 index 0000000..ada139f --- /dev/null +++ b/src/download/jpmaqsdownload.rs @@ -0,0 +1,94 @@ +use crate::download::oauth_client::OAuthClient; +use crate::download::requester::DQRequester; +use crate::download::requester::DQTimeseriesRequestArgs; +use crate::download::timeseries::DQTimeSeriesResponse; +use crate::download::timeseries::JPMaQSIndicator; +use std::error::Error; + +const DEFAULT_JPMAQS_METRICS: [&str; 4] = ["value", "grading", "eop_lag", "mop_lag"]; + +fn ticker_to_expressions(ticker: &str) -> Vec { + DEFAULT_JPMAQS_METRICS + .iter() + .map(|metric| format!("DB(JPMAQS,{},{})", ticker, metric)) + .collect::>() +} + +fn construct_expressions(tickers: Vec) -> Vec { + tickers + .iter() + .flat_map(|ticker| ticker_to_expressions(ticker)) + .collect() +} + +fn is_jpmaq_expression(expression: &str) -> bool { + expression.starts_with("DB(JPMAQS,") + && expression.ends_with(")") + && expression.split(',').count() == 3 + && expression.split(',').nth(0).unwrap() == "DB(JPMAQS" + && expression.split(',').nth(2).unwrap().ends_with(")") +} + +fn all_jpmaq_expressions(expressions: Vec) -> bool { + expressions + .iter() + .all(|expression| is_jpmaq_expression(expression)) +} + +#[derive(Debug, Clone)] +pub struct JPMaQSDownload { + requester: DQRequester, +} + +impl Default for JPMaQSDownload { + fn default() -> Self { + let requester = DQRequester::default(); + JPMaQSDownload { requester } + } +} + +impl JPMaQSDownload { + pub fn new(client_id: String, client_secret: String) -> Self { + let oauth_client = OAuthClient::new(client_id.clone(), client_secret.clone()); + let requester = DQRequester::new(oauth_client); + JPMaQSDownload { requester } + } + + pub fn check_connection(&mut self) -> Result<(), Box> { + self.requester.check_connection() + } + + pub fn get_catalogue(&mut self) -> Result, Box> { + let dq_catalogue = self.requester.get_catalogue("JPMAQS", 1000)?; + Ok(dq_catalogue.all_instruments) + } + + pub fn get_expressions( + &mut self, + expressions: Vec, + ) -> Result, Box> { + let dqts_vec = self.requester.get_timeseries(DQTimeseriesRequestArgs { + expressions: expressions, + ..Default::default() + })?; + + Ok(dqts_vec) + } + + pub fn get_indicators( + &mut self, + tickers: Vec, + ) -> Result, Box> { + let expressions = construct_expressions(tickers); + assert!(all_jpmaq_expressions(expressions.clone())); + let dqts_vec = self.get_expressions(expressions)?; + + let indicators = dqts_vec + .iter() + .flat_map(|dqts| dqts.get_timeseries_by_ticker()) + .map(|tsv| JPMaQSIndicator::new(tsv)) + .collect::, Box>>()?; + + Ok(indicators) + } +} diff --git a/src/download/mod.rs b/src/download/mod.rs new file mode 100644 index 0000000..d83c686 --- /dev/null +++ b/src/download/mod.rs @@ -0,0 +1,4 @@ +pub mod jpmaqsdownload; +pub mod oauth_client; +pub mod requester; +pub mod timeseries; diff --git a/src/oauth_client.rs b/src/download/oauth_client.rs similarity index 100% rename from src/oauth_client.rs rename to src/download/oauth_client.rs diff --git a/src/requester.rs b/src/download/requester.rs similarity index 90% rename from src/requester.rs rename to src/download/requester.rs index 4a98f9f..d871a39 100644 --- a/src/requester.rs +++ b/src/download/requester.rs @@ -1,7 +1,7 @@ -use crate::oauth_client::OAuthClient; -use crate::timeseries::DQCatalogueResponse; -use crate::timeseries::DQCatalogueSingleResponse; -use crate::timeseries::DQTimeSeriesResponse; +use crate::download::oauth_client::OAuthClient; +use crate::download::timeseries::DQCatalogueResponse; +use crate::download::timeseries::DQCatalogueSingleResponse; +use crate::download::timeseries::DQTimeSeriesResponse; use reqwest; use reqwest::blocking::Client; use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; @@ -90,6 +90,15 @@ impl Default for DQTimeseriesRequestArgs { } } +impl Default for DQRequester { + fn default() -> Self { + DQRequester { + oauth_client: OAuthClient::default(), + rqclient: Client::new(), + } + } +} + impl DQRequester { pub fn new(oauth_client: OAuthClient) -> Self { DQRequester { @@ -197,36 +206,10 @@ impl DQRequester { Ok(response) } - // pub fn get_timeseries( - // &mut self, - // args: DQTimeseriesRequestArgs, - // ) -> Vec { - // let expressions_chunks: Vec> = args - // .expressions - // .chunks(20) - // .map(|chunk| chunk.to_vec()) - // .collect(); - - // let mut responses: Vec>> = Vec::new(); - - // for expressions in expressions_chunks { - // let mut args = args.clone(); - // args.update_expressions(expressions); - // let response = self.get_single_timeseries_batch(args); - // responses.push(response); - // // just sleep for a bit not threadsleep - // // println!("Sleeping for 500ms"); - // std::thread::sleep(std::time::Duration::from_millis(500)); - // } - - // responses.into_iter().map(|r| r.unwrap()).collect() - // } - fn _fetch_timeseries_recursive( &mut self, args: DQTimeseriesRequestArgs, max_retries: u32, - // ) -> Result, Box> { ) -> Result, Box> { let expression_batches: Vec> = args .expressions diff --git a/src/timeseries.rs b/src/download/timeseries.rs similarity index 94% rename from src/timeseries.rs rename to src/download/timeseries.rs index ce78327..cdb45ff 100644 --- a/src/timeseries.rs +++ b/src/download/timeseries.rs @@ -163,20 +163,6 @@ impl DQTimeSeriesResponse { /// Return a list of lists of DQTimeSeries, where each list contains all the timeseries for each ticker pub fn get_timeseries_by_ticker(&self) -> Vec> { - // let mut timeseries_by_ticker: HashMap> = HashMap::new(); - // for instrument in &self.instruments { - // for attribute in &instrument.attributes { - // let ticker = attribute.expression.split(',').nth(1).unwrap().to_string(); - // let timeseries = DQTimeSeries { - // expression: attribute.expression.clone(), - // time_series: attribute.time_series.clone(), - // }; - // timeseries_by_ticker - // .entry(ticker) - // .or_insert_with(Vec::new) - // .push(timeseries); - // } - // } let timeseries_by_ticker = self .instruments .iter() diff --git a/src/dqinterface.rs b/src/dqinterface.rs deleted file mode 100644 index e69de29..0000000 diff --git a/src/lib.rs b/src/lib.rs index f312f02..674b799 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1 @@ -pub mod oauth_client; -pub mod requester; -pub mod timeseries; -// pub mod main; +pub mod download; diff --git a/src/main.rs b/src/main.rs index 990a907..fe3df3e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,73 +1,45 @@ -// use log; -use msyrs::oauth_client::OAuthClient; -use msyrs::requester; -use msyrs::requester::*; -use msyrs::timeseries::JPMaQSIndicator; -// use msyrs::timeseries::*; -use rand::seq::SliceRandom; -// use std::collections::HashSet; -// use dotenv::dotenv; -use std::time::Instant; +use msyrs::download::jpmaqsdownload::JPMaQSDownload; fn main() { - // dotenv().ok(); - let mut requester = requester::DQRequester::new(OAuthClient::default()); - requester.check_connection().unwrap(); + println!("Authentication to DataQuery API"); + let mut jpamqs_download = JPMaQSDownload::default(); - let dq_catalogue = requester.get_catalogue("JPMAQS", 1000).unwrap(); + println!("Checking connection to DataQuery API"); + jpamqs_download.check_connection().unwrap(); - let tickers_vec = dq_catalogue.all_instruments; + println!("Retrieving catalogue of tickers"); + let mut start = std::time::Instant::now(); + let tickers = jpamqs_download.get_catalogue().unwrap(); + println!( + "Retrieved catalogue with {} tickers in {:?}", + tickers.len(), + start.elapsed() + ); - for ticker in tickers_vec.clone().iter().take(5) { - println!("{}", ticker); - } - - fn make_expression(ticker: &str) -> Vec { - // return metrics - return ["value", "grading", "eop_lag", "mop_lag"] - .iter() - .map(|metric| format!("DB(JPMAQS,{},{})", ticker, metric)) - .collect::>(); - } - - // make expressions for 10 ticker - let mut shuffled_tickers = tickers_vec.clone(); - shuffled_tickers.shuffle(&mut rand::thread_rng()); - - let expressions_vec: Vec = shuffled_tickers + let num_ticks = 1000; + let sel_tickers: Vec = tickers .iter() - .flat_map(|ticker| make_expression(ticker)) + .take(num_ticks) + .map(|s| s.to_string()) .collect(); + let mut df_deets = Vec::new(); - let start = Instant::now(); + println!("Retrieving indicators for {} tickers", sel_tickers.len()); + start = std::time::Instant::now(); + let indicators = jpamqs_download.get_indicators(sel_tickers.clone()).unwrap(); - let dqts_vec = requester - .get_timeseries(DQTimeseriesRequestArgs { - expressions: expressions_vec[0..1000].to_vec(), - ..Default::default() - }) - .unwrap(); + println!( + "Retrieved indicators for {} tickers in {:?}", + sel_tickers.len(), + start.elapsed() + ); - println!("Time elapsed in download step: {:?}", start.elapsed()); - - // let mut df_deets: Vec = Vec::new(); - let mut df_deets: Vec<(String, usize)> = Vec::new(); - let start = Instant::now(); - for dqts in dqts_vec.iter() { - for tsv in dqts.get_timeseries_by_ticker() { - // df_deets.push(tsv.len()); - let jpmaqs_indicator = JPMaQSIndicator::new(tsv).unwrap(); - df_deets.push((jpmaqs_indicator.ticker, jpmaqs_indicator.df.size())); - // for ts in tsv { - // let df = ts.to_dataframe().unwrap(); - // // println!("{}: {}", ts.get_metric().unwrap(), df.size()); - // df_deets.push((ts.get_metric().unwrap(), df.size())); - // // if len df_deets mod 10 print - // if df_deets.len() % 10 == 0 { - // println!("{}: {:?}", df_deets.len(), start.elapsed()); - // } - // } - } + start = std::time::Instant::now(); + for indicator in indicators { + df_deets.push((indicator.ticker.clone(), indicator.as_qdf().unwrap().size())); } - println!("Time elapsed in processing step: {:?}", start.elapsed()); + println!( + "Converted indicators to DataFrames in {:?}", + start.elapsed() + ); }