This commit is contained in:
Palash Tyagi 2024-11-13 00:35:07 +00:00
parent c69454fe2f
commit 6cf7a2f174
6 changed files with 156 additions and 45 deletions

4
.gitignore vendored
View File

@ -2,4 +2,6 @@
dev/ dev/
/target /target
data/

View File

@ -381,10 +381,22 @@ impl JPMaQSIndicator {
Ok(qdf) Ok(qdf)
} }
pub fn get_cid(&self) -> String {
self.ticker.split('_').next().unwrap().to_string()
}
pub fn get_xcat(&self) -> String {
self.ticker.split('_').nth(1).unwrap().to_string()
}
/// Save the JPMaQSIndicator to a CSV file /// Save the JPMaQSIndicator to a CSV file
pub fn save_qdf_to_csv(&self, filename: &str) -> Result<(), Box<dyn Error>> { pub fn save_qdf_to_csv(&self, filename: &str) -> Result<(), Box<dyn Error>> {
save_qdf_to_csv(&mut self.as_qdf()?, filename) save_qdf_to_csv(&mut self.as_qdf()?, filename)
} }
pub fn save_as_csv(self, filename: &str) -> Result<(), Box<dyn Error>> {
save_indicator_df_as_csv(self, filename)
}
} }
fn timeseries_list_to_dataframe( fn timeseries_list_to_dataframe(
@ -494,6 +506,19 @@ fn sort_qdf_columns(qdf: &mut DataFrame) -> Result<(), Box<dyn Error>> {
Ok(()) Ok(())
} }
fn save_indicator_df_as_csv(
indicator: JPMaQSIndicator,
filename: &str,
) -> Result<(), Box<dyn Error>> {
let file = File::create(filename)?;
let mut csv_writer = CsvWriter::new(file);
let mut df = indicator.df;
csv_writer.finish(&mut df)?;
Ok(())
}
fn save_qdf_to_csv(qdf: &mut DataFrame, filename: &str) -> Result<(), Box<dyn Error>> { fn save_qdf_to_csv(qdf: &mut DataFrame, filename: &str) -> Result<(), Box<dyn Error>> {
let file = File::create(filename)?; let file = File::create(filename)?;

View File

@ -1,8 +1,8 @@
use crate::download::oauth_client::OAuthClient;
use crate::download::requester::DQRequester;
use crate::download::helpers::DQTimeSeriesResponse; use crate::download::helpers::DQTimeSeriesResponse;
use crate::download::helpers::DQTimeseriesRequestArgs; use crate::download::helpers::DQTimeseriesRequestArgs;
use crate::download::helpers::JPMaQSIndicator; use crate::download::helpers::JPMaQSIndicator;
use crate::download::oauth_client::OAuthClient;
use crate::download::requester::DQRequester;
// use polars::prelude::*; // use polars::prelude::*;
use std::error::Error; use std::error::Error;
@ -165,4 +165,103 @@ impl JPMaQSDownload {
); );
Ok(df_main) Ok(df_main)
} }
// pub fn save_indicators_as_csv(
// &mut self,
// download_args: JPMaQSDownloadGetIndicatorArgs,
// batch_size: usize,
// folder_path: &str,
// ) -> Result<(), Box<dyn Error>> {
// // if the folder path does not exist, create it
// std::fs::create_dir_all(folder_path)?;
// // get ticker count, and split into chunks of batch_size
// let ticker_batches = download_args.tickers.chunks(batch_size);
// // let batch_download_args -- create new download args for each batch with new tickers
// let mut batch_download_args = Vec::new();
// for ticker_batch in ticker_batches {
// let mut new_args = download_args.clone();
// new_args.tickers = ticker_batch.to_vec();
// batch_download_args.push(new_args);
// }
// // print total number of download sessions
// println!(
// "Total number of download sessions: {}",
// batch_download_args.len()
// );
// // download each batch and save as csv
// for (_, batch_args) in batch_download_args.iter().enumerate() {
// let df_list = self.get_indicators_list(batch_args.clone())?;
// let total_indicators = df_list.len();
// save_indicators_list_as_csvs(df_list, &folder_path)?;
// println!("Saved {} indicators to {}", total_indicators, folder_path);
// }
// Ok(())
// }
pub fn save_indicators_as_csv(
&mut self,
download_args: JPMaQSDownloadGetIndicatorArgs,
batch_size: usize,
folder_path: &str,
) -> Result<(), Box<dyn Error>> {
// if the folder path does not exist, create it
std::fs::create_dir_all(folder_path)?;
// get ticker count, and split into chunks of batch_size
let ticker_batches = download_args.tickers.chunks(batch_size);
// let batch_download_args -- create new download args for each batch with new tickers
let mut batch_download_args = Vec::new();
for ticker_batch in ticker_batches {
let mut new_args = download_args.clone();
new_args.tickers = ticker_batch.to_vec();
batch_download_args.push(new_args);
}
// print total number of download sessions
println!(
"Total number of download sessions: {}",
batch_download_args.len()
);
// Start timer
let start_time = std::time::Instant::now();
let total_batches = batch_download_args.len();
// download each batch and save as csv
for (index, batch_args) in batch_download_args.iter().enumerate() {
let df_list = self.get_indicators_list(batch_args.clone())?;
let total_indicators = df_list.len();
save_indicators_list_as_csvs(df_list, &folder_path)?;
println!("Saved {} indicators to {}", total_indicators, folder_path);
// Calculate and print estimated remaining time
let elapsed = start_time.elapsed();
let average_time_per_batch = elapsed / (index as u32 + 1);
let remaining_batches = total_batches - index - 1;
let estimated_remaining = average_time_per_batch * remaining_batches as u32;
println!(
"Progress: {}/{}. Estimated time remaining: {:.2?}",
index + 1,
total_batches,
estimated_remaining
);
}
Ok(())
}
}
fn save_indicators_list_as_csvs(
indicators: Vec<JPMaQSIndicator>,
folder_path: &str,
) -> Result<(), Box<dyn Error>> {
for indicator in indicators {
let ticker_folder = format!("{}/{}", folder_path, indicator.get_xcat());
std::fs::create_dir_all(&ticker_folder)?;
let file_path = format!("{}/{}.csv", ticker_folder, indicator.ticker);
indicator.save_as_csv(&file_path)?;
}
Ok(())
} }

