Add JPMaQSIndicator struct and related functionality; update dependencies

This commit is contained in:
Palash Tyagi 2024-11-06 02:21:49 +00:00
parent 45ae88b330
commit ce870b171e
4 changed files with 179 additions and 23 deletions

7
Cargo.lock generated
View File

@@ -75,6 +75,12 @@ dependencies = [
"libc",
]
[[package]]
name = "anyhow"
version = "1.0.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74f37166d7d48a0284b99dd824694c26119c700b53bf0d1540cdb147dbdaaf13"
[[package]]
name = "argminmax"
version = "0.6.2"
@@ -1235,6 +1241,7 @@ dependencies = [
name = "macrosynergy_dataquery"
version = "0.0.1"
dependencies = [
"anyhow",
"polars",
"reqwest 0.11.27",
"serde",

View File

@@ -8,4 +8,5 @@ reqwest = { version = "0.11", features = ["blocking", "json"] }
serde_json = "1.0"
serde_urlencoded = "0.7"
serde = { version = "1.0", features = ["derive"] }
polars = { version = "0.44.2", features = ["temporal"] }
polars = { version = "0.44.2", features = ["temporal", "lazy"] }
anyhow = "1.0"

View File

@@ -5,7 +5,7 @@ use macrosynergy_dataquery::oauth_client::OAuthClient;
use macrosynergy_dataquery::requester::Requester;
use macrosynergy_dataquery::timeseries::DQResponse;
// use macrosynergy_dataquery::timeseries::DQTimeSeries;
// use macrosynergy_dataquery::timeseries::JPMaQSIndicator;
use macrosynergy_dataquery::timeseries::JPMaQSIndicator;
// use macrosynergy_dataquery::timeseries::TimeSeriesList;
use std::env;
@@ -23,9 +23,9 @@ fn main() {
let expressions_a = vec![
"DB(JPMAQS,USD_EQXR_NSA,value)",
"DB(JPMAQS,USD_EQXR_NSA,grading)",
// ];
// ];
// let expressions_b = vec![
// let expressions_b = vec![
"DB(JPMAQS,GBP_EQXR_NSA,value)",
"DB(JPMAQS,GBP_EQXR_NSA,grading)",
];
@@ -45,7 +45,8 @@ fn main() {
// ts.to_dataframe().unwrap();
println!("{:?}", ts.to_dataframe().unwrap());
}
for ts_group in dqresponse.get_timeseries_by_ticker() {
let jpmi = JPMaQSIndicator::new(ts_group).unwrap();
println!("{:?}", jpmi.as_qdf().unwrap());
}
}

View File

@@ -1,7 +1,12 @@
use serde::Deserialize;
use polars::prelude::*;
use polars::error::PolarsError;
use polars::export::chrono::{Datelike, NaiveDate};
use polars::export::chrono::{NaiveDate, NaiveDateTime};
use polars::prelude::*;
use polars::series::Series;
use serde::Deserialize;
use std::collections::HashMap;
use std::collections::HashSet;
use std::error::Error;
#[derive(Deserialize, Debug)]
pub struct DQResponse {
@@ -26,35 +31,177 @@ pub struct DQTimeSeries {
time_series: Vec<(String, Option<f64>)>,
}
impl DQTimeSeries {
/// A single JPMaQS indicator: every requested metric for one ticker,
/// joined into one wide DataFrame keyed by observation date.
#[derive(Debug)]
pub struct JPMaQSIndicator {
// Wide frame built by `new`: a "real_date" column plus one column per metric.
pub df: DataFrame,
// Ticker shared by all the input timeseries, e.g. "USD_EQXR_NSA".
pub ticker: String,
// Metric names (e.g. "value", "grading"), in input order; match `df` columns.
pub metrics: Vec<String>,
}
impl DQTimeSeries {
/// Extract the ticker (the second comma-separated field) from a JPMaQS
/// expression such as `DB(JPMAQS,USD_EQXR_NSA,value)`.
///
/// Returns an error if the expression does not carry the JPMaQS prefix
/// or the ticker field is empty.
pub fn get_ticker(&self) -> Result<String, Box<dyn Error>> {
    if !self.expression.starts_with("DB(JPMAQS,") {
        return Err("Expression does not start with 'DB(JPMAQS,'".into());
    }
    let mut fields = self.expression.split(',');
    fields.next(); // drop the "DB(JPMAQS" prefix field
    // The prefix check above guarantees at least one comma, so a second
    // field always exists.
    let ticker = fields.next().unwrap();
    if ticker.is_empty() {
        return Err("Ticker is empty".into());
    }
    Ok(ticker.to_string())
}
/// Extract the metric (the last comma-separated field, with the closing
/// parenthesis stripped) from a JPMaQS expression such as
/// `DB(JPMAQS,USD_EQXR_NSA,value)`.
///
/// Returns an error if the expression does not carry the JPMaQS prefix
/// or the metric field is empty.
pub fn get_metric(&self) -> Result<String, Box<dyn Error>> {
    if !self.expression.starts_with("DB(JPMAQS,") {
        return Err("Expression does not start with 'DB(JPMAQS,'".into());
    }
    let trimmed = self.expression.trim_end_matches(')');
    // `rsplit` always yields at least one item, so this cannot panic.
    let metric = trimmed.rsplit(',').next().unwrap();
    if metric.is_empty() {
        return Err("Metric is empty".into());
    }
    Ok(metric.to_string())
}
/// Convert the time series to a Polars DataFrame
pub fn to_dataframe(&self) -> Result<DataFrame, PolarsError> {
let dates: Vec<NaiveDate> = self.time_series.iter()
let dates: Vec<NaiveDate> = self
.time_series
.iter()
.map(|(date_str, _)| NaiveDate::parse_from_str(date_str, "%Y%m%d").unwrap())
.collect();
let values: Vec<Option<f64>> = self.time_series.iter()
.map(|(_, value)| *value)
.collect();
let values: Vec<Option<f64>> = self.time_series.iter().map(|(_, value)| *value).collect();
let date_series = Series::new("date".into(), &dates);
let value_series = Float64Chunked::new("value".into(), &values);
df!(
"real_date" => date_series,
self.expression.clone() => value_series
)
// Ok(df)
}
}
impl DQResponse {
/// Return a list of all DQTimeSeries in the response
pub fn get_all_timeseries(&self) -> Vec<DQTimeSeries> {
self.instruments.iter().flat_map(|instrument| {
instrument.attributes.iter().map(|attribute| DQTimeSeries {
expression: attribute.expression.clone(),
time_series: attribute.time_series.clone(),
self.instruments
.iter()
.flat_map(|instrument| {
instrument.attributes.iter().map(|attribute| DQTimeSeries {
expression: attribute.expression.clone(),
time_series: attribute.time_series.clone(),
})
})
}).collect()
.collect()
}
/// Return a list of lists of DQTimeSeries, where each inner list contains
/// all the timeseries for one ticker.
///
/// Grouping order is unspecified (it comes from a `HashMap`).
///
/// # Panics
/// Panics if an expression is not a valid JPMaQS expression (same
/// validation as [`DQTimeSeries::get_ticker`]).
pub fn get_timeseries_by_ticker(&self) -> Vec<Vec<DQTimeSeries>> {
    // Group the owned timeseries by ticker.
    let mut timeseries_by_ticker: HashMap<String, Vec<DQTimeSeries>> = HashMap::new();
    for instrument in &self.instruments {
        for attribute in &instrument.attributes {
            let timeseries = DQTimeSeries {
                expression: attribute.expression.clone(),
                time_series: attribute.time_series.clone(),
            };
            // Reuse get_ticker so ticker parsing/validation lives in one place
            // instead of re-implementing the split here.
            let ticker = timeseries
                .get_ticker()
                .expect("expression is not a valid JPMaQS expression");
            timeseries_by_ticker.entry(ticker).or_default().push(timeseries);
        }
    }
    timeseries_by_ticker.into_values().collect()
}
}
impl JPMaQSIndicator {
pub fn new(timeseries_list: Vec<DQTimeSeries>) -> Result<Self, Box<dyn Error>> {
let found_tickers = timeseries_list
.iter()
.map(|ts| ts.get_ticker().unwrap())
.collect::<HashSet<String>>();
if found_tickers.len() != 1 {
return Err("All provided timeseries do not belong to the same ticker".into());
}
let ticker = found_tickers.into_iter().next().unwrap();
let metrics = timeseries_list
.iter()
.map(|ts| ts.get_metric().unwrap())
.collect::<Vec<String>>();
let mut all_dates = timeseries_list
.iter()
.flat_map(|ts| ts.time_series.iter().map(|(date, _)| date.to_string()))
.collect::<HashSet<String>>()
.into_iter()
.collect::<Vec<String>>();
let sorted_dates = all_dates
.iter()
.map(|s| s.to_string())
.collect::<Vec<String>>();
// create a vector of vectors with the values for each metric
let mut values_vec_vec = Vec::with_capacity(metrics.len() + 1);
// add real_date column
for i_metric in 0..metrics.len() {
let metric = &metrics[i_metric];
let mut metric_values = Vec::with_capacity(sorted_dates.len());
// when a date is missing, we need to insert a null value
for date in &sorted_dates {
let value = timeseries_list
.iter()
.find(|ts| ts.get_metric().unwrap() == *metric)
.and_then(|ts| {
ts.time_series
.iter()
.find(|(d, _)| d == date)
.map(|(_, v)| *v)
})
.unwrap_or(None);
metric_values.push(value);
}
values_vec_vec.push(metric_values);
}
let mut columns: Vec<Column> = values_vec_vec
.into_iter()
.enumerate()
.map(|(i, values)| {
let colx = Column::new(metrics[i].clone().into(), values);
colx
})
.collect();
let date_col = Column::new("real_date".into(), sorted_dates);
columns.insert(0, date_col);
let df = DataFrame::new(columns).unwrap();
Ok(JPMaQSIndicator {
df,
ticker,
metrics,
})
}
/// Return a copy of the indicator frame in quantamental (QDF) layout,
/// with constant `cid` and `xcat` columns derived from the ticker.
///
/// # Errors
/// Returns a `PolarsError` if the ticker does not contain an underscore
/// or a column cannot be added.
pub fn as_qdf(&self) -> Result<DataFrame, PolarsError> {
    // Ticker is "<cid>_<xcat>"; split on the first underscore.
    let Some((cid, xcat)) = self.ticker.split_once('_') else {
        return Err(PolarsError::ComputeError("Invalid ticker format".into()));
    };
    let mut qdf = self.df.clone();
    let n = qdf.height();
    qdf.with_column(Series::new("cid".into(), vec![cid; n]))?;
    qdf.with_column(Series::new("xcat".into(), vec![xcat; n]))?;
    Ok(qdf)
}
}
@@ -95,7 +242,7 @@ fn main() {
}
"#;
let response : DQResponse = serde_json::from_str(json_data).unwrap();
let response: DQResponse = serde_json::from_str(json_data).unwrap();
println!("{:?}", response);
let all_timeseries = response.get_all_timeseries();