diff --git a/Cargo.lock b/Cargo.lock
index 6b8896f..f88ae41 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -75,6 +75,12 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "anyhow"
+version = "1.0.92"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "74f37166d7d48a0284b99dd824694c26119c700b53bf0d1540cdb147dbdaaf13"
+
 [[package]]
 name = "argminmax"
 version = "0.6.2"
@@ -1235,6 +1241,7 @@ dependencies = [
 name = "macrosynergy_dataquery"
 version = "0.0.1"
 dependencies = [
+ "anyhow",
  "polars",
  "reqwest 0.11.27",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index a20ba44..9d89f3d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,4 +8,5 @@ reqwest = { version = "0.11", features = ["blocking", "json"] }
 serde_json = "1.0"
 serde_urlencoded = "0.7"
 serde = { version = "1.0", features = ["derive"] }
-polars = { version = "0.44.2", features = ["temporal"] }
\ No newline at end of file
+polars = { version = "0.44.2", features = ["temporal", "lazy"] }
+anyhow = "1.0"
diff --git a/src/main.rs b/src/main.rs
index 6807cbb..27f8f54 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,7 +5,7 @@ use macrosynergy_dataquery::oauth_client::OAuthClient;
 use macrosynergy_dataquery::requester::Requester;
 use macrosynergy_dataquery::timeseries::DQResponse;
 // use macrosynergy_dataquery::timeseries::DQTimeSeries;
-// use macrosynergy_dataquery::timeseries::JPMaQSIndicator;
+use macrosynergy_dataquery::timeseries::JPMaQSIndicator;
 // use macrosynergy_dataquery::timeseries::TimeSeriesList;
 use std::env;
 
@@ -23,9 +23,9 @@ fn main() {
     let expressions_a = vec![
         "DB(JPMAQS,USD_EQXR_NSA,value)",
         "DB(JPMAQS,USD_EQXR_NSA,grading)",
-    // ];
+        // ];
-    // let expressions_b = vec![
+        // let expressions_b = vec![
         "DB(JPMAQS,GBP_EQXR_NSA,value)",
         "DB(JPMAQS,GBP_EQXR_NSA,grading)",
     ];
 
@@ -45,7 +45,8 @@ fn main() {
         // ts.to_dataframe().unwrap();
         println!("{:?}", ts.to_dataframe().unwrap());
     }
-
-
-
+    for ts_group in dqresponse.get_timeseries_by_ticker() {
+        let jpmi = JPMaQSIndicator::new(ts_group).unwrap();
+        println!("{:?}", jpmi.as_qdf().unwrap());
+    }
 }
diff --git a/src/timeseries.rs b/src/timeseries.rs
index 184bdc1..9246382 100644
--- a/src/timeseries.rs
+++ b/src/timeseries.rs
@@ -1,7 +1,12 @@
-use serde::Deserialize;
-use polars::prelude::*;
 use polars::error::PolarsError;
-use polars::export::chrono::{Datelike, NaiveDate};
+use polars::export::chrono::{NaiveDate, NaiveDateTime};
+use polars::prelude::*;
+use polars::series::Series;
+
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::collections::HashSet;
+use std::error::Error;
 
 #[derive(Deserialize, Debug)]
 pub struct DQResponse {
@@ -26,35 +31,177 @@ pub struct DQTimeSeries {
     time_series: Vec<(String, Option<f64>)>,
 }
 
-impl DQTimeSeries {
+#[derive(Debug)]
+pub struct JPMaQSIndicator {
+    pub df: DataFrame,
+    pub ticker: String,
+    pub metrics: Vec<String>,
+}
+
+impl DQTimeSeries {
+    /// Get the ticker from the expression
+    pub fn get_ticker(&self) -> Result<String, Box<dyn Error>> {
+        if !self.expression.starts_with("DB(JPMAQS,") {
+            return Err("Expression does not start with 'DB(JPMAQS,'".into());
+        }
+        let ticker = self.expression.split(',').nth(1).unwrap();
+        if ticker.is_empty() {
+            return Err("Ticker is empty".into());
+        }
+        Ok(ticker.to_string())
+    }
+
+    /// Get the metric from the expression
+    pub fn get_metric(&self) -> Result<String, Box<dyn Error>> {
+        if !self.expression.starts_with("DB(JPMAQS,") {
+            return Err("Expression does not start with 'DB(JPMAQS,'".into());
+        }
+        let metric = self
+            .expression
+            .trim_end_matches(')')
+            .split(',')
+            .last()
+            .unwrap();
+        if metric.is_empty() {
+            return Err("Metric is empty".into());
+        }
+        Ok(metric.to_string())
+    }
+
+    /// Convert the time series to a Polars DataFrame
     pub fn to_dataframe(&self) -> Result<DataFrame, PolarsError> {
-        let dates: Vec<NaiveDate> = self.time_series.iter()
+        let dates: Vec<NaiveDate> = self
+            .time_series
+            .iter()
             .map(|(date_str, _)| NaiveDate::parse_from_str(date_str, "%Y%m%d").unwrap())
             .collect();
-        let values: Vec<Option<f64>> = self.time_series.iter()
-            .map(|(_, value)| *value)
-            .collect();
+        let values: Vec<Option<f64>> = self.time_series.iter().map(|(_, value)| *value).collect();
         let date_series = Series::new("date".into(), &dates);
         let value_series = Float64Chunked::new("value".into(), &values);
-
         df!(
             "real_date" => date_series,
             self.expression.clone() => value_series
         )
-        // Ok(df)
     }
 }
 
 impl DQResponse {
+    /// Return a list of all DQTimeSeries in the response
     pub fn get_all_timeseries(&self) -> Vec<DQTimeSeries> {
-        self.instruments.iter().flat_map(|instrument| {
-            instrument.attributes.iter().map(|attribute| DQTimeSeries {
-                expression: attribute.expression.clone(),
-                time_series: attribute.time_series.clone(),
+        self.instruments
+            .iter()
+            .flat_map(|instrument| {
+                instrument.attributes.iter().map(|attribute| DQTimeSeries {
+                    expression: attribute.expression.clone(),
+                    time_series: attribute.time_series.clone(),
+                })
             })
-        }).collect()
+            .collect()
+    }
+
+    /// Return a list of lists of DQTimeSeries, where each list contains all the timeseries for each ticker
+    pub fn get_timeseries_by_ticker(&self) -> Vec<Vec<DQTimeSeries>> {
+        // create a hashmap where the key is the ticker and the value is a list of timeseries
+        let mut timeseries_by_ticker: HashMap<String, Vec<DQTimeSeries>> = HashMap::new();
+        for instrument in &self.instruments {
+            for attribute in &instrument.attributes {
+                let ticker = attribute.expression.split(',').nth(1).unwrap().to_string();
+                let timeseries = DQTimeSeries {
+                    expression: attribute.expression.clone(),
+                    time_series: attribute.time_series.clone(),
+                };
+                timeseries_by_ticker
+                    .entry(ticker)
+                    .or_insert_with(Vec::new)
+                    .push(timeseries);
+            }
+        }
+
+        timeseries_by_ticker.into_iter().map(|(_, v)| v).collect()
+    }
+}
+
+impl JPMaQSIndicator {
+    pub fn new(timeseries_list: Vec<DQTimeSeries>) -> Result<Self, Box<dyn Error>> {
+        let found_tickers = timeseries_list
+            .iter()
+            .map(|ts| ts.get_ticker().unwrap())
+            .collect::<HashSet<String>>();
+        if found_tickers.len() != 1 {
+            return Err("All provided timeseries do not belong to the same ticker".into());
+        }
+        let ticker = found_tickers.into_iter().next().unwrap();
+        let metrics = timeseries_list
+            .iter()
+            .map(|ts| ts.get_metric().unwrap())
+            .collect::<Vec<String>>();
+
+        let mut all_dates = timeseries_list
+            .iter()
+            .flat_map(|ts| ts.time_series.iter().map(|(date, _)| date.to_string()))
+            .collect::<HashSet<String>>()
+            .into_iter()
+            .collect::<Vec<String>>();
+
+        let sorted_dates = all_dates
+            .iter()
+            .map(|s| s.to_string())
+            .collect::<Vec<String>>();
+
+        // create a vector of vectors with the values for each metric
+        let mut values_vec_vec = Vec::with_capacity(metrics.len() + 1);
+        // add real_date column
+        for i_metric in 0..metrics.len() {
+            let metric = &metrics[i_metric];
+            let mut metric_values = Vec::with_capacity(sorted_dates.len());
+            // when a date is missing, we need to insert a null value
+            for date in &sorted_dates {
+                let value = timeseries_list
+                    .iter()
+                    .find(|ts| ts.get_metric().unwrap() == *metric)
+                    .and_then(|ts| {
+                        ts.time_series
+                            .iter()
+                            .find(|(d, _)| d == date)
+                            .map(|(_, v)| *v)
+                    })
+                    .unwrap_or(None);
+                metric_values.push(value);
+            }
+            values_vec_vec.push(metric_values);
+        }
+
+        let mut columns: Vec<Column> = values_vec_vec
+            .into_iter()
+            .enumerate()
+            .map(|(i, values)| {
+                let colx = Column::new(metrics[i].clone().into(), values);
+                colx
+            })
+            .collect();
+        let date_col = Column::new("real_date".into(), sorted_dates);
+
+        columns.insert(0, date_col);
+        let df = DataFrame::new(columns).unwrap();
+
+        Ok(JPMaQSIndicator {
+            df,
+            ticker,
+            metrics,
+        })
+    }
+
+    pub fn as_qdf(&self) -> Result<DataFrame, PolarsError> {
+        let mut qdf = self.df.clone();
+        let (cid, xcat) = match self.ticker.split_once('_') {
+            Some((cid, xcat)) => (cid, xcat),
+            None => return Err(PolarsError::ComputeError("Invalid ticker format".into())),
+        };
+        qdf.with_column(Series::new("cid".into(), vec![cid; qdf.height()]))?;
+        qdf.with_column(Series::new("xcat".into(), vec![xcat; qdf.height()]))?;
+        Ok(qdf)
     }
 }
@@ -95,7 +242,7 @@ fn main() {
     }
     "#;
 
-    let response : DQResponse = serde_json::from_str(json_data).unwrap();
+    let response: DQResponse = serde_json::from_str(json_data).unwrap();
     println!("{:?}", response);
 
     let all_timeseries = response.get_all_timeseries();