View File

@ -97,7 +97,7 @@ impl ParallelRequester {
let ep = format!("{}?{}", TIMESERIES_ENDPOINT, args_clone.as_query_string()); let ep = format!("{}?{}", TIMESERIES_ENDPOINT, args_clone.as_query_string());
let permit = semaphore.clone().acquire_owned().await.unwrap(); let permit = semaphore.clone().acquire_owned().await.unwrap();
if curr_batch % 10 == 0 { if curr_batch % 100 == 0 {
println!("Requesting batch {} of {}", curr_batch, total_batches); println!("Requesting batch {} of {}", curr_batch, total_batches);
} }

View File

@ -247,6 +247,5 @@ fn parse_response_texts_to_jpmaqs_indicators(
} }
} }
println!("Number of responses left: {}", response_texts.len());
jpmaqs_indicators_map.into_iter().map(|(_, v)| v).collect() jpmaqs_indicators_map.into_iter().map(|(_, v)| v).collect()
} }

View File

@ -17,7 +17,7 @@ fn main() {
start.elapsed() start.elapsed()
); );
let num_ticks = 20; let num_ticks = tickers.len();
let sel_tickers: Vec<String> = tickers let sel_tickers: Vec<String> = tickers
.iter() .iter()
.take(num_ticks) .take(num_ticks)
@ -27,50 +27,36 @@ fn main() {
println!("Retrieving indicators for {} tickers", sel_tickers.len()); println!("Retrieving indicators for {} tickers", sel_tickers.len());
start = std::time::Instant::now(); start = std::time::Instant::now();
let mut res_df: DataFrame = jpamqs_download
.get_indicators_qdf(JPMaQSDownloadGetIndicatorArgs { let res = jpamqs_download.save_indicators_as_csv(
JPMaQSDownloadGetIndicatorArgs {
tickers: sel_tickers.clone(), tickers: sel_tickers.clone(),
start_date: "2024-11-05".to_string(),
..Default::default() ..Default::default()
}) },
.unwrap(); 5000,
"./data/",
println!(
"Retrieved indicators for {} tickers in {:?}",
sel_tickers.len(),
start.elapsed()
); );
// append _ to every cid match res {
let cid_vec: Vec<String> = res_df Ok(_) => println!(
.column("cid") "Saved indicators for {} tickers in {:?}",
.unwrap() sel_tickers.len(),
.str() start.elapsed()
.unwrap() ),
.into_iter() Err(e) => println!("Error saving indicators: {:?}", e),
.map(|s| s.unwrap_or("").to_string())
.collect();
let xcat_vec: Vec<String> = res_df
.column("xcat")
.unwrap()
.str()
.unwrap()
.into_iter()
.map(|s| s.unwrap_or("").to_string())
.collect();
let mut tickers_set = std::collections::HashSet::new();
for (cid, xcat) in cid_vec.iter().zip(xcat_vec.iter()) {
tickers_set.insert(cid.to_string() + "_" + xcat);
} }
// save this df to disk // let mut res_df: DataFrame = jpamqs_download
let file_name = "data/jpmaqs_indicators_qdf.csv"; // .get_indicators_qdf(JPMaQSDownloadGetIndicatorArgs {
let file = std::fs::File::create(file_name).unwrap(); // tickers: sel_tickers.clone(),
let mut csv_writer = CsvWriter::new(file); // start_date: "2024-11-05".to_string(),
csv_writer.finish(&mut res_df).unwrap(); // ..Default::default()
// })
// .unwrap();
// print len of tickers_set // println!(
println!("Unique tickers: {}", tickers_set.len()); // "Retrieved indicators for {} tickers in {:?}",
println!("DataFrame shape: {:?}", res_df.shape()); // sel_tickers.len(),
// start.elapsed()
// );
} }