From c69454fe2f88c70b0b0e317b0ed57c197618e5e7 Mon Sep 17 00:00:00 2001 From: Palash Tyagi <23239946+Magnus167@users.noreply.github.com> Date: Tue, 12 Nov 2024 23:56:37 +0000 Subject: [PATCH] working --- docs/Cargo.toml | 6 --- docs/src/lib.rs | 14 ------ src/download/{timeseries.rs => helpers.rs} | 35 +++++++++++-- src/download/jpmaqsdownload.rs | 44 ++++++++++++++-- src/download/mod.rs | 2 +- src/download/oauth_client.rs | 1 - src/download/parreq.rs | 2 +- src/download/requester.rs | 42 ++++++---------- src/main.rs | 58 +++++++++++++--------- 9 files changed, 123 insertions(+), 81 deletions(-) delete mode 100644 docs/Cargo.toml delete mode 100644 docs/src/lib.rs rename src/download/{timeseries.rs => helpers.rs} (90%) diff --git a/docs/Cargo.toml b/docs/Cargo.toml deleted file mode 100644 index 1e72233..0000000 --- a/docs/Cargo.toml +++ /dev/null @@ -1,6 +0,0 @@ -[package] -name = "docs" -version = "0.1.0" -edition = "2021" - -[dependencies] diff --git a/docs/src/lib.rs b/docs/src/lib.rs deleted file mode 100644 index b93cf3f..0000000 --- a/docs/src/lib.rs +++ /dev/null @@ -1,14 +0,0 @@ -pub fn add(left: u64, right: u64) -> u64 { - left + right -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -} diff --git a/src/download/timeseries.rs b/src/download/helpers.rs similarity index 90% rename from src/download/timeseries.rs rename to src/download/helpers.rs index 8880485..071fe2c 100644 --- a/src/download/timeseries.rs +++ b/src/download/helpers.rs @@ -8,6 +8,7 @@ use std::collections::HashSet; use std::error::Error; use std::fs::File; +/// Arguments for the DataQuery API request #[derive(Debug, Clone)] pub struct DQTimeseriesRequestArgs { pub start_date: String, @@ -19,6 +20,7 @@ pub struct DQTimeseriesRequestArgs { pub expressions: Vec, } +#[allow(dead_code)] impl DQTimeseriesRequestArgs { pub fn new( start_date: &str, @@ -62,6 +64,15 @@ impl DQTimeseriesRequestArgs { } } +/// Default implementation for DQTimeseriesRequestArgs +/// The default values are set: +/// - start_date: "1990-01-01" +/// - end_date: "TODAY+2D" +/// - calendar: "CAL_ALLDAYS" +/// - frequency: "FREQ_DAY" +/// - conversion: "CONV_LASTBUS_ABS" +/// - nan_treatment: "NA_NOTHING" +/// - expressions: empty Vec impl Default for DQTimeseriesRequestArgs { fn default() -> Self { DQTimeseriesRequestArgs { @@ -76,12 +87,14 @@ impl Default for DQTimeseriesRequestArgs { } } -/// Response from the DataQuery API +/// Response from the DataQuery API. #[derive(Deserialize, Debug)] pub struct DQTimeSeriesResponse { instruments: Vec, } +/// Response from the DataQuery API for a catalogue request. +#[allow(dead_code)] #[derive(Deserialize, Debug)] pub struct DQCatalogueResponse { pub items: u32, @@ -89,7 +102,9 @@ pub struct DQCatalogueResponse { pub all_instruments: Vec, } +/// Methods for DQCatalogueResponse impl DQCatalogueResponse { + /// Create a new DQCatalogueResponse from a list of DQCatalogueSingleResponse objects. pub fn new(catalogue_responses: Vec) -> Self { let all_instruments: Vec = catalogue_responses .iter() @@ -109,6 +124,8 @@ impl DQCatalogueResponse { } } +/// Response from the DataQuery API for a single catalogue request (one page). +#[allow(dead_code)] #[derive(Deserialize, Debug)] pub struct DQCatalogueSingleResponse { pub links: Vec>>, @@ -116,6 +133,8 @@ pub struct DQCatalogueSingleResponse { pub instruments: Vec, } +/// Representation of DQCatalogueSingleResponse.Instrument +#[allow(dead_code)] #[derive(Deserialize, Debug)] pub struct DQCatalogueInstrument { #[serde(rename = "instrument-id")] @@ -178,6 +197,7 @@ pub struct DQTimeSeries { time_series: Vec<(String, Option)>, } +/// Representation of a JPMaQS indicator (1 or more time series for a single JPMaQS ticker) #[derive(Debug)] pub struct JPMaQSIndicator { pub df: DataFrame, @@ -185,6 +205,7 @@ pub struct JPMaQSIndicator { pub metrics: Vec, } +/// Methods for DQTimeSeries impl DQTimeSeries { /// Get the ticker from the expression pub fn get_ticker(&self) -> Result> { @@ -234,6 +255,7 @@ impl DQTimeSeries { } } +/// Methods for DQTimeSeriesResponse impl DQTimeSeriesResponse { /// Return a list of all expressions in the response pub fn list_expressions(&self) -> Vec { @@ -280,8 +302,10 @@ impl DQTimeSeriesResponse { timeseries_by_ticker.into_iter().map(|(_, v)| v).collect() } - /// Consume the DQTimeSeriesResponse by grouping the time series by ticker. - /// This function can only be called once as it takes ownership of the data. + + /// Return a list of lists of DQTimeSeries, where each list contains all the timeseries for each ticker. + /// The function consumes the data, leaving an empty vector in its place. + /// This function can only be called once as it transfers ownership of the data. pub fn consume_to_grouped_by_ticker(mut self) -> Vec> { // Take the instruments vector, leaving an empty one in its place. let instruments = std::mem::take(&mut self.instruments); @@ -307,7 +331,9 @@ impl DQTimeSeriesResponse { } } +/// Methods for JPMaQSIndicator impl JPMaQSIndicator { + /// Create a new JPMaQSIndicator from a list of DQTimeSeries, ensuring they all belong to the same ticker pub fn new(timeseries_list: Vec) -> Result> { let found_tickers = timeseries_list .iter() @@ -331,6 +357,7 @@ impl JPMaQSIndicator { }) } + /// Add a single time series to the JPMaQSIndicator DataFrame pub fn add_timeseries(&mut self, timeseries: DQTimeSeries) -> Result<(), Box> { if self.ticker != timeseries.get_ticker()? { return Err("Timeseries does not belong to the same ticker".into()); @@ -339,6 +366,7 @@ impl JPMaQSIndicator { Ok(()) } + /// Convert the JPMaQSIndicator to a standard JPMaQS Quantamental DataFrame () pub fn as_qdf(&self) -> Result> { let mut qdf = self.df.clone(); let (cid, xcat) = match self.ticker.split_once('_') { @@ -353,6 +381,7 @@ impl JPMaQSIndicator { Ok(qdf) } + /// Save the JPMaQSIndicator to a CSV file pub fn save_qdf_to_csv(&self, filename: &str) -> Result<(), Box> { save_qdf_to_csv(&mut self.as_qdf()?, filename) } diff --git a/src/download/jpmaqsdownload.rs b/src/download/jpmaqsdownload.rs index e889e8e..e0e99df 100644 --- a/src/download/jpmaqsdownload.rs +++ b/src/download/jpmaqsdownload.rs @@ -1,8 +1,9 @@ use crate::download::oauth_client::OAuthClient; use crate::download::requester::DQRequester; -use crate::download::timeseries::DQTimeSeriesResponse; -use crate::download::timeseries::DQTimeseriesRequestArgs; -use crate::download::timeseries::JPMaQSIndicator; +use crate::download::helpers::DQTimeSeriesResponse; +use crate::download::helpers::DQTimeseriesRequestArgs; +use crate::download::helpers::JPMaQSIndicator; +// use polars::prelude::*; use std::error::Error; const DEFAULT_JPMAQS_METRICS: [&str; 4] = ["value", "grading", "eop_lag", "mop_lag"]; @@ -59,6 +60,7 @@ impl Default for JPMaQSDownloadGetIndicatorArgs { } } +/// Struct for downloading data from the JPMaQS data from JPMorgan DataQuery API. #[derive(Debug, Clone)] pub struct JPMaQSDownload { requester: DQRequester, @@ -72,21 +74,25 @@ impl Default for JPMaQSDownload { } impl JPMaQSDownload { + /// Create a new JPMaQSDownload instance with the provided client ID and client secret. pub fn new(client_id: String, client_secret: String) -> Self { let oauth_client = OAuthClient::new(client_id.clone(), client_secret.clone()); let requester = DQRequester::new(oauth_client); JPMaQSDownload { requester } } + /// Check the connection to the DataQuery API. pub fn check_connection(&mut self) -> Result<(), Box> { self.requester.check_connection() } + /// Get the catalogue of tickers available in the JPMaQS data. pub fn get_catalogue(&mut self) -> Result, Box> { let dq_catalogue = self.requester.get_catalogue("JPMAQS", 1000)?; Ok(dq_catalogue.all_instruments) } + /// Get the time series data for the provided expressions. pub fn get_expressions( &mut self, expressions: Vec, @@ -102,7 +108,8 @@ impl JPMaQSDownload { Ok(dqts_vec) } - pub fn get_indicators( + /// Get the indicators for the provided tickers and metrics. + pub fn get_indicators_list( &mut self, download_args: JPMaQSDownloadGetIndicatorArgs, ) -> Result, Box> { @@ -129,4 +136,33 @@ impl JPMaQSDownload { Err(e) => Err(e), } } + + pub fn get_indicators_qdf( + &mut self, + download_args: JPMaQSDownloadGetIndicatorArgs, + ) -> Result> { + let mut indicators: Vec = self.get_indicators_list(download_args)?; + if indicators.is_empty() { + return Err("No indicators retrieved".into()); + } + if indicators.len() == 1 { + return indicators.pop().unwrap().as_qdf(); + } + assert!(indicators.len() > 1); + let mut df_main = indicators.pop().unwrap().as_qdf().unwrap(); + while !indicators.is_empty() { + let df = indicators.pop().unwrap().as_qdf().unwrap(); + df_main = df_main.vstack(&df).unwrap(); + } + // sort by cid, xcat, real_date in that order + let _ = df_main.sort_in_place( + [ + "cid".to_string(), + "xcat".to_string(), + "real_date".to_string(), + ], + polars::chunked_array::ops::SortMultipleOptions::default(), + ); + Ok(df_main) + } } diff --git a/src/download/mod.rs b/src/download/mod.rs index 93140ba..507aecc 100644 --- a/src/download/mod.rs +++ b/src/download/mod.rs @@ -1,5 +1,5 @@ pub mod jpmaqsdownload; +pub mod helpers; pub mod oauth_client; pub mod requester; -pub mod timeseries; pub mod parreq; \ No newline at end of file diff --git a/src/download/oauth_client.rs b/src/download/oauth_client.rs index 8c39d7d..83145d2 100644 --- a/src/download/oauth_client.rs +++ b/src/download/oauth_client.rs @@ -47,7 +47,6 @@ impl OAuthClient { let json: Value = response.json()?; if let Some(token) = json["access_token"].as_str() { self.access_token = Some(token.to_string()); - println!("Access token retrieved; token length: {}", token.len()); if let Some(expires_in) = json["expires_in"].as_u64() { self.expires_at = Some(SystemTime::now() + Duration::from_secs(expires_in)); } else { diff --git a/src/download/parreq.rs b/src/download/parreq.rs index 7282089..31790ba 100644 --- a/src/download/parreq.rs +++ b/src/download/parreq.rs @@ -1,5 +1,5 @@ use crate::download::oauth_client::OAuthClient; -use crate::download::timeseries::DQTimeseriesRequestArgs; +use crate::download::helpers::DQTimeseriesRequestArgs; use futures::future; use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use std::error::Error; diff --git a/src/download/requester.rs b/src/download/requester.rs index fcec710..39cdc81 100644 --- a/src/download/requester.rs +++ b/src/download/requester.rs @@ -1,10 +1,10 @@ +use crate::download::helpers::DQCatalogueResponse; +use crate::download::helpers::DQCatalogueSingleResponse; +use crate::download::helpers::DQTimeSeriesResponse; +use crate::download::helpers::DQTimeseriesRequestArgs; +use crate::download::helpers::JPMaQSIndicator; use crate::download::oauth_client::OAuthClient; use crate::download::parreq::ParallelRequester; -use crate::download::timeseries::DQCatalogueResponse; -use crate::download::timeseries::DQCatalogueSingleResponse; -use crate::download::timeseries::DQTimeSeriesResponse; -use crate::download::timeseries::DQTimeseriesRequestArgs; -use crate::download::timeseries::JPMaQSIndicator; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use reqwest; use reqwest::blocking::Client; @@ -35,6 +35,7 @@ impl Default for DQRequester { } } +#[allow(dead_code)] impl DQRequester { pub fn new(oauth_client: OAuthClient) -> Self { DQRequester { @@ -75,7 +76,6 @@ impl DQRequester { pub fn check_connection(&mut self) -> Result<(), Box> { let response = self._request(reqwest::Method::GET, HEARTBEAT_ENDPOINT)?; - println!("Connection is successful: {}", response.status()); Ok(()) } @@ -124,7 +124,8 @@ impl DQRequester { Ok(DQCatalogueResponse::new(responses)) } - pub fn _fetch_single_timeseries_batch( + /// Fetches a single batch of timeseries data from the DataQuery API. + fn _fetch_single_timeseries_batch( &mut self, args: DQTimeseriesRequestArgs, ) -> Result> { @@ -144,6 +145,8 @@ impl DQRequester { Ok(response) } + /// Makes parallel requests to the DataQuery API to fetch timeseries data. + /// The function returns a vector of DQTimeSeriesResponse objects. pub fn get_timeseries( &mut self, args: DQTimeseriesRequestArgs, @@ -162,43 +165,28 @@ impl DQRequester { Ok(dqts_vec) } + /// Makes parallel requests to the DataQuery API to fetch timeseries data. + /// The function returns a vector of JPMaQSIndicator objects. + /// This function is preferred as it consumes less memory than get_timeseries. pub fn get_timeseries_as_jpmaqs_indicators( &mut self, args: DQTimeseriesRequestArgs, ) -> Result, Box> { let max_retries = 5; - println!( - "Invoking ParallelRequester for {:?} expressions", - args.expressions.len() - ); let mut pq = ParallelRequester::new(self.oauth_client.clone()); - let start = std::time::Instant::now(); - let mut response_texts = match pq.request_expressions(args, max_retries) { Ok(r) => r, Err(e) => return Err(e), }; - println!( - "Time elapsed for pq.request_expressions: {:?}", - start.elapsed() - ); - - // sleep for 10 seconds - println!("Pausing for 10 seconds"); - std::thread::sleep(std::time::Duration::from_secs(10)); - println!("Resuming - parsing response texts to JPMaQSIndicators"); let jpmaqs_indicators: Vec = parse_response_texts_to_jpmaqs_indicators(&mut response_texts); - // Sleep for 10 seconds - println!("Pausing for 10 seconds"); - std::thread::sleep(std::time::Duration::from_secs(10)); - println!("Resuming"); Ok(jpmaqs_indicators) } } +/// Parses a vector of response texts into a vector of DQTimeSeriesResponse objects. fn parse_response_texts(response_texts: Vec) -> Vec { response_texts .into_par_iter() @@ -215,6 +203,7 @@ fn parse_response_texts(response_texts: Vec) -> Vec, ) -> Vec { @@ -261,4 +250,3 @@ fn parse_response_texts_to_jpmaqs_indicators( println!("Number of responses left: {}", response_texts.len()); jpmaqs_indicators_map.into_iter().map(|(_, v)| v).collect() } - diff --git a/src/main.rs b/src/main.rs index 87dbd98..4a7db11 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,7 @@ fn main() { start.elapsed() ); - let num_ticks = 5000; + let num_ticks = 20; let sel_tickers: Vec = tickers .iter() .take(num_ticks) @@ -27,9 +27,10 @@ fn main() { println!("Retrieving indicators for {} tickers", sel_tickers.len()); start = std::time::Instant::now(); - let indicators = jpamqs_download - .get_indicators(JPMaQSDownloadGetIndicatorArgs { + let mut res_df: DataFrame = jpamqs_download + .get_indicators_qdf(JPMaQSDownloadGetIndicatorArgs { tickers: sel_tickers.clone(), + start_date: "2024-11-05".to_string(), ..Default::default() }) .unwrap(); @@ -40,27 +41,36 @@ fn main() { start.elapsed() ); - // sleep for 10 seconds - println!("Sleeping for 10 seconds..."); - std::thread::sleep(std::time::Duration::from_secs(10)); - println!("concatting to mega DataFrame"); + // append _ to every cid + let cid_vec: Vec = res_df + .column("cid") + .unwrap() + .str() + .unwrap() + .into_iter() + .map(|s| s.unwrap_or("").to_string()) + .collect(); + let xcat_vec: Vec = res_df + .column("xcat") + .unwrap() + .str() + .unwrap() + .into_iter() + .map(|s| s.unwrap_or("").to_string()) + .collect(); - start = std::time::Instant::now(); - // let mut qdf_list = Vec::new(); - let mega_df = indicators - .iter() - .map(|indicator| indicator.as_qdf().unwrap()) - .fold(DataFrame::new(vec![]).unwrap(), |acc, df| { - acc.vstack(&df).unwrap() - }); + let mut tickers_set = std::collections::HashSet::new(); + for (cid, xcat) in cid_vec.iter().zip(xcat_vec.iter()) { + tickers_set.insert(cid.to_string() + "_" + xcat); + } - // - let es = mega_df.estimated_size(); - let es_mb = es as f64 / 1_048_576.0; - println!("Estimated size of DataFrame: {:.2} MB", es_mb); - println!("Sleeping for 10 seconds..."); - println!( - "Converted indicators to DataFrames in {:?}", - start.elapsed() - ); + // save this df to disk + let file_name = "data/jpmaqs_indicators_qdf.csv"; + let file = std::fs::File::create(file_name).unwrap(); + let mut csv_writer = CsvWriter::new(file); + csv_writer.finish(&mut res_df).unwrap(); + + // print len of tickers_set + println!("Unique tickers: {}", tickers_set.len()); + println!("DataFrame shape: {:?}", res_df.shape()); }