From 6cf7a2f1748440c858a547b5bcf061ed3007104f Mon Sep 17 00:00:00 2001 From: Palash Tyagi <23239946+Magnus167@users.noreply.github.com> Date: Wed, 13 Nov 2024 00:35:07 +0000 Subject: [PATCH] working --- .gitignore | 4 +- src/download/helpers.rs | 25 ++++++++ src/download/jpmaqsdownload.rs | 103 ++++++++++++++++++++++++++++++++- src/download/parreq.rs | 2 +- src/download/requester.rs | 1 - src/main.rs | 66 +++++++++------------ 6 files changed, 156 insertions(+), 45 deletions(-) diff --git a/.gitignore b/.gitignore index f518cf2..e5f33bf 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,6 @@ dev/ -/target \ No newline at end of file +/target + +data/ \ No newline at end of file diff --git a/src/download/helpers.rs b/src/download/helpers.rs index 071fe2c..b70797b 100644 --- a/src/download/helpers.rs +++ b/src/download/helpers.rs @@ -381,10 +381,22 @@ impl JPMaQSIndicator { Ok(qdf) } + pub fn get_cid(&self) -> String { + self.ticker.split('_').next().unwrap().to_string() + } + + pub fn get_xcat(&self) -> String { + self.ticker.split('_').nth(1).unwrap().to_string() + } + /// Save the JPMaQSIndicator to a CSV file pub fn save_qdf_to_csv(&self, filename: &str) -> Result<(), Box> { save_qdf_to_csv(&mut self.as_qdf()?, filename) } + + pub fn save_as_csv(self, filename: &str) -> Result<(), Box> { + save_indicator_df_as_csv(self, filename) + } } fn timeseries_list_to_dataframe( @@ -494,6 +506,19 @@ fn sort_qdf_columns(qdf: &mut DataFrame) -> Result<(), Box> { Ok(()) } +fn save_indicator_df_as_csv( + indicator: JPMaQSIndicator, + filename: &str, +) -> Result<(), Box> { + let file = File::create(filename)?; + + let mut csv_writer = CsvWriter::new(file); + let mut df = indicator.df; + csv_writer.finish(&mut df)?; + + Ok(()) +} + fn save_qdf_to_csv(qdf: &mut DataFrame, filename: &str) -> Result<(), Box> { let file = File::create(filename)?; diff --git a/src/download/jpmaqsdownload.rs b/src/download/jpmaqsdownload.rs index e0e99df..095c5c6 100644 --- a/src/download/jpmaqsdownload.rs +++ b/src/download/jpmaqsdownload.rs @@ -1,8 +1,8 @@ -use crate::download::oauth_client::OAuthClient; -use crate::download::requester::DQRequester; use crate::download::helpers::DQTimeSeriesResponse; use crate::download::helpers::DQTimeseriesRequestArgs; use crate::download::helpers::JPMaQSIndicator; +use crate::download::oauth_client::OAuthClient; +use crate::download::requester::DQRequester; // use polars::prelude::*; use std::error::Error; @@ -165,4 +165,103 @@ impl JPMaQSDownload { ); Ok(df_main) } + + // pub fn save_indicators_as_csv( + // &mut self, + // download_args: JPMaQSDownloadGetIndicatorArgs, + // batch_size: usize, + // folder_path: &str, + // ) -> Result<(), Box> { + // // if the folder path does not exist, create it + // std::fs::create_dir_all(folder_path)?; + + // // get ticker count, and split into chunks of batch_size + // let ticker_batches = download_args.tickers.chunks(batch_size); + // // let batch_download_args -- create new download args for each batch with new tickers + // let mut batch_download_args = Vec::new(); + // for ticker_batch in ticker_batches { + // let mut new_args = download_args.clone(); + // new_args.tickers = ticker_batch.to_vec(); + // batch_download_args.push(new_args); + // } + // // print total number of download sessions + // println!( + // "Total number of download sessions: {}", + // batch_download_args.len() + // ); + // // download each batch and save as csv + // for (_, batch_args) in batch_download_args.iter().enumerate() { + // let df_list = self.get_indicators_list(batch_args.clone())?; + // let total_indicators = df_list.len(); + // save_indicators_list_as_csvs(df_list, &folder_path)?; + // println!("Saved {} indicators to {}", total_indicators, folder_path); + // } + + // Ok(()) + // } + pub fn save_indicators_as_csv( + &mut self, + download_args: JPMaQSDownloadGetIndicatorArgs, + batch_size: usize, + folder_path: &str, + ) -> Result<(), Box> { + // if the folder path does not exist, create it + std::fs::create_dir_all(folder_path)?; + + // get ticker count, and split into chunks of batch_size + let ticker_batches = download_args.tickers.chunks(batch_size); + // let batch_download_args -- create new download args for each batch with new tickers + let mut batch_download_args = Vec::new(); + for ticker_batch in ticker_batches { + let mut new_args = download_args.clone(); + new_args.tickers = ticker_batch.to_vec(); + batch_download_args.push(new_args); + } + // print total number of download sessions + println!( + "Total number of download sessions: {}", + batch_download_args.len() + ); + + // Start timer + let start_time = std::time::Instant::now(); + let total_batches = batch_download_args.len(); + + // download each batch and save as csv + for (index, batch_args) in batch_download_args.iter().enumerate() { + let df_list = self.get_indicators_list(batch_args.clone())?; + let total_indicators = df_list.len(); + save_indicators_list_as_csvs(df_list, &folder_path)?; + println!("Saved {} indicators to {}", total_indicators, folder_path); + + // Calculate and print estimated remaining time + let elapsed = start_time.elapsed(); + let average_time_per_batch = elapsed / (index as u32 + 1); + let remaining_batches = total_batches - index - 1; + let estimated_remaining = average_time_per_batch * remaining_batches as u32; + + println!( + "Progress: {}/{}. Estimated time remaining: {:.2?}", + index + 1, + total_batches, + estimated_remaining + ); + } + + Ok(()) + } + +} + +fn save_indicators_list_as_csvs( + indicators: Vec, + folder_path: &str, +) -> Result<(), Box> { + for indicator in indicators { + let ticker_folder = format!("{}/{}", folder_path, indicator.get_xcat()); + std::fs::create_dir_all(&ticker_folder)?; + let file_path = format!("{}/{}.csv", ticker_folder, indicator.ticker); + indicator.save_as_csv(&file_path)?; + } + Ok(()) } diff --git a/src/download/parreq.rs b/src/download/parreq.rs index 31790ba..613c32b 100644 --- a/src/download/parreq.rs +++ b/src/download/parreq.rs @@ -97,7 +97,7 @@ impl ParallelRequester { let ep = format!("{}?{}", TIMESERIES_ENDPOINT, args_clone.as_query_string()); let permit = semaphore.clone().acquire_owned().await.unwrap(); - if curr_batch % 10 == 0 { + if curr_batch % 100 == 0 { println!("Requesting batch {} of {}", curr_batch, total_batches); } diff --git a/src/download/requester.rs b/src/download/requester.rs index 39cdc81..64a671f 100644 --- a/src/download/requester.rs +++ b/src/download/requester.rs @@ -247,6 +247,5 @@ fn parse_response_texts_to_jpmaqs_indicators( } } - println!("Number of responses left: {}", response_texts.len()); jpmaqs_indicators_map.into_iter().map(|(_, v)| v).collect() } diff --git a/src/main.rs b/src/main.rs index 4a7db11..b01a565 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,7 @@ fn main() { start.elapsed() ); - let num_ticks = 20; + let num_ticks = tickers.len(); let sel_tickers: Vec = tickers .iter() .take(num_ticks) @@ -27,50 +27,36 @@ fn main() { println!("Retrieving indicators for {} tickers", sel_tickers.len()); start = std::time::Instant::now(); - let mut res_df: DataFrame = jpamqs_download - .get_indicators_qdf(JPMaQSDownloadGetIndicatorArgs { + + let res = jpamqs_download.save_indicators_as_csv( + JPMaQSDownloadGetIndicatorArgs { tickers: sel_tickers.clone(), - start_date: "2024-11-05".to_string(), ..Default::default() - }) - .unwrap(); - - println!( - "Retrieved indicators for {} tickers in {:?}", - sel_tickers.len(), - start.elapsed() + }, + 5000, + "./data/", ); - // append _ to every cid - let cid_vec: Vec = res_df - .column("cid") - .unwrap() - .str() - .unwrap() - .into_iter() - .map(|s| s.unwrap_or("").to_string()) - .collect(); - let xcat_vec: Vec = res_df - .column("xcat") - .unwrap() - .str() - .unwrap() - .into_iter() - .map(|s| s.unwrap_or("").to_string()) - .collect(); - - let mut tickers_set = std::collections::HashSet::new(); - for (cid, xcat) in cid_vec.iter().zip(xcat_vec.iter()) { - tickers_set.insert(cid.to_string() + "_" + xcat); + match res { + Ok(_) => println!( + "Saved indicators for {} tickers in {:?}", + sel_tickers.len(), + start.elapsed() + ), + Err(e) => println!("Error saving indicators: {:?}", e), } - // save this df to disk - let file_name = "data/jpmaqs_indicators_qdf.csv"; - let file = std::fs::File::create(file_name).unwrap(); - let mut csv_writer = CsvWriter::new(file); - csv_writer.finish(&mut res_df).unwrap(); + // let mut res_df: DataFrame = jpamqs_download + // .get_indicators_qdf(JPMaQSDownloadGetIndicatorArgs { + // tickers: sel_tickers.clone(), + // start_date: "2024-11-05".to_string(), + // ..Default::default() + // }) + // .unwrap(); - // print len of tickers_set - println!("Unique tickers: {}", tickers_set.len()); - println!("DataFrame shape: {:?}", res_df.shape()); + // println!( + // "Retrieved indicators for {} tickers in {:?}", + // sel_tickers.len(), + // start.elapsed() + // ); }