use polars::error::PolarsError; use polars::export::chrono::NaiveDate; use polars::prelude::*; use polars::series::Series; use serde::Deserialize; use std::collections::HashMap; use std::collections::HashSet; use std::error::Error; use std::fs::File; /// Arguments for the DataQuery API request #[derive(Debug, Clone)] pub struct DQTimeseriesRequestArgs { pub start_date: String, pub end_date: String, pub calendar: String, pub frequency: String, pub conversion: String, pub nan_treatment: String, pub expressions: Vec, } #[allow(dead_code)] impl DQTimeseriesRequestArgs { pub fn new( start_date: &str, end_date: &str, calendar: &str, frequency: &str, conversion: &str, nan_treatment: &str, expressions: Vec, ) -> Self { DQTimeseriesRequestArgs { start_date: start_date.to_string(), end_date: end_date.to_string(), calendar: calendar.to_string(), frequency: frequency.to_string(), conversion: conversion.to_string(), nan_treatment: nan_treatment.to_string(), expressions, } } pub fn update_expressions(&mut self, expressions: Vec) { self.expressions = expressions; } pub fn as_query_string(&self) -> String { let mut params = vec![ ("format", "JSON"), ("start-date", &self.start_date), ("end-date", &self.end_date), ("calendar", &self.calendar), ("frequency", &self.frequency), ("conversion", &self.conversion), ("nan_treatment", &self.nan_treatment), ("data", "NO_REFERENCE_DATA"), ]; for expression in &self.expressions { params.push(("expressions", expression)); } serde_urlencoded::to_string(¶ms).unwrap() } } /// Default implementation for DQTimeseriesRequestArgs /// The default values are set: /// - start_date: "1990-01-01" /// - end_date: "TODAY+2D" /// - calendar: "CAL_ALLDAYS" /// - frequency: "FREQ_DAY" /// - conversion: "CONV_LASTBUS_ABS" /// - nan_treatment: "NA_NOTHING" /// - expressions: empty Vec impl Default for DQTimeseriesRequestArgs { fn default() -> Self { DQTimeseriesRequestArgs { start_date: "1990-01-01".to_string(), end_date: "TODAY+2D".to_string(), calendar: "CAL_ALLDAYS".to_string(), frequency: "FREQ_DAY".to_string(), conversion: "CONV_LASTBUS_ABS".to_string(), nan_treatment: "NA_NOTHING".to_string(), expressions: Vec::new(), } } } /// Response from the DataQuery API. #[derive(Deserialize, Debug)] pub struct DQTimeSeriesResponse { instruments: Vec, } /// Response from the DataQuery API for a catalogue request. #[allow(dead_code)] #[derive(Deserialize, Debug)] pub struct DQCatalogueResponse { pub items: u32, pub catalogue_responses: Vec, pub all_instruments: Vec, } /// Methods for DQCatalogueResponse impl DQCatalogueResponse { /// Create a new DQCatalogueResponse from a list of DQCatalogueSingleResponse objects. pub fn new(catalogue_responses: Vec) -> Self { let all_instruments: Vec = catalogue_responses .iter() .flat_map(|response| { response .instruments .iter() .map(|instrument| instrument.instrument_name.clone()) }) .collect(); DQCatalogueResponse { items: all_instruments.len() as u32, catalogue_responses: catalogue_responses, all_instruments: all_instruments, } } } /// Response from the DataQuery API for a single catalogue request (one page). #[allow(dead_code)] #[derive(Deserialize, Debug)] pub struct DQCatalogueSingleResponse { pub links: Vec>>, pub items: u32, pub instruments: Vec, } /// Representation of DQCatalogueSingleResponse.Instrument #[allow(dead_code)] #[derive(Deserialize, Debug)] pub struct DQCatalogueInstrument { #[serde(rename = "instrument-id")] pub instrument_id: String, #[serde(rename = "instrument-name")] pub instrument_name: String, pub item: u32, } /// Representation of DQResponse.Instrument #[derive(Deserialize, Debug)] struct Instrument { attributes: Vec, } /// Representation of DQResponse.Instrument.Attribute #[derive(Deserialize, Debug)] struct Attribute { expression: String, #[serde(rename = "time-series")] time_series: Vec<(String, Option)>, } impl Attribute { /// Get the ticker from the expression pub fn get_ticker(&self) -> Result> { if !self.expression.starts_with("DB(JPMAQS,") { return Err("Expression does not start with 'DB(JPMAQS,'".into()); } let ticker = self.expression.split(',').nth(1).unwrap(); if ticker.is_empty() { return Err("Ticker is empty".into()); } Ok(ticker.to_string()) } /// Get the metric from the expression #[allow(dead_code)] pub fn get_metric(&self) -> Result> { if !self.expression.starts_with("DB(JPMAQS,") { return Err("Expression does not start with 'DB(JPMAQS,'".into()); } let metric = self .expression .trim_end_matches(')') .split(',') .last() .unwrap(); if metric.is_empty() { return Err("Metric is empty".into()); } Ok(metric.to_string()) } } /// Representation of a single time series #[derive(Debug)] pub struct DQTimeSeries { expression: String, time_series: Vec<(String, Option)>, } /// Representation of a JPMaQS indicator (1 or more time series for a single JPMaQS ticker) #[derive(Debug)] pub struct JPMaQSIndicator { pub df: DataFrame, pub ticker: String, pub metrics: Vec, } /// Methods for DQTimeSeries impl DQTimeSeries { /// Get the ticker from the expression pub fn get_ticker(&self) -> Result> { if !self.expression.starts_with("DB(JPMAQS,") { return Err("Expression does not start with 'DB(JPMAQS,'".into()); } let ticker = self.expression.split(',').nth(1).unwrap(); if ticker.is_empty() { return Err("Ticker is empty".into()); } Ok(ticker.to_string()) } /// Get the metric from the expression pub fn get_metric(&self) -> Result> { if !self.expression.starts_with("DB(JPMAQS,") { return Err("Expression does not start with 'DB(JPMAQS,'".into()); } let metric = self .expression .trim_end_matches(')') .split(',') .last() .unwrap(); if metric.is_empty() { return Err("Metric is empty".into()); } Ok(metric.to_string()) } /// Convert the time series to a Polars DataFrame pub fn to_dataframe(&self) -> Result { let dates: Vec = self .time_series .iter() .map(|(date_str, _)| NaiveDate::parse_from_str(date_str, "%Y%m%d").unwrap()) .collect(); let values: Vec> = self.time_series.iter().map(|(_, value)| *value).collect(); let date_series = Series::new("date".into(), &dates); let value_series = Float64Chunked::new("value".into(), &values); df!( "real_date" => date_series, self.expression.clone() => value_series ) } } /// Methods for DQTimeSeriesResponse impl DQTimeSeriesResponse { /// Return a list of all expressions in the response pub fn list_expressions(&self) -> Vec { self.instruments .iter() .flat_map(|instrument| { instrument .attributes .iter() .map(|attribute| attribute.expression.clone()) }) .collect() } /// Return a list of all DQTimeSeries in the response pub fn get_all_timeseries(&self) -> Vec { self.instruments .iter() .flat_map(|instrument| { instrument.attributes.iter().map(|attribute| DQTimeSeries { expression: attribute.expression.clone(), time_series: attribute.time_series.clone(), }) }) .collect() } /// Return a list of lists of DQTimeSeries, where each list contains all the timeseries for each ticker pub fn get_timeseries_by_ticker(&self) -> Vec> { let timeseries_by_ticker = self .instruments .iter() .flat_map(|instrument| { instrument.attributes.iter().map(|attribute| DQTimeSeries { expression: attribute.expression.clone(), time_series: attribute.time_series.clone(), }) }) .fold(HashMap::new(), |mut acc, ts| { let ticker = ts.get_ticker().unwrap(); acc.entry(ticker).or_insert_with(Vec::new).push(ts); acc }); timeseries_by_ticker.into_iter().map(|(_, v)| v).collect() } /// Return a list of lists of DQTimeSeries, where each list contains all the timeseries for each ticker. /// The function consumes the data, leaving an empty vector in its place. /// This function can only be called once as it transfers ownership of the data. pub fn consume_to_grouped_by_ticker(mut self) -> Vec> { // Take the instruments vector, leaving an empty one in its place. let instruments = std::mem::take(&mut self.instruments); // Group time series by ticker let mut timeseries_by_ticker: HashMap> = HashMap::new(); for instrument in instruments { for attribute in instrument.attributes { let ticker = attribute.get_ticker().unwrap_or_default(); timeseries_by_ticker .entry(ticker) .or_default() .push(DQTimeSeries { expression: attribute.expression, time_series: attribute.time_series, }); } } // Convert the HashMap into a Vec of Vecs timeseries_by_ticker.into_iter().map(|(_, v)| v).collect() } } /// Methods for JPMaQSIndicator impl JPMaQSIndicator { /// Create a new JPMaQSIndicator from a list of DQTimeSeries, ensuring they all belong to the same ticker pub fn new(timeseries_list: Vec) -> Result> { let found_tickers = timeseries_list .iter() .map(|ts| ts.get_ticker().unwrap()) .collect::>(); if found_tickers.len() != 1 { return Err("All provided timeseries do not belong to the same ticker".into()); } let ticker = found_tickers.into_iter().next().unwrap(); let metrics = timeseries_list .iter() .map(|ts| ts.get_metric().unwrap()) .collect::>(); let output_df = timeseries_list_to_dataframe(timeseries_list, true)?; Ok(JPMaQSIndicator { df: output_df, ticker: ticker, metrics: metrics, }) } /// Add a single time series to the JPMaQSIndicator DataFrame pub fn add_timeseries(&mut self, timeseries: DQTimeSeries) -> Result<(), Box> { if self.ticker != timeseries.get_ticker()? { return Err("Timeseries does not belong to the same ticker".into()); } add_timeseries_to_df(&mut self.df, timeseries)?; Ok(()) } /// Convert the JPMaQSIndicator to a standard JPMaQS Quantamental DataFrame () pub fn as_qdf(&self) -> Result> { let mut qdf = self.df.clone(); let (cid, xcat) = match self.ticker.split_once('_') { Some((cid, xcat)) => (cid, xcat), None => return Err(format!("Invalid ticker format; got '{}'", self.ticker).into()), }; qdf.with_column(Series::new("cid".into(), vec![cid; qdf.height()]))?; qdf.with_column(Series::new("xcat".into(), vec![xcat; qdf.height()]))?; sort_qdf_columns(&mut qdf)?; Ok(qdf) } /// Save the JPMaQSIndicator to a CSV file pub fn save_qdf_to_csv(&self, filename: &str) -> Result<(), Box> { save_qdf_to_csv(&mut self.as_qdf()?, filename) } } fn timeseries_list_to_dataframe( timeseries_list: Vec, dropna: bool, ) -> Result> { let mut output_df = DataFrame::new(vec![]).expect("Failed to create DataFrame"); if let Some((first, rest)) = timeseries_list.split_first() { // Convert the first timeseries to DataFrame and clone it to avoid modifying the original let mut result_df = { let mut df = first .to_dataframe() .expect("Failed to convert first timeseries to DataFrame"); let curr_metric = first.get_metric().expect("Failed to get metric"); let column_name = df.get_column_names()[1].to_string(); df.rename(&column_name, curr_metric.into()) .expect("Failed to rename column"); df.clone() }; // Iterate over the remaining timeseries for ts in rest { // Convert the current timeseries to DataFrame let mut df = ts .to_dataframe() .expect("Failed to convert timeseries to DataFrame"); // Rename the metric column to the metric of the relevant DataFrame let curr_metric = ts.get_metric().expect("Failed to get metric"); let column_name = df.get_column_names()[1].to_string(); df.rename(&column_name, curr_metric.into()) .expect("Failed to rename column"); // Perform a left join on the 'real_date' column result_df = result_df .left_join(&df, ["real_date"], ["real_date"]) .expect("Left join failed"); } output_df = result_df.clone(); } else { println!("No timeseries provided."); } // drop rows where all values are NA if dropna { output_df = output_df .lazy() .drop_nulls(None) .filter(all_horizontal([all().is_not_null()])?) .collect() .expect("Failed to drop NA rows"); } Ok(output_df) } fn add_timeseries_to_df( df: &mut DataFrame, timeseries: DQTimeSeries, ) -> Result<(), Box> { let mut new_df = timeseries.to_dataframe()?; let curr_metric = timeseries.get_metric()?; let column_name = new_df.get_column_names()[1].to_string(); new_df .rename(&column_name, curr_metric.into()) .expect("Failed to rename column"); *df = df .left_join(&new_df, ["real_date"], ["real_date"]) .expect("Left join failed"); Ok(()) } fn sort_qdf_columns(qdf: &mut DataFrame) -> Result<(), Box> { let index_columns = ["real_date", "cid", "xcat"]; let known_metrics = ["value", "grading", "eop_lag", "mop_lag"]; let df_columns = qdf .get_column_names() .into_iter() .map(|s| s.clone().into_string()) .collect::>(); let mut unknown_metrics: Vec = df_columns .iter() .filter(|&m| !known_metrics.contains(&m.as_str())) .filter(|&m| !index_columns.contains(&m.as_str())) .cloned() .collect(); let mut new_columns: Vec = vec![]; new_columns.extend(index_columns.iter().map(|s| s.to_string())); for &colname in &known_metrics { if df_columns.contains(&colname.into()) { new_columns.push(colname.to_string()); } } unknown_metrics.sort(); new_columns.extend(unknown_metrics); *qdf = qdf .select(new_columns.clone()) .expect("Failed to select columns"); Ok(()) } fn save_qdf_to_csv(qdf: &mut DataFrame, filename: &str) -> Result<(), Box> { let file = File::create(filename)?; // Write the DataFrame to a CSV file let mut csv_writer = CsvWriter::new(file); csv_writer.finish(qdf)?; Ok(()) }