From 693b4fad96aaa81961e75fc424770bbdf57c9c72 Mon Sep 17 00:00:00 2001
From: Palash Tyagi <23239946+Magnus167@users.noreply.github.com>
Date: Thu, 7 Nov 2024 00:05:09 +0000
Subject: [PATCH] catalogue working

---
 src/main.rs       |  45 ++++++-----
 src/requester.rs  | 173 ++++++++++++++++++++++++++++++----------
 src/timeseries.rs | 196 ++++++++++++++++++++++++++++++--------------
 3 files changed, 296 insertions(+), 118 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 27f8f54..18d6d15 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,7 +3,8 @@
 use macrosynergy_dataquery::oauth_client::OAuthClient;
 use macrosynergy_dataquery::requester::Requester;
-use macrosynergy_dataquery::timeseries::DQResponse;
+use macrosynergy_dataquery::timeseries::DQCatalogueResponse;
+use macrosynergy_dataquery::timeseries::DQTimeSeriesResponse;
 // use macrosynergy_dataquery::timeseries::DQTimeSeries;
 use macrosynergy_dataquery::timeseries::JPMaQSIndicator;
 // use macrosynergy_dataquery::timeseries::TimeSeriesList;
 
@@ -23,30 +24,36 @@ fn main() {
     let expressions_a = vec![
         "DB(JPMAQS,USD_EQXR_NSA,value)",
         "DB(JPMAQS,USD_EQXR_NSA,grading)",
-        // ];
-
-        // let expressions_b = vec![
+        "DB(JPMAQS,USD_EQXR_NSA,eop_lag)",
+        "DB(JPMAQS,USD_EQXR_NSA,mop_lag)",
         "DB(JPMAQS,GBP_EQXR_NSA,value)",
         "DB(JPMAQS,GBP_EQXR_NSA,grading)",
+        "DB(JPMAQS,GBP_EQXR_NSA,eop_lag)",
+        "DB(JPMAQS,GBP_EQXR_NSA,mop_lag)",
     ];
     // expressions_b.len();
 
-    let response = requester
-        .get_timeseries_with_defaults(expressions_a)
-        .unwrap();
+    // let response = requester
+    //     .get_timeseries_with_defaults(expressions_a)
+    //     .unwrap();
 
-    let json_data = response.text().unwrap();
+    // let json_data = response.text().unwrap();
+    // let dqresponse: DQResponse = serde_json::from_str(&json_data).unwrap();
+    // for ts_group in dqresponse.get_timeseries_by_ticker() {
+    //     let jpmi = JPMaQSIndicator::new(ts_group).unwrap();
+    //     println!("{:?}", jpmi.as_qdf().unwrap());
+    // }
 
-    let dqresponse: DQResponse = serde_json::from_str(&json_data).unwrap();
-    let timeseries_list = dqresponse.get_all_timeseries();
-    for ts in timeseries_list {
-        // println!("{:?}", ts);
-        // ts.to_dataframe().unwrap();
-        println!("{:?}", ts.to_dataframe().unwrap());
-    }
-    for ts_group in dqresponse.get_timeseries_by_ticker() {
-        let jpmi = JPMaQSIndicator::new(ts_group).unwrap();
-        println!("{:?}", jpmi.as_qdf().unwrap());
-    }
+    // get the catalogue
+    let response = requester.get_catalogue("JPMAQS").unwrap();
+
+    // let json_data = response.text().unwrap();
+    let json_data = response[0].to_string();
+    println!("{}", json_data);
+
+    // try to pull into DQCatalogueResponse
+    let response: DQCatalogueResponse = serde_json::from_str(&json_data).unwrap();
+
+    println!("{:?}", response);
 }
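The reworked main() above types only the first catalogue page (response[0]) and round-trips it through a String. A minimal sketch of a helper that parses every page returned by get_catalogue; parse_catalogue_pages is hypothetical and not part of this patch, and it assumes the DQCatalogueResponse type added in src/timeseries.rs further down:

    use macrosynergy_dataquery::requester::Requester;
    use macrosynergy_dataquery::timeseries::DQCatalogueResponse;

    // Hypothetical helper, not in the patch: type every catalogue page.
    // serde_json::from_value avoids the to_string()/from_str round-trip
    // that main() currently performs on page 0.
    fn parse_catalogue_pages(requester: &mut Requester) -> Vec<DQCatalogueResponse> {
        let pages = requester.get_catalogue("JPMAQS").unwrap();
        pages
            .iter()
            .map(|page| serde_json::from_value(page.clone()).unwrap())
            .collect()
    }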
diff --git a/src/requester.rs b/src/requester.rs
index 85fb254..45031ad 100644
--- a/src/requester.rs
+++ b/src/requester.rs
@@ -7,13 +7,14 @@ use crate::oauth_client::OAuthClient;
 const API_BASE_URL: &str = "https://api-developer.jpmorgan.com/research/dataquery-authe/api/v2";
 const HEARTBEAT_ENDPOINT: &str = "/services/heartbeat";
 const TIMESERIES_ENDPOINT: &str = "/expressions/time-series";
+const CATALOGUE_ENDPOINT: &str = "/group/instruments";
+const JPMAQS_CATALOGUE_GROUP: &str = "JPMAQS";
 
 pub struct Requester {
     oauth_client: OAuthClient,
     rqclient: Client,
 }
 
-#[derive(Default)]
 pub struct TimeseriesRequestArgs {
     pub start_date: String,
     pub end_date: String,
@@ -26,22 +27,57 @@
 
 impl TimeseriesRequestArgs {
     pub fn new(
-        start_date: Option<&str>,
-        end_date: Option<&str>,
-        calendar: Option<&str>,
-        frequency: Option<&str>,
-        conversion: Option<&str>,
-        nan_treatment: Option<&str>,
-        expressions: Vec<&str>,
+        start_date: &str,
+        end_date: &str,
+        calendar: &str,
+        frequency: &str,
+        conversion: &str,
+        nan_treatment: &str,
+        expressions: Vec<String>,
     ) -> Self {
         TimeseriesRequestArgs {
-            start_date: start_date.unwrap_or("2024-10-20").to_string(),
-            end_date: end_date.unwrap_or("TODAY").to_string(),
-            calendar: calendar.unwrap_or("CAL_ALLDAYS").to_string(),
-            frequency: frequency.unwrap_or("FREQ_DAY").to_string(),
-            conversion: conversion.unwrap_or("CONV_LASTBUS_ABS").to_string(),
-            nan_treatment: nan_treatment.unwrap_or("NA_NOTHING").to_string(),
-            expressions: expressions.into_iter().map(|s| s.to_string()).collect(),
+            start_date: start_date.to_string(),
+            end_date: end_date.to_string(),
+            calendar: calendar.to_string(),
+            frequency: frequency.to_string(),
+            conversion: conversion.to_string(),
+            nan_treatment: nan_treatment.to_string(),
+            expressions,
+        }
+    }
+    pub fn update_expressions(&mut self, expressions: Vec<String>) {
+        self.expressions = expressions;
+    }
+    pub fn as_query_string(&self) -> String {
+        let mut params = vec![
+            ("format", "JSON"),
+            ("start-date", &self.start_date),
+            ("end-date", &self.end_date),
+            ("calendar", &self.calendar),
+            ("frequency", &self.frequency),
+            ("conversion", &self.conversion),
+            ("nan_treatment", &self.nan_treatment),
+            ("data", "NO_REFERENCE_DATA"),
+        ];
+
+        for expression in &self.expressions {
+            params.push(("expressions", expression));
+        }
+
+        serde_urlencoded::to_string(&params).unwrap()
+    }
+}
+
+impl Default for TimeseriesRequestArgs {
+    fn default() -> Self {
+        TimeseriesRequestArgs {
+            start_date: "1990-01-01".to_string(),
+            end_date: "TODAY+2D".to_string(),
+            calendar: "CAL_ALLDAYS".to_string(),
+            frequency: "FREQ_DAY".to_string(),
+            conversion: "CONV_LASTBUS_ABS".to_string(),
+            nan_treatment: "NA_NOTHING".to_string(),
+            expressions: Vec::new(),
         }
     }
 }
@@ -90,6 +126,43 @@
         Ok(())
     }
 
+    pub fn get_catalogue(
+        &mut self,
+        catalogue_group: &str,
+    ) -> Result<Vec<serde_json::Value>, Box<dyn Error>> {
+        let mut responses: Vec<serde_json::Value> = Vec::new();
+        let mut next_page = Some(format!(
+            "{}?group-id={}",
+            CATALOGUE_ENDPOINT, catalogue_group
+        ));
+
+        while let Some(endpoint) = next_page {
+            std::thread::sleep(std::time::Duration::from_millis(200));
+            let response = self._request(reqwest::Method::GET, &endpoint)?;
+
+            if !response.status().is_success() {
+                return Err(Box::new(response.error_for_status().unwrap_err()));
+            }
+
+            // Parse the response body as JSON
+            let json: serde_json::Value = response.json()?;
+
+            // Add the JSON response to the vector
+            responses.push(json.clone());
+
+            // Check for the next link
+            next_page = json["links"].as_array().and_then(|links| {
+                links
+                    .iter()
+                    .find(|link| link["rel"] == "next")
+                    .and_then(|link| link["href"].as_str())
+                    .map(|s| s.to_string())
+            });
+        }
+
+        Ok(responses)
+    }
+
     pub fn get_timeseries_batch(
         &mut self,
         args: TimeseriesRequestArgs,
@@ -98,24 +171,7 @@
             return Err("Number of expressions must be between 1 and 20".into());
         }
 
-        // Construct the parameters
-        let mut params = vec![
-            ("format", "JSON"),
-            ("start-date", &args.start_date),
-            ("end-date", &args.end_date),
-            ("calendar", &args.calendar),
-            ("frequency", &args.frequency),
-            ("conversion", &args.conversion),
-            ("nan_treatment", &args.nan_treatment),
-            ("data", "NO_REFERENCE_DATA"),
-        ];
-
-        // Add expressions to the parameters
-        for expression in &args.expressions {
-            params.push(("expressions", expression));
-        }
-
-        let query_string = serde_urlencoded::to_string(&params)?;
+        let query_string = args.as_query_string();
         let endpoint = format!("{}?{}", TIMESERIES_ENDPOINT, query_string);
 
         let response = self._request(reqwest::Method::GET, &endpoint)?;
@@ -127,15 +183,47 @@ pub fn get_timeseries_with_defaults(
         &mut self,
         expressions: Vec<&str>,
     ) -> Result<Response, Box<dyn Error>> {
-        let args = TimeseriesRequestArgs::new(
-            "1990-01-01".into(),
-            "TODAY+1".into(),
-            "CAL_ALLDAYS".into(),
-            "FREQ_DAY".into(),
-            "CONV_LASTBUS_ABS".into(),
-            "NA_NOTHING".into(),
-            expressions,
-        );
+        // replace just the expressions
+        let args = TimeseriesRequestArgs {
+            expressions: expressions.into_iter().map(|s| s.to_string()).collect(),
+            ..Default::default()
+        };
 
         self.get_timeseries_batch(args)
     }
 }
+
+#[allow(dead_code)]
+fn main() {
+    let client_id = std::env::var("DQ_CLIENT_ID").unwrap();
+    let client_secret = std::env::var("DQ_CLIENT_SECRET").unwrap();
+
+    let mut oauth_client = OAuthClient::new(client_id, client_secret);
+    oauth_client.fetch_token().unwrap();
+
+    let mut requester = Requester::new(oauth_client);
+    requester.check_connection().unwrap();
+
+    let response = requester.get_catalogue(JPMAQS_CATALOGUE_GROUP).unwrap();
+
+    let json_data = response[0].to_string();
+    println!("{}", json_data);
+
+    // try to pull into
+
+    // let expressions_a = vec![
+    //     "DB(JPMAQS,USD_EQXR_NSA,value)",
+    //     "DB(JPMAQS,USD_EQXR_NSA,grading)",
+    //     "DB(JPMAQS,USD_EQXR_NSA,eop_lag)",
+    //     "DB(JPMAQS,USD_EQXR_NSA,mop_lag)",
+    //     "DB(JPMAQS,GBP_EQXR_NSA,value)",
+    //     "DB(JPMAQS,GBP_EQXR_NSA,grading)",
+    //     "DB(JPMAQS,GBP_EQXR_NSA,eop_lag)",
+    //     "DB(JPMAQS,GBP_EQXR_NSA,mop_lag)",
+    // ];
+
+    // let response = requester
+    //     .get_timeseries_with_defaults(expressions_a.iter().map(|s| *s).collect())
+    //     .unwrap();
+
+    // let json_data = response.text().unwrap();
+    // println!("{}", json_data);
+}
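With the new Default impl and the struct's public fields, callers can override just the parts of a request they care about. A small sketch of struct-update syntax plus the new as_query_string helper; demo_query_string and its values are illustrative, not part of the patch:

    use macrosynergy_dataquery::requester::TimeseriesRequestArgs;

    fn demo_query_string() {
        // Override only the start date and expressions; everything else
        // (end date, calendar, frequency, conversion, NaN treatment)
        // comes from the Default impl.
        let args = TimeseriesRequestArgs {
            start_date: "2020-01-01".to_string(),
            expressions: vec!["DB(JPMAQS,USD_EQXR_NSA,value)".to_string()],
            ..Default::default()
        };
        // as_query_string() percent-encodes every (key, value) pair and
        // repeats the `expressions` key once per expression.
        println!("{}", args.as_query_string());
    }

This is the same mechanism get_timeseries_with_defaults now uses internally.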
serde_urlencoded::to_string(¶ms)?; + let query_string = args.as_query_string(); let endpoint = format!("{}?{}", TIMESERIES_ENDPOINT, query_string); let response = self._request(reqwest::Method::GET, &endpoint)?; @@ -126,16 +182,49 @@ impl Requester { &mut self, expressions: Vec<&str>, ) -> Result> { - let args = TimeseriesRequestArgs::new( - "1990-01-01".into(), - "TODAY+1".into(), - "CAL_ALLDAYS".into(), - "FREQ_DAY".into(), - "CONV_LASTBUS_ABS".into(), - "NA_NOTHING".into(), - expressions, - ); + // replace just the expressions + let args = TimeseriesRequestArgs { + expressions: expressions.into_iter().map(|s| s.to_string()).collect(), + ..Default::default() + }; self.get_timeseries_batch(args) } } + +#[allow(dead_code)] +fn main() { + let client_id = std::env::var("DQ_CLIENT_ID").unwrap(); + let client_secret = std::env::var("DQ_CLIENT_SECRET").unwrap(); + + let mut oauth_client = OAuthClient::new(client_id, client_secret); + oauth_client.fetch_token().unwrap(); + + let mut requester = Requester::new(oauth_client); + requester.check_connection().unwrap(); + + let response = requester.get_catalogue(JPMAQS_CATALOGUE_GROUP).unwrap(); + + let json_data = response[0].to_string(); + println!("{}", json_data); + + // try to pull into + + // let expressions_a = vec![ + // "DB(JPMAQS,USD_EQXR_NSA,value)", + // "DB(JPMAQS,USD_EQXR_NSA,grading)", + // "DB(JPMAQS,USD_EQXR_NSA,eop_lag)", + // "DB(JPMAQS,USD_EQXR_NSA,mop_lag)", + // "DB(JPMAQS,GBP_EQXR_NSA,value)", + // "DB(JPMAQS,GBP_EQXR_NSA,grading)", + // "DB(JPMAQS,GBP_EQXR_NSA,eop_lag)", + // "DB(JPMAQS,GBP_EQXR_NSA,mop_lag)", + // ]; + + // let response = requester + // .get_timeseries_with_defaults(expressions_a.iter().map(|s| *s).collect()) + // .unwrap(); + + // let json_data = response.text().unwrap(); + // println!("{}", json_data); +} diff --git a/src/timeseries.rs b/src/timeseries.rs index 9246382..eeb28d8 100644 --- a/src/timeseries.rs +++ b/src/timeseries.rs @@ -1,23 +1,42 @@ use polars::error::PolarsError; -use polars::export::chrono::{NaiveDate, NaiveDateTime}; +use polars::export::chrono::NaiveDate; use polars::prelude::*; use polars::series::Series; - use serde::Deserialize; use std::collections::HashMap; use std::collections::HashSet; use std::error::Error; +use std::fs::File; +/// Response from the DataQuery API #[derive(Deserialize, Debug)] -pub struct DQResponse { +pub struct DQTimeSeriesResponse { instruments: Vec, } +#[derive(Deserialize, Debug)] +pub struct DQCatalogueResponse { + links: Vec>, + items: u32, + instruments: Vec, +} + +#[derive(Deserialize, Debug)] +pub struct DQCatalogueInstrument { + #[serde(rename = "instrument-id")] + instrument_id: String, + #[serde(rename = "instrument-name")] + instrument_name: String, + item: u32, +} + +/// Representation of DQResponse.Instrument #[derive(Deserialize, Debug)] struct Instrument { attributes: Vec, } +/// Representation of DQResponse.Instrument.Attribute #[derive(Deserialize, Debug)] struct Attribute { expression: String, @@ -25,6 +44,7 @@ struct Attribute { time_series: Vec<(String, Option)>, } +/// Representation of a single time series #[derive(Debug)] pub struct DQTimeSeries { expression: String, @@ -87,7 +107,7 @@ impl DQTimeSeries { } } -impl DQResponse { +impl DQTimeSeriesResponse { /// Return a list of all DQTimeSeries in the response pub fn get_all_timeseries(&self) -> Vec { self.instruments @@ -138,73 +158,135 @@ impl JPMaQSIndicator { .map(|ts| ts.get_metric().unwrap()) .collect::>(); - let mut all_dates = timeseries_list - .iter() - .flat_map(|ts| 
@@ -87,7 +107,7 @@
     }
 }
 
-impl DQResponse {
+impl DQTimeSeriesResponse {
     /// Return a list of all DQTimeSeries in the response
     pub fn get_all_timeseries(&self) -> Vec<DQTimeSeries> {
         self.instruments
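Downstream, the renamed DQTimeSeriesResponse feeds the JPMaQSIndicator flow. A hedged sketch of the intended wiring; write_qdfs and its filename are illustrative, get_timeseries_by_ticker is unchanged by this patch, and save_qdf_to_csv is added in the hunk below:

    use macrosynergy_dataquery::timeseries::{DQTimeSeriesResponse, JPMaQSIndicator};
    use std::error::Error;

    // json_data: a DataQuery time-series payload, e.g. from
    // get_timeseries_batch / get_timeseries_with_defaults.
    fn write_qdfs(json_data: &str) -> Result<(), Box<dyn Error>> {
        let response: DQTimeSeriesResponse = serde_json::from_str(json_data)?;
        for ts_group in response.get_timeseries_by_ticker() {
            let jpmi = JPMaQSIndicator::new(ts_group)?;
            // as_qdf() appends cid/xcat columns and orders columns via
            // sort_qdf_columns; save_qdf_to_csv writes that frame out.
            jpmi.save_qdf_to_csv("qdf_out.csv")?;
        }
        Ok(())
    }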
@@ -138,73 +158,135 @@ impl JPMaQSIndicator {
             .map(|ts| ts.get_metric().unwrap())
             .collect::<Vec<String>>();
 
-        let mut all_dates = timeseries_list
-            .iter()
-            .flat_map(|ts| ts.time_series.iter().map(|(date, _)| date.to_string()))
-            .collect::<HashSet<String>>()
-            .into_iter()
-            .collect::<Vec<String>>();
-
-        let sorted_dates = all_dates
-            .iter()
-            .map(|s| s.to_string())
-            .collect::<Vec<String>>();
-
-        // create a vector of vectors with the values for each metric
-        let mut values_vec_vec = Vec::with_capacity(metrics.len() + 1);
-        // add real_date column
-        for i_metric in 0..metrics.len() {
-            let metric = &metrics[i_metric];
-            let mut metric_values = Vec::with_capacity(sorted_dates.len());
-            // when a date is missing, we need to insert a null value
-            for date in &sorted_dates {
-                let value = timeseries_list
-                    .iter()
-                    .find(|ts| ts.get_metric().unwrap() == *metric)
-                    .and_then(|ts| {
-                        ts.time_series
-                            .iter()
-                            .find(|(d, _)| d == date)
-                            .map(|(_, v)| *v)
-                    })
-                    .unwrap_or(None);
-                metric_values.push(value);
-            }
-            values_vec_vec.push(metric_values);
-        }
-
-        let mut columns: Vec<Column> = values_vec_vec
-            .into_iter()
-            .enumerate()
-            .map(|(i, values)| {
-                let colx = Column::new(metrics[i].clone().into(), values);
-                colx
-            })
-            .collect();
-        let date_col = Column::new("real_date".into(), sorted_dates);
-
-        columns.insert(0, date_col);
-        let df = DataFrame::new(columns).unwrap();
+        let output_df = timeseries_list_to_dataframe(timeseries_list, true)?;
 
         Ok(JPMaQSIndicator {
-            df,
-            ticker,
-            metrics,
+            df: output_df,
+            ticker: ticker,
+            metrics: metrics,
         })
     }
 
-    pub fn as_qdf(&self) -> Result<DataFrame, PolarsError> {
+    pub fn as_qdf(&self) -> Result<DataFrame, Box<dyn Error>> {
         let mut qdf = self.df.clone();
         let (cid, xcat) = match self.ticker.split_once('_') {
             Some((cid, xcat)) => (cid, xcat),
-            None => return Err(PolarsError::ComputeError("Invalid ticker format".into())),
+            None => return Err(format!("Invalid ticker format; got '{}'", self.ticker).into()),
         };
         qdf.with_column(Series::new("cid".into(), vec![cid; qdf.height()]))?;
         qdf.with_column(Series::new("xcat".into(), vec![xcat; qdf.height()]))?;
+
+        sort_qdf_columns(&mut qdf)?;
+
         Ok(qdf)
     }
+
+    pub fn save_qdf_to_csv(&self, filename: &str) -> Result<(), Box<dyn Error>> {
+        save_qdf_to_csv(&mut self.as_qdf()?, filename)
+    }
 }
 
+fn timeseries_list_to_dataframe(
+    timeseries_list: Vec<DQTimeSeries>,
+    dropna: bool,
+) -> Result<DataFrame, Box<dyn Error>> {
+    let mut output_df = DataFrame::new(vec![]).expect("Failed to create DataFrame");
+
+    if let Some((first, rest)) = timeseries_list.split_first() {
+        // Convert the first timeseries to DataFrame and clone it to avoid modifying the original
+        let mut result_df = {
+            let mut df = first
+                .to_dataframe()
+                .expect("Failed to convert first timeseries to DataFrame");
+            let curr_metric = first.get_metric().expect("Failed to get metric");
+            let column_name = df.get_column_names()[1].to_string();
+            df.rename(&column_name, curr_metric.into())
+                .expect("Failed to rename column");
+            df.clone()
+        };
+
+        // Iterate over the remaining timeseries
+        for ts in rest {
+            // Convert the current timeseries to DataFrame
+            let mut df = ts
+                .to_dataframe()
+                .expect("Failed to convert timeseries to DataFrame");
+
+            // Rename the metric column to the metric of the relevant DataFrame
+            let curr_metric = ts.get_metric().expect("Failed to get metric");
+            let column_name = df.get_column_names()[1].to_string();
+            df.rename(&column_name, curr_metric.into())
+                .expect("Failed to rename column");
+
+            // Perform a left join on the 'real_date' column
+            result_df = result_df
+                .left_join(&df, ["real_date"], ["real_date"])
+                .expect("Left join failed");
+        }
+
+        output_df = result_df.clone();
+    } else {
+        println!("No timeseries provided.");
+    }
+
+    // drop rows where all values are NA
+    if dropna {
+        output_df = output_df
+            .lazy()
+            .drop_nulls(None)
+            .filter(all_horizontal([all().is_not_null()])?)
+            .collect()
+            .expect("Failed to drop NA rows");
+    }
+
+    Ok(output_df)
+}
+
+fn sort_qdf_columns(qdf: &mut DataFrame) -> Result<(), Box<dyn Error>> {
+    let index_columns = ["real_date", "cid", "xcat"];
+    let known_metrics = ["value", "grading", "eop_lag", "mop_lag"];
+
+    let df_columns = qdf
+        .get_column_names()
+        .into_iter()
+        .map(|s| s.clone().into_string())
+        .collect::<Vec<String>>();
+
+    let mut unknown_metrics: Vec<String> = df_columns
+        .iter()
+        .filter(|&m| !known_metrics.contains(&m.as_str()))
+        .filter(|&m| !index_columns.contains(&m.as_str()))
+        .cloned()
+        .collect();
+
+    let mut new_columns: Vec<String> = vec![];
+    new_columns.extend(index_columns.iter().map(|s| s.to_string()));
+    for &colname in &known_metrics {
+        if df_columns.contains(&colname.into()) {
+            new_columns.push(colname.to_string());
+        }
+    }
+
+    unknown_metrics.sort();
+    new_columns.extend(unknown_metrics);
+    *qdf = qdf
+        .select(new_columns.clone())
+        .expect("Failed to select columns");
+
+    Ok(())
+}
+
+fn save_qdf_to_csv(qdf: &mut DataFrame, filename: &str) -> Result<(), Box<dyn Error>> {
+    let file = File::create(filename)?;
+
+    // Write the DataFrame to a CSV file
+    let mut csv_writer = CsvWriter::new(file);
+    csv_writer.finish(qdf)?;
+
+    Ok(())
+}
+
+#[allow(dead_code)]
 fn main() {
     let json_data = r#"
     {
@@ -242,7 +324,7 @@
     }
     "#;
 
-    let response: DQResponse = serde_json::from_str(json_data).unwrap();
+    let response: DQTimeSeriesResponse = serde_json::from_str(json_data).unwrap();
     println!("{:?}", response);
 
     let all_timeseries = response.get_all_timeseries();
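The join behaviour inside timeseries_list_to_dataframe can be reproduced in isolation. A self-contained sketch with toy data; demo_left_join and its values are illustrative, not part of the patch, but the left_join call matches the one used above:

    use polars::prelude::*;

    fn demo_left_join() -> PolarsResult<DataFrame> {
        // One frame per metric, keyed by real_date, as produced by
        // DQTimeSeries::to_dataframe after the rename step.
        let value = df!(
            "real_date" => ["2024-09-19", "2024-09-20", "2024-09-23"],
            "value" => [0.0477, 0.0481, 0.0433]
        )?;
        let grading = df!(
            "real_date" => ["2024-09-19", "2024-09-23"],
            "grading" => [1.0, 1.0]
        )?;
        // Dates missing from `grading` come back as nulls; the dropna
        // branch in timeseries_list_to_dataframe then removes such rows.
        value.left_join(&grading, ["real_date"], ["real_date"])
    }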