// msyrs/src/download/timeseries.rs
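//! Deserialization of JPMorgan DataQuery time-series and catalogue responses,
//! with helpers for converting them into Polars DataFrames and JPMaQS-style
//! quantamental DataFrames (QDFs).
//!
//! Illustrative sketch of the intended flow (the JSON payload and error
//! handling are placeholders):
//!
//! ```ignore
//! let response: DQTimeSeriesResponse = serde_json::from_str(&json_payload)?;
//! for group in response.get_timeseries_by_ticker() {
//! let indicator = JPMaQSIndicator::new(group)?;
//! indicator.save_qdf_to_csv(&format!("{}.csv", indicator.ticker))?;
//! }
//! ```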
use polars::error::PolarsError;
use polars::export::chrono::NaiveDate;
use polars::prelude::*;
use polars::series::Series;
use serde::Deserialize;
use std::collections::HashMap;
use std::collections::HashSet;
use std::error::Error;
use std::fs::File;
/// Response from the DataQuery API
#[derive(Deserialize, Debug)]
pub struct DQTimeSeriesResponse {
instruments: Vec<Instrument>,
}
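/// Aggregated instrument-catalogue response: the individual
/// [`DQCatalogueSingleResponse`] values plus the full list of instrument names.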
#[derive(Deserialize, Debug)]
pub struct DQCatalogueResponse {
pub items: u32,
pub catalogue_responses: Vec<DQCatalogueSingleResponse>,
pub all_instruments: Vec<String>,
}
impl DQCatalogueResponse {
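/// Combine multiple catalogue responses, collecting every instrument name and
/// recomputing the total item count.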
pub fn new(catalogue_responses: Vec<DQCatalogueSingleResponse>) -> Self {
let all_instruments: Vec<String> = catalogue_responses
.iter()
.flat_map(|response| {
response
.instruments
.iter()
.map(|instrument| instrument.instrument_name.clone())
})
.collect();
DQCatalogueResponse {
items: all_instruments.len() as u32,
catalogue_responses,
all_instruments,
}
}
}
#[derive(Deserialize, Debug)]
pub struct DQCatalogueSingleResponse {
pub links: Vec<HashMap<String, Option<String>>>,
pub items: u32,
pub instruments: Vec<DQCatalogueInstrument>,
}
#[derive(Deserialize, Debug)]
pub struct DQCatalogueInstrument {
#[serde(rename = "instrument-id")]
pub instrument_id: String,
#[serde(rename = "instrument-name")]
pub instrument_name: String,
pub item: u32,
}
/// Representation of DQResponse.Instrument
#[derive(Deserialize, Debug)]
struct Instrument {
attributes: Vec<Attribute>,
}
/// Representation of DQResponse.Instrument.Attribute
#[derive(Deserialize, Debug)]
struct Attribute {
expression: String,
#[serde(rename = "time-series")]
time_series: Vec<(String, Option<f64>)>,
}
/// Representation of a single time series
#[derive(Debug)]
pub struct DQTimeSeries {
expression: String,
time_series: Vec<(String, Option<f64>)>,
}
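/// A single JPMaQS indicator: one ticker with all of its downloaded metrics
/// joined into a wide DataFrame keyed on `real_date`.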
#[derive(Debug)]
pub struct JPMaQSIndicator {
pub df: DataFrame,
pub ticker: String,
pub metrics: Vec<String>,
}
impl DQTimeSeries {
/// Get the ticker from the expression
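/// (the expression is expected to follow the `DB(JPMAQS,<TICKER>,<metric>)`
/// layout).
///
/// Illustrative sketch; the ticker is a placeholder, and the struct literal
/// only works inside this module because the fields are private:
///
/// ```ignore
/// let ts = DQTimeSeries {
/// expression: "DB(JPMAQS,USD_FXXR_NSA,value)".to_string(),
/// time_series: vec![("20240102".to_string(), Some(0.25))],
/// };
/// assert_eq!(ts.get_ticker().unwrap(), "USD_FXXR_NSA");
/// ```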
pub fn get_ticker(&self) -> Result<String, Box<dyn Error>> {
if !self.expression.starts_with("DB(JPMAQS,") {
return Err("Expression does not start with 'DB(JPMAQS,'".into());
}
let ticker = self.expression.split(',').nth(1).unwrap();
if ticker.is_empty() {
return Err("Ticker is empty".into());
}
Ok(ticker.to_string())
}
/// Get the metric from the expression
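/// (same assumed `DB(JPMAQS,<TICKER>,<metric>)` layout).
///
/// Illustrative sketch with placeholder values:
///
/// ```ignore
/// let ts = DQTimeSeries {
/// expression: "DB(JPMAQS,USD_FXXR_NSA,grading)".to_string(),
/// time_series: vec![("20240102".to_string(), Some(1.0))],
/// };
/// // the trailing ')' is trimmed before taking the last comma-separated field
/// assert_eq!(ts.get_metric().unwrap(), "grading");
/// ```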
pub fn get_metric(&self) -> Result<String, Box<dyn Error>> {
if !self.expression.starts_with("DB(JPMAQS,") {
return Err("Expression does not start with 'DB(JPMAQS,'".into());
}
let metric = self
.expression
.trim_end_matches(')')
.split(',')
.last()
.unwrap();
if metric.is_empty() {
return Err("Metric is empty".into());
}
Ok(metric.to_string())
}
/// Convert the time series to a Polars DataFrame
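/// (`real_date` plus one value column named after the full expression).
///
/// Illustrative sketch: dates are parsed with the `%Y%m%d` format; ticker and
/// values below are placeholders:
///
/// ```ignore
/// let ts = DQTimeSeries {
/// expression: "DB(JPMAQS,USD_FXXR_NSA,value)".to_string(),
/// time_series: vec![
/// ("20240101".to_string(), Some(0.5)),
/// ("20240102".to_string(), None),
/// ],
/// };
/// let df = ts.to_dataframe().unwrap();
/// // columns: ["real_date", "DB(JPMAQS,USD_FXXR_NSA,value)"], shape (2, 2)
/// println!("{:?}", df);
/// ```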
pub fn to_dataframe(&self) -> Result<DataFrame, PolarsError> {
// parse the DataQuery date strings (YYYYMMDD); surface bad dates as errors instead of panicking
let dates: Vec<NaiveDate> = self
.time_series
.iter()
.map(|(date_str, _)| {
NaiveDate::parse_from_str(date_str, "%Y%m%d").map_err(|e| {
PolarsError::ComputeError(format!("invalid date '{}': {}", date_str, e).into())
})
})
.collect::<Result<Vec<NaiveDate>, PolarsError>>()?;
let values: Vec<Option<f64>> = self.time_series.iter().map(|(_, value)| *value).collect();
let date_series = Series::new("real_date".into(), &dates);
let value_series = Float64Chunked::new("value".into(), &values);
df!(
"real_date" => date_series,
self.expression.clone() => value_series
)
}
}
impl DQTimeSeriesResponse {
/// Return a list of all expressions in the response
pub fn list_expressions(&self) -> Vec<String> {
self.instruments
.iter()
.flat_map(|instrument| {
instrument
.attributes
.iter()
.map(|attribute| attribute.expression.clone())
})
.collect()
}
/// Return a list of all DQTimeSeries in the response
pub fn get_all_timeseries(&self) -> Vec<DQTimeSeries> {
self.instruments
.iter()
.flat_map(|instrument| {
instrument.attributes.iter().map(|attribute| DQTimeSeries {
expression: attribute.expression.clone(),
time_series: attribute.time_series.clone(),
})
})
.collect()
}
/// Return a list of lists of DQTimeSeries, where each list contains all the timeseries for each ticker
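///
/// Grouping uses a `HashMap`, so the order of the outer `Vec` is not
/// guaranteed. Illustrative sketch, assuming `response` is a deserialized
/// [`DQTimeSeriesResponse`]:
///
/// ```ignore
/// for group in response.get_timeseries_by_ticker() {
/// // every series in `group` shares the same ticker
/// println!("{}: {} series", group[0].get_ticker().unwrap(), group.len());
/// }
/// ```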
pub fn get_timeseries_by_ticker(&self) -> Vec<Vec<DQTimeSeries>> {
let timeseries_by_ticker = self
.instruments
.iter()
.flat_map(|instrument| {
instrument.attributes.iter().map(|attribute| DQTimeSeries {
expression: attribute.expression.clone(),
time_series: attribute.time_series.clone(),
})
})
.fold(HashMap::new(), |mut acc, ts| {
let ticker = ts.get_ticker().unwrap();
acc.entry(ticker).or_insert_with(Vec::new).push(ts);
acc
});
timeseries_by_ticker.into_iter().map(|(_, v)| v).collect()
}
}
impl JPMaQSIndicator {
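/// Build a [`JPMaQSIndicator`] from timeseries that all share one ticker,
/// joining the metric columns on `real_date` and dropping rows with nulls.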
pub fn new(timeseries_list: Vec<DQTimeSeries>) -> Result<Self, Box<dyn Error>> {
let found_tickers = timeseries_list
.iter()
.map(|ts| ts.get_ticker())
.collect::<Result<HashSet<String>, _>>()?;
if found_tickers.len() != 1 {
return Err("Not all provided timeseries belong to the same ticker".into());
}
let ticker = found_tickers.into_iter().next().unwrap();
let metrics = timeseries_list
.iter()
.map(|ts| ts.get_metric())
.collect::<Result<Vec<String>, _>>()?;
let output_df = timeseries_list_to_dataframe(timeseries_list, true)?;
Ok(JPMaQSIndicator {
df: output_df,
ticker,
metrics,
})
}
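/// Convert the indicator into a quantamental DataFrame (QDF): the ticker is
/// split into `cid` and `xcat` columns and the columns are reordered via
/// `sort_qdf_columns`.
///
/// Illustrative sketch (the ticker is a placeholder; the split happens at the
/// first underscore):
///
/// ```ignore
/// // ticker "USD_FXXR_NSA" -> cid "USD", xcat "FXXR_NSA"
/// let qdf = indicator.as_qdf()?;
/// ```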
pub fn as_qdf(&self) -> Result<DataFrame, Box<dyn Error>> {
let mut qdf = self.df.clone();
let (cid, xcat) = match self.ticker.split_once('_') {
Some((cid, xcat)) => (cid, xcat),
None => return Err(format!("Invalid ticker format; got '{}'", self.ticker).into()),
};
qdf.with_column(Series::new("cid".into(), vec![cid; qdf.height()]))?;
qdf.with_column(Series::new("xcat".into(), vec![xcat; qdf.height()]))?;
sort_qdf_columns(&mut qdf)?;
Ok(qdf)
}
pub fn save_qdf_to_csv(&self, filename: &str) -> Result<(), Box<dyn Error>> {
save_qdf_to_csv(&mut self.as_qdf()?, filename)
}
}
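/// Join a list of timeseries into one wide DataFrame keyed on `real_date`
/// (one column per metric), optionally dropping rows that contain nulls.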
fn timeseries_list_to_dataframe(
timeseries_list: Vec<DQTimeSeries>,
dropna: bool,
) -> Result<DataFrame, Box<dyn Error>> {
let mut output_df = DataFrame::empty();
if let Some((first, rest)) = timeseries_list.split_first() {
// Convert the first timeseries to a DataFrame and rename its value column to its metric name
let mut result_df = first.to_dataframe()?;
let curr_metric = first.get_metric()?;
let column_name = result_df.get_column_names()[1].to_string();
result_df.rename(&column_name, curr_metric.into())?;
// Join each remaining timeseries onto the result
for ts in rest {
// Convert the current timeseries to a DataFrame
let mut df = ts.to_dataframe()?;
// Rename the value column to the metric of the current timeseries
let curr_metric = ts.get_metric()?;
let column_name = df.get_column_names()[1].to_string();
df.rename(&column_name, curr_metric.into())?;
// Perform a left join on the 'real_date' column
result_df = result_df.left_join(&df, ["real_date"], ["real_date"])?;
}
output_df = result_df;
} else {
println!("No timeseries provided.");
}
// drop rows containing any null values
if dropna {
output_df = output_df.lazy().drop_nulls(None).collect()?;
}
Ok(output_df)
}
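/// Reorder QDF columns: index columns first (`real_date`, `cid`, `xcat`), then
/// any known metrics in a fixed order, then remaining columns alphabetically.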
fn sort_qdf_columns(qdf: &mut DataFrame) -> Result<(), Box<dyn Error>> {
let index_columns = ["real_date", "cid", "xcat"];
let known_metrics = ["value", "grading", "eop_lag", "mop_lag"];
let df_columns = qdf
.get_column_names()
.into_iter()
.map(|s| s.clone().into_string())
.collect::<Vec<String>>();
let mut unknown_metrics: Vec<String> = df_columns
.iter()
.filter(|&m| !known_metrics.contains(&m.as_str()))
.filter(|&m| !index_columns.contains(&m.as_str()))
.cloned()
.collect();
let mut new_columns: Vec<String> = vec![];
new_columns.extend(index_columns.iter().map(|s| s.to_string()));
for &colname in &known_metrics {
if df_columns.contains(&colname.into()) {
new_columns.push(colname.to_string());
}
}
unknown_metrics.sort();
new_columns.extend(unknown_metrics);
*qdf = qdf.select(new_columns)?;
Ok(())
}
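/// Write the QDF to a CSV file at the given path.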
fn save_qdf_to_csv(qdf: &mut DataFrame, filename: &str) -> Result<(), Box<dyn Error>> {
let file = File::create(filename)?;
// Write the DataFrame to a CSV file
let mut csv_writer = CsvWriter::new(file);
csv_writer.finish(qdf)?;
Ok(())
}
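/// Small self-contained demo: deserialize a sample response and convert each
/// series to a DataFrame. The payload below is illustrative only.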
#[allow(dead_code)]
fn main() {
let json_data = r#"
{
"instruments": [
{
"attributes": [
{
"expression": "metric, TICKER, metric1()",
"time-series": [
["2022-01-01", 10.0],
["2022-01-02", null]
]
},
{
"expression": "metric, TICKER2, metric2()",
"time-series": [
["2022-01-01", 20.0],
["2022-01-03", 25.0]
]
}
]
},
{
"attributes": [
{
"expression": "metric, TICKER3, metric3()",
"time-series": [
["2022-02-01", 30.0],
["2022-02-02", 35.0]
]
}
]
}
]
}
"#;
let response: DQTimeSeriesResponse = serde_json::from_str(json_data).unwrap();
println!("{:?}", response);
let all_timeseries = response.get_all_timeseries();
for ts in all_timeseries {
println!("{:?}", ts);
match ts.to_dataframe() {
Ok(df) => println!("{:?}", df),
Err(e) => println!("Failed to create DataFrame: {:?}", e),
}
}
}