Add catalogue-based filter layer to JPMaQSDownload

This commit is contained in:
Palash Tyagi 2024-11-17 05:12:21 +00:00
parent d4721a0b79
commit 41e544d5bf
6 changed files with 184 additions and 98 deletions

View File

@ -24,6 +24,22 @@ fn construct_expressions(tickers: Vec<String>, metrics: Vec<String>) -> Vec<Stri
.collect::<Vec<String>>() .collect::<Vec<String>>()
} }
/// Splits a JPMaQS expression of the form `DB(JPMAQS,<CID>_<XCAT>,<metric>)`
/// into its `(cid, xcat, metric)` components.
///
/// Non-JPMaQS expressions are echoed unchanged into all three slots so the
/// caller can still treat the result uniformly.
fn deconstruct_expression(expression: &str) -> (String, String, String) {
    if !is_jpmaq_expression(expression) {
        // Fallback for unrecognized expressions: no structure to extract.
        return (
            expression.to_string(),
            expression.to_string(),
            expression.to_string(),
        );
    }
    // `is_jpmaq_expression` guarantees the `DB(JPMAQS,` prefix, so splitting
    // on ',' yields at least ["DB(JPMAQS", "<ticker>", "<metric>)"].
    let parts = expression.split(',').collect::<Vec<&str>>();
    let ticker = parts[1].to_string();
    // The last part carries the closing ')' of `DB(...)`; strip it so the
    // metric comes back clean (e.g. "value" rather than "value)").
    let metric = parts[2].trim_end_matches(')').to_string();
    // Tickers are `<CID>_<XCAT>`; split on the FIRST underscore only, since
    // the xcat itself may contain underscores (e.g. "EQXR_NSA").
    let (cid, xcat) = ticker
        .split_once('_')
        .expect("JPMaQS ticker must contain '_' separating cid and xcat");
    (cid.to_string(), xcat.to_string(), metric)
}
fn is_jpmaq_expression(expression: &str) -> bool { fn is_jpmaq_expression(expression: &str) -> bool {
expression.starts_with("DB(JPMAQS,") expression.starts_with("DB(JPMAQS,")
&& expression.ends_with(")") && expression.ends_with(")")
@ -152,10 +168,30 @@ impl JPMaQSDownload {
/// Get the catalogue of tickers available in the JPMaQS data. /// Get the catalogue of tickers available in the JPMaQS data.
pub fn get_catalogue(&mut self) -> Result<Vec<String>, Box<dyn Error>> { pub fn get_catalogue(&mut self) -> Result<Vec<String>, Box<dyn Error>> {
println!("Getting JPMaQS catalogue ...");
let dq_catalogue = self.requester.get_catalogue("JPMAQS", 1000)?; let dq_catalogue = self.requester.get_catalogue("JPMAQS", 1000)?;
Ok(dq_catalogue.all_instruments) Ok(dq_catalogue.all_instruments)
} }
/// Filters `expressions` down to those whose ticker (`<cid>_<xcat>`) exists
/// in the JPMaQS catalogue, dropping tickers the DataQuery API would reject.
///
/// # Errors
/// Propagates any error from fetching the catalogue via `get_catalogue`.
fn filter_expressions(
    &mut self,
    expressions: Vec<String>,
) -> Result<Vec<String>, Box<dyn Error>> {
    let dq_catalogue = self.get_catalogue()?;
    println!("Filtering expressions based on the JPMaQS catalogue ...");
    // Build a set once for O(1) membership tests: the catalogue can hold
    // thousands of tickers, so a linear `Vec::contains` per expression
    // makes the filter accidentally O(n*m).
    let catalogue_set: std::collections::HashSet<&str> =
        dq_catalogue.iter().map(String::as_str).collect();
    // `into_iter` moves each String through the filter, avoiding the
    // per-element clone that `.iter().map(|s| s.to_string())` performed.
    let filtered_expressions = expressions
        .into_iter()
        .filter(|expression| {
            let (cid, xcat, _) = deconstruct_expression(expression);
            catalogue_set.contains(format!("{}_{}", cid, xcat).as_str())
        })
        .collect::<Vec<String>>();
    Ok(filtered_expressions)
}
/// Get the time series data for the provided expressions. /// Get the time series data for the provided expressions.
pub fn get_expressions( pub fn get_expressions(
&mut self, &mut self,
@ -184,6 +220,8 @@ impl JPMaQSDownload {
let expressions = construct_expressions(download_args.tickers, download_args.metrics); let expressions = construct_expressions(download_args.tickers, download_args.metrics);
assert!(all_jpmaq_expressions(expressions.clone())); assert!(all_jpmaq_expressions(expressions.clone()));
let expressions = self.filter_expressions(expressions)?;
let dq_download_args = DQTimeseriesRequestArgs { let dq_download_args = DQTimeseriesRequestArgs {
expressions: expressions, expressions: expressions,
start_date: download_args.start_date, start_date: download_args.start_date,

View File

@ -1,5 +1,8 @@
pub mod helpers; mod helpers;
pub mod jpmaqsdownload;
mod oauth_client; mod oauth_client;
mod parreq; mod parreq;
mod requester; mod requester;
pub mod jpmaqsdownload;
pub use jpmaqsdownload::*;

View File

@ -77,6 +77,11 @@ impl ParallelRequester {
}) })
.collect(); .collect();
let prog_batches = match expression_batches.len() {
0..=250 => 25,
_ => 100,
};
let okay_response_texts = Arc::new(Mutex::new(Vec::new())); let okay_response_texts = Arc::new(Mutex::new(Vec::new()));
let failed_batches = Arc::new(Mutex::new(Vec::new())); let failed_batches = Arc::new(Mutex::new(Vec::new()));
let self_arc = Arc::new(self.clone()); let self_arc = Arc::new(self.clone());
@ -97,7 +102,7 @@ impl ParallelRequester {
let ep = format!("{}?{}", TIMESERIES_ENDPOINT, args_clone.as_query_string()); let ep = format!("{}?{}", TIMESERIES_ENDPOINT, args_clone.as_query_string());
let permit = semaphore.clone().acquire_owned().await.unwrap(); let permit = semaphore.clone().acquire_owned().await.unwrap();
if curr_batch % 100 == 0 { if curr_batch % prog_batches == 0 {
println!("Requesting batch {} of {}", curr_batch, total_batches); println!("Requesting batch {} of {}", curr_batch, total_batches);
} }

View File

@ -1,9 +1,11 @@
use msyrs::download::jpmaqsdownload::{JPMaQSDownload, JPMaQSDownloadGetIndicatorArgs}; use msyrs::download::{JPMaQSDownload, JPMaQSDownloadGetIndicatorArgs};
// use msyrs::download::jpmaqsdownload::*;
// use msyrs::utils::qdf::load::*; // use msyrs::utils::qdf::load::*;
// use msyrs::utils::qdf::dftools::*; // use msyrs::utils::qdf::dftools::*;
// use msyrs::utils::qdf::core::*; // use msyrs::utils::qdf::core::*;
use msyrs::utils::qdf::*; use msyrs::utils as msyrs_utils;
use msyrs::utils::qdf as msyrs_qdf;
use polars::frame::DataFrame;
#[allow(dead_code)] #[allow(dead_code)]
fn download_stuff() { fn download_stuff() {
@ -46,32 +48,22 @@ fn download_stuff() {
start.elapsed() start.elapsed()
); );
if !is_quantamental_dataframe(&res_df) { if !msyrs_qdf::is_quantamental_dataframe(&res_df) {
println!("DataFrame is not a quantamental DataFrame"); println!("DataFrame is not a quantamental DataFrame");
} else { } else {
println!("DataFrame is a quantamental DataFrame"); println!("DataFrame is a quantamental DataFrame");
} }
} }
fn main() { #[allow(dead_code)]
// E:\Work\ruzt\msyrs\data\JPMaQSData\ALLIFCDSGDP\AUD_ALLIFCDSGDP_NSA.csv fn load_mega_df() -> DataFrame {
// let pth = "E:/Work/ruzt/msyrs/data/JPMaQSData/ALLIFCDSGDP/AUD_ALLIFCDSGDP_NSA.csv";
// let df = msyrs_dftools::load_quantamental_dataframe(pth).unwrap();
// println!("{:?}", df);
// load_quantamental_dataframe_from_download_bank
// let st_pth = "E:/Work/ruzt/msyrs/data/JPMaQSData/";
let start = std::time::Instant::now(); let start = std::time::Instant::now();
let st_pth = "E:\\Work\\jpmaqs-data\\data"; let st_pth = "E:\\Work\\jpmaqs-data\\data";
let mega_df = load_quantamental_dataframe_from_download_bank( let mega_df = msyrs_qdf::load::load_qdf_from_download_bank(
st_pth, st_pth,
// Some(vec!["AUD", "USD", "GBP", "JPY"]),
// Some(vec!["RIR_NSA", "EQXR_NSA"]),
None, None,
None, None,
// Some(vec!["EQXR_NSA", "RIR_NSA"]),
// None
Some(vec![ Some(vec![
"AUD_EQXR_NSA", "AUD_EQXR_NSA",
"USD_EQXR_NSA", "USD_EQXR_NSA",
@ -83,87 +75,124 @@ fn main() {
"JPY_RIR_NSA", "JPY_RIR_NSA",
]), ]),
) )
.unwrap(); .unwrap();
let end = start.elapsed(); let end = start.elapsed();
println!("Loaded Mega DataFrame in {:?}", end); println!("Loaded Mega DataFrame in {:?}", end);
mega_df
}
let start = std::time::Instant::now(); fn main() {
let df_new = reduce_dataframe( nb();
mega_df.clone(), }
#[allow(dead_code)]
fn nb() {
let cids_dm = vec![
"AUD", "CAD", "CHF", "EUR", "GBP", "JPY", "NOK", "NZD", "SEK", "USD",
];
let cids_em = vec![
"CLP", "COP", "CZK", "HUF", "IDR", "ILS", "INR", "KRW", "MXN", "PLN", "THB", "TRY", "TWD",
"ZAR",
];
// cids = cids_dm + cids_em
let cids: Vec<String> = cids_dm
.iter()
.chain(cids_em.iter())
.map(|s| s.to_string())
.collect();
let non_dux = ["IDR", "NZD"];
#[allow(unused_variables)]
let cids_dux: Vec<String> = cids
.iter()
.filter(|s| !non_dux.contains(&s.as_str()))
.map(|s| s.to_string())
.collect();
let ecos = vec![
"CPIC_SA_P1M1ML12",
"CPIC_SJA_P3M3ML3AR",
"CPIC_SJA_P6M6ML6AR",
"CPIH_SA_P1M1ML12",
"CPIH_SJA_P3M3ML3AR",
"CPIH_SJA_P6M6ML6AR",
"INFTEFF_NSA",
"INTRGDP_NSA_P1M1ML12_3MMA",
"INTRGDPv5Y_NSA_P1M1ML12_3MMA",
"PCREDITGDP_SJA_D1M1ML12",
"RGDP_SA_P1Q1QL4_20QMA",
"RYLDIRS02Y_NSA",
"RYLDIRS05Y_NSA",
"PCREDITBN_SJA_P1M1ML12",
];
let mkts = vec![
"DU02YXR_NSA",
"DU05YXR_NSA",
"DU02YXR_VT10",
"DU05YXR_VT10",
"EQXR_NSA",
"EQXR_VT10",
"FXXR_NSA",
"FXXR_VT10",
"FXCRR_NSA",
"FXTARGETED_NSA",
"FXUNTRADABLE_NSA",
];
let xcats: Vec<String> = ecos
.iter()
.chain(mkts.iter())
.map(|s| s.to_string())
.collect();
let cids_str: Vec<&str> = cids.iter().map(AsRef::as_ref).collect();
let xcats_str: Vec<&str> = xcats.iter().map(AsRef::as_ref).collect();
let download_tickers = msyrs_utils::misc::create_interesecting_tickers(&cids_str, &xcats_str);
let mut jpmaqs_client = JPMaQSDownload::default();
let downloaded_df = jpmaqs_client
.get_indicators_qdf(JPMaQSDownloadGetIndicatorArgs {
tickers: download_tickers,
..Default::default()
})
.unwrap();
let eq_df = msyrs_qdf::reduce_dataframe(
downloaded_df.clone(),
None,
Some(vec!["EQXR_NSA", "EQXR_VT10"]),
None,
// None,
None,
None,
false,
)
.unwrap();
println!("{:?}", eq_df.head(Some(10)));
let fx_df = msyrs_qdf::reduce_dataframe(
downloaded_df,
None,
Some(vec![ Some(vec![
"GBP".to_string(), "FXXR_NSA",
"AUD".to_string(), "FXXR_VT10",
"USD".to_string(), "FXCRR_NSA",
"FXTARGETED_NSA",
"FXUNTRADABLE_NSA",
]), ]),
Some(vec!["RIR_NSA".to_string(), "EQXR_NSA".to_string()]),
None, None,
Some("2010-01-20"), // None,
None, None,
false,
)
.unwrap();
let end = start.elapsed();
println!("Reduced Mega DataFrame in {:?}", end);
// FOUND TICKERs
let start = std::time::Instant::now();
let found_tickers = get_unique_tickers(&df_new);
let end = start.elapsed();
println!(
"Found {:?} unique tickers in df_new",
found_tickers.unwrap()
);
println!("Found unique tickers in {:?}", end);
let end = start.elapsed();
println!("Loaded DataFrame in {:?}", end);
let start = std::time::Instant::now();
let df_gbp = reduce_dataframe(
df_new.clone(),
Some(vec!["GBP".to_string()]),
Some(vec!["RIR_NSA".to_string()]),
None,
Some("2024-11-12"),
None,
false,
)
.unwrap();
let end = start.elapsed();
println!("Reduced DataFrame in {:?}", end);
// println!("{:?}", df_gbp.head(Some(10)));
// FOUND TICKERs
let start = std::time::Instant::now();
let found_tickers = get_unique_tickers(&mega_df);
let end = start.elapsed();
println!(
"Found {:?} unique tickers in Mega DataFrame",
found_tickers.unwrap()
);
println!("Found unique tickers in {:?}", end);
let start = std::time::Instant::now();
let df_aud = reduce_dataframe(
df_new.clone(),
Some(vec!["USD".to_string()]),
// Some(vec!["EQXR_NSA".to_string(), "RIR_NSA".to_string()]),
Some(vec!["EQXR_NSA".to_string()]),
None,
Some("2024-11-13"),
None, None,
true, true,
) )
.unwrap(); .unwrap();
let end = start.elapsed();
println!("Reduced DataFrame in {:?}", end);
// dimenstions reduced from to
println!("{:?} from {:?}", df_aud.shape(), df_new.shape());
// println!("{:?}", df_aud.head(Some(10)));
let start = std::time::Instant::now(); println!("{:?}", fx_df.head(Some(10)));
let up_df = update_dataframe(&df_gbp, &df_aud).unwrap();
let end = start.elapsed(); let custom_df = msyrs_qdf::update_dataframe(&fx_df, &eq_df).unwrap();
println!("Updated DataFrame in {:?}", end);
println!("{:?}", up_df.head(Some(10))); println!("{:?}", custom_df.head(Some(10)));
} }

View File

@ -106,7 +106,7 @@ fn _load_qdf_thread_safe(file_path: &str) -> Result<DataFrame, Box<dyn Error + S
.into() .into()
}) })
} }
pub fn load_quantamental_dataframe_from_download_bank( pub fn load_qdf_from_download_bank(
folder_path: &str, folder_path: &str,
cids: Option<Vec<&str>>, cids: Option<Vec<&str>>,
xcats: Option<Vec<&str>>, xcats: Option<Vec<&str>>,

View File

@ -17,8 +17,8 @@ const QDF_INDEX_COLUMNS: [&str; 3] = ["real_date", "cid", "xcat"];
/// If no filters are provided, the original DataFrame is returned. /// If no filters are provided, the original DataFrame is returned.
pub fn reduce_dataframe( pub fn reduce_dataframe(
df: DataFrame, df: DataFrame,
cids: Option<Vec<String>>, cids: Option<Vec<&str>>,
xcats: Option<Vec<String>>, xcats: Option<Vec<&str>>,
metrics: Option<Vec<String>>, metrics: Option<Vec<String>>,
start: Option<&str>, start: Option<&str>,
end: Option<&str>, end: Option<&str>,
@ -36,8 +36,10 @@ pub fn reduce_dataframe(
let u_xcats: Vec<String> = get_unique_xcats(&new_df)?; let u_xcats: Vec<String> = get_unique_xcats(&new_df)?;
let u_tickers: Vec<String> = _get_unique_strs_from_str_column_object(&ticker_col)?; let u_tickers: Vec<String> = _get_unique_strs_from_str_column_object(&ticker_col)?;
let specified_cids: Vec<String> = cids.unwrap_or_else(|| u_cids.clone()); let specified_cids: Vec<&str> =
let specified_xcats: Vec<String> = xcats.unwrap_or_else(|| u_xcats.clone()); cids.unwrap_or_else(|| u_cids.iter().map(AsRef::as_ref).collect());
let specified_xcats: Vec<&str> =
xcats.unwrap_or_else(|| u_xcats.iter().map(AsRef::as_ref).collect());
let non_idx_cols: Vec<String> = new_df let non_idx_cols: Vec<String> = new_df
.get_column_names() .get_column_names()
@ -63,8 +65,17 @@ pub fn reduce_dataframe(
let keep_tickers: Vec<String> = match intersect { let keep_tickers: Vec<String> = match intersect {
// true => get_intersecting_cids_str_func(&specified_cids, &specified_xcats, &u_tickers), // true => get_intersecting_cids_str_func(&specified_cids, &specified_xcats, &u_tickers),
true => { true => {
let int_cids = let int_cids = get_intersecting_cids_str_func(
get_intersecting_cids_str_func(&specified_cids, &specified_xcats, &u_tickers); &specified_cids
.iter()
.map(|&s| s.to_string())
.collect::<Vec<String>>(),
&specified_xcats
.iter()
.map(|&s| s.to_string())
.collect::<Vec<String>>(),
&u_tickers,
);
create_interesecting_tickers( create_interesecting_tickers(
&int_cids.iter().map(AsRef::as_ref).collect::<Vec<&str>>(), &int_cids.iter().map(AsRef::as_ref).collect::<Vec<&str>>(),
&specified_xcats &specified_xcats