// mirror of https://github.com/Magnus167/msyrs.git
// synced 2025-08-20 13:10:00 +00:00
use polars::error::PolarsError;
|
|
use polars::export::chrono::NaiveDate;
|
|
use polars::prelude::*;
|
|
use polars::series::Series;
|
|
use serde::Deserialize;
|
|
use std::collections::HashMap;
|
|
use std::collections::HashSet;
|
|
use std::error::Error;
|
|
use std::fs::File;
|
|
|
|
|
|
|
|
|
|
/// Arguments for a DataQuery time-series request.
///
/// Serialized to a URL query string by `as_query_string`; defaults for each
/// field are provided by the `Default` impl.
#[derive(Debug, Clone)]
pub struct DQTimeseriesRequestArgs {
    // First date of the requested range (e.g. "1990-01-01").
    pub start_date: String,
    // Last date of the requested range; may be relative (e.g. "TODAY+2D").
    pub end_date: String,
    // Calendar identifier (e.g. "CAL_ALLDAYS").
    pub calendar: String,
    // Sampling frequency (e.g. "FREQ_DAY").
    pub frequency: String,
    // Conversion rule (e.g. "CONV_LASTBUS_ABS").
    pub conversion: String,
    // NaN handling rule (e.g. "NA_NOTHING").
    pub nan_treatment: String,
    // DataQuery expressions to fetch.
    pub expressions: Vec<String>,
}
impl DQTimeseriesRequestArgs {
|
|
pub fn new(
|
|
start_date: &str,
|
|
end_date: &str,
|
|
calendar: &str,
|
|
frequency: &str,
|
|
conversion: &str,
|
|
nan_treatment: &str,
|
|
expressions: Vec<String>,
|
|
) -> Self {
|
|
DQTimeseriesRequestArgs {
|
|
start_date: start_date.to_string(),
|
|
end_date: end_date.to_string(),
|
|
calendar: calendar.to_string(),
|
|
frequency: frequency.to_string(),
|
|
conversion: conversion.to_string(),
|
|
nan_treatment: nan_treatment.to_string(),
|
|
expressions,
|
|
}
|
|
}
|
|
pub fn update_expressions(&mut self, expressions: Vec<String>) {
|
|
self.expressions = expressions;
|
|
}
|
|
pub fn as_query_string(&self) -> String {
|
|
let mut params = vec![
|
|
("format", "JSON"),
|
|
("start-date", &self.start_date),
|
|
("end-date", &self.end_date),
|
|
("calendar", &self.calendar),
|
|
("frequency", &self.frequency),
|
|
("conversion", &self.conversion),
|
|
("nan_treatment", &self.nan_treatment),
|
|
("data", "NO_REFERENCE_DATA"),
|
|
];
|
|
|
|
for expression in &self.expressions {
|
|
params.push(("expressions", expression));
|
|
}
|
|
|
|
serde_urlencoded::to_string(¶ms).unwrap()
|
|
}
|
|
}
|
|
|
|
impl Default for DQTimeseriesRequestArgs {
|
|
fn default() -> Self {
|
|
DQTimeseriesRequestArgs {
|
|
start_date: "1990-01-01".to_string(),
|
|
end_date: "TODAY+2D".to_string(),
|
|
calendar: "CAL_ALLDAYS".to_string(),
|
|
frequency: "FREQ_DAY".to_string(),
|
|
conversion: "CONV_LASTBUS_ABS".to_string(),
|
|
nan_treatment: "NA_NOTHING".to_string(),
|
|
expressions: Vec::new(),
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/// Response from the DataQuery API
#[derive(Deserialize, Debug)]
pub struct DQTimeSeriesResponse {
    // Instruments returned by the request; each carries its attributes
    // (expression + time-series pairs).
    instruments: Vec<Instrument>,
}
/// Aggregated catalogue response, built from one or more per-page
/// `DQCatalogueSingleResponse` values by `DQCatalogueResponse::new`.
#[derive(Deserialize, Debug)]
pub struct DQCatalogueResponse {
    // Total number of instrument names collected across all pages.
    pub items: u32,
    // The individual (per-page) catalogue responses.
    pub catalogue_responses: Vec<DQCatalogueSingleResponse>,
    // Flattened list of every page's instrument names.
    pub all_instruments: Vec<String>,
}
impl DQCatalogueResponse {
|
|
pub fn new(catalogue_responses: Vec<DQCatalogueSingleResponse>) -> Self {
|
|
let all_instruments: Vec<String> = catalogue_responses
|
|
.iter()
|
|
.flat_map(|response| {
|
|
response
|
|
.instruments
|
|
.iter()
|
|
.map(|instrument| instrument.instrument_name.clone())
|
|
})
|
|
.collect();
|
|
|
|
DQCatalogueResponse {
|
|
items: all_instruments.len() as u32,
|
|
catalogue_responses: catalogue_responses,
|
|
all_instruments: all_instruments,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// A single catalogue response page from the DataQuery API.
#[derive(Deserialize, Debug)]
pub struct DQCatalogueSingleResponse {
    // Link maps; individual values may be absent (JSON null).
    pub links: Vec<HashMap<String, Option<String>>>,
    // Item count reported by this page.
    pub items: u32,
    // Instruments listed on this page.
    pub instruments: Vec<DQCatalogueInstrument>,
}
/// A single instrument entry in a catalogue response.
#[derive(Deserialize, Debug)]
pub struct DQCatalogueInstrument {
    // JSON field "instrument-id".
    #[serde(rename = "instrument-id")]
    pub instrument_id: String,
    // JSON field "instrument-name".
    #[serde(rename = "instrument-name")]
    pub instrument_name: String,
    // Item index as reported by the API — exact semantics not evident
    // from this file; confirm against the DataQuery catalogue docs.
    pub item: u32,
}
/// Representation of DQResponse.Instrument
#[derive(Deserialize, Debug)]
struct Instrument {
    // One attribute per requested expression.
    attributes: Vec<Attribute>,
}
/// Representation of DQResponse.Instrument.Attribute
#[derive(Deserialize, Debug)]
struct Attribute {
    // DataQuery expression this attribute was fetched for.
    expression: String,
    // (date string, value) pairs; value is None where the API sent null.
    // JSON field "time-series".
    #[serde(rename = "time-series")]
    time_series: Vec<(String, Option<f64>)>,
}
/// Representation of a single time series
#[derive(Debug)]
pub struct DQTimeSeries {
    // Source expression, expected to look like "DB(JPMAQS,<TICKER>,<metric>)"
    // by get_ticker/get_metric.
    expression: String,
    // (date string, value) pairs copied from the response attribute.
    time_series: Vec<(String, Option<f64>)>,
}
/// All metrics for one JPMaQS ticker, joined into a single wide DataFrame.
#[derive(Debug)]
pub struct JPMaQSIndicator {
    // Wide frame: "real_date" plus one column per metric.
    pub df: DataFrame,
    // Ticker shared by all source timeseries, format "CID_XCAT"
    // (split on '_' in as_qdf).
    pub ticker: String,
    // Metric names extracted from each source expression.
    pub metrics: Vec<String>,
}
impl DQTimeSeries {
|
|
/// Get the ticker from the expression
|
|
pub fn get_ticker(&self) -> Result<String, Box<dyn Error>> {
|
|
if !self.expression.starts_with("DB(JPMAQS,") {
|
|
return Err("Expression does not start with 'DB(JPMAQS,'".into());
|
|
}
|
|
let ticker = self.expression.split(',').nth(1).unwrap();
|
|
if ticker.is_empty() {
|
|
return Err("Ticker is empty".into());
|
|
}
|
|
Ok(ticker.to_string())
|
|
}
|
|
|
|
/// Get the metric from the expression
|
|
pub fn get_metric(&self) -> Result<String, Box<dyn Error>> {
|
|
if !self.expression.starts_with("DB(JPMAQS,") {
|
|
return Err("Expression does not start with 'DB(JPMAQS,'".into());
|
|
}
|
|
let metric = self
|
|
.expression
|
|
.trim_end_matches(')')
|
|
.split(',')
|
|
.last()
|
|
.unwrap();
|
|
if metric.is_empty() {
|
|
return Err("Metric is empty".into());
|
|
}
|
|
Ok(metric.to_string())
|
|
}
|
|
|
|
/// Convert the time series to a Polars DataFrame
|
|
pub fn to_dataframe(&self) -> Result<DataFrame, PolarsError> {
|
|
let dates: Vec<NaiveDate> = self
|
|
.time_series
|
|
.iter()
|
|
.map(|(date_str, _)| NaiveDate::parse_from_str(date_str, "%Y%m%d").unwrap())
|
|
.collect();
|
|
|
|
let values: Vec<Option<f64>> = self.time_series.iter().map(|(_, value)| *value).collect();
|
|
let date_series = Series::new("date".into(), &dates);
|
|
let value_series = Float64Chunked::new("value".into(), &values);
|
|
|
|
df!(
|
|
"real_date" => date_series,
|
|
self.expression.clone() => value_series
|
|
)
|
|
}
|
|
}
|
|
|
|
impl DQTimeSeriesResponse {
|
|
/// Return a list of all expressions in the response
|
|
pub fn list_expressions(&self) -> Vec<String> {
|
|
self.instruments
|
|
.iter()
|
|
.flat_map(|instrument| {
|
|
instrument
|
|
.attributes
|
|
.iter()
|
|
.map(|attribute| attribute.expression.clone())
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
/// Return a list of all DQTimeSeries in the response
|
|
pub fn get_all_timeseries(&self) -> Vec<DQTimeSeries> {
|
|
self.instruments
|
|
.iter()
|
|
.flat_map(|instrument| {
|
|
instrument.attributes.iter().map(|attribute| DQTimeSeries {
|
|
expression: attribute.expression.clone(),
|
|
time_series: attribute.time_series.clone(),
|
|
})
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
/// Return a list of lists of DQTimeSeries, where each list contains all the timeseries for each ticker
|
|
pub fn get_timeseries_by_ticker(&self) -> Vec<Vec<DQTimeSeries>> {
|
|
let timeseries_by_ticker = self
|
|
.instruments
|
|
.iter()
|
|
.flat_map(|instrument| {
|
|
instrument.attributes.iter().map(|attribute| DQTimeSeries {
|
|
expression: attribute.expression.clone(),
|
|
time_series: attribute.time_series.clone(),
|
|
})
|
|
})
|
|
.fold(HashMap::new(), |mut acc, ts| {
|
|
let ticker = ts.get_ticker().unwrap();
|
|
acc.entry(ticker).or_insert_with(Vec::new).push(ts);
|
|
acc
|
|
});
|
|
|
|
timeseries_by_ticker.into_iter().map(|(_, v)| v).collect()
|
|
}
|
|
}
|
|
|
|
impl JPMaQSIndicator {
|
|
pub fn new(timeseries_list: Vec<DQTimeSeries>) -> Result<Self, Box<dyn Error>> {
|
|
let found_tickers = timeseries_list
|
|
.iter()
|
|
.map(|ts| ts.get_ticker().unwrap())
|
|
.collect::<HashSet<String>>();
|
|
if found_tickers.len() != 1 {
|
|
return Err("All provided timeseries do not belong to the same ticker".into());
|
|
}
|
|
let ticker = found_tickers.into_iter().next().unwrap();
|
|
let metrics = timeseries_list
|
|
.iter()
|
|
.map(|ts| ts.get_metric().unwrap())
|
|
.collect::<Vec<String>>();
|
|
|
|
let output_df = timeseries_list_to_dataframe(timeseries_list, true)?;
|
|
|
|
Ok(JPMaQSIndicator {
|
|
df: output_df,
|
|
ticker: ticker,
|
|
metrics: metrics,
|
|
})
|
|
}
|
|
|
|
pub fn as_qdf(&self) -> Result<DataFrame, Box<dyn Error>> {
|
|
let mut qdf = self.df.clone();
|
|
let (cid, xcat) = match self.ticker.split_once('_') {
|
|
Some((cid, xcat)) => (cid, xcat),
|
|
None => return Err(format!("Invalid ticker format; got '{}'", self.ticker).into()),
|
|
};
|
|
qdf.with_column(Series::new("cid".into(), vec![cid; qdf.height()]))?;
|
|
qdf.with_column(Series::new("xcat".into(), vec![xcat; qdf.height()]))?;
|
|
|
|
sort_qdf_columns(&mut qdf)?;
|
|
|
|
Ok(qdf)
|
|
}
|
|
|
|
pub fn save_qdf_to_csv(&self, filename: &str) -> Result<(), Box<dyn Error>> {
|
|
save_qdf_to_csv(&mut self.as_qdf()?, filename)
|
|
}
|
|
}
|
|
|
|
fn timeseries_list_to_dataframe(
|
|
timeseries_list: Vec<DQTimeSeries>,
|
|
dropna: bool,
|
|
) -> Result<DataFrame, Box<dyn Error>> {
|
|
let mut output_df = DataFrame::new(vec![]).expect("Failed to create DataFrame");
|
|
|
|
if let Some((first, rest)) = timeseries_list.split_first() {
|
|
// Convert the first timeseries to DataFrame and clone it to avoid modifying the original
|
|
let mut result_df = {
|
|
let mut df = first
|
|
.to_dataframe()
|
|
.expect("Failed to convert first timeseries to DataFrame");
|
|
let curr_metric = first.get_metric().expect("Failed to get metric");
|
|
let column_name = df.get_column_names()[1].to_string();
|
|
df.rename(&column_name, curr_metric.into())
|
|
.expect("Failed to rename column");
|
|
df.clone()
|
|
};
|
|
|
|
// Iterate over the remaining timeseries
|
|
for ts in rest {
|
|
// Convert the current timeseries to DataFrame
|
|
let mut df = ts
|
|
.to_dataframe()
|
|
.expect("Failed to convert timeseries to DataFrame");
|
|
|
|
// Rename the metric column to the metric of the relevant DataFrame
|
|
let curr_metric = ts.get_metric().expect("Failed to get metric");
|
|
let column_name = df.get_column_names()[1].to_string();
|
|
df.rename(&column_name, curr_metric.into())
|
|
.expect("Failed to rename column");
|
|
|
|
// Perform a left join on the 'real_date' column
|
|
result_df = result_df
|
|
.left_join(&df, ["real_date"], ["real_date"])
|
|
.expect("Left join failed");
|
|
}
|
|
|
|
output_df = result_df.clone();
|
|
} else {
|
|
println!("No timeseries provided.");
|
|
}
|
|
|
|
// drop rows where all values are NA
|
|
if dropna {
|
|
output_df = output_df
|
|
.lazy()
|
|
.drop_nulls(None)
|
|
.filter(all_horizontal([all().is_not_null()])?)
|
|
.collect()
|
|
.expect("Failed to drop NA rows");
|
|
}
|
|
|
|
Ok(output_df)
|
|
}
|
|
|
|
fn sort_qdf_columns(qdf: &mut DataFrame) -> Result<(), Box<dyn Error>> {
|
|
let index_columns = ["real_date", "cid", "xcat"];
|
|
let known_metrics = ["value", "grading", "eop_lag", "mop_lag"];
|
|
|
|
let df_columns = qdf
|
|
.get_column_names()
|
|
.into_iter()
|
|
.map(|s| s.clone().into_string())
|
|
.collect::<Vec<String>>();
|
|
|
|
let mut unknown_metrics: Vec<String> = df_columns
|
|
.iter()
|
|
.filter(|&m| !known_metrics.contains(&m.as_str()))
|
|
.filter(|&m| !index_columns.contains(&m.as_str()))
|
|
.cloned()
|
|
.collect();
|
|
|
|
let mut new_columns: Vec<String> = vec![];
|
|
new_columns.extend(index_columns.iter().map(|s| s.to_string()));
|
|
for &colname in &known_metrics {
|
|
if df_columns.contains(&colname.into()) {
|
|
new_columns.push(colname.to_string());
|
|
}
|
|
}
|
|
|
|
unknown_metrics.sort();
|
|
new_columns.extend(unknown_metrics);
|
|
*qdf = qdf
|
|
.select(new_columns.clone())
|
|
.expect("Failed to select columns");
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn save_qdf_to_csv(qdf: &mut DataFrame, filename: &str) -> Result<(), Box<dyn Error>> {
|
|
let file = File::create(filename)?;
|
|
|
|
// Write the DataFrame to a CSV file
|
|
let mut csv_writer = CsvWriter::new(file);
|
|
csv_writer.finish(qdf)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[allow(dead_code)]
|
|
fn main() {
|
|
let json_data = r#"
|
|
{
|
|
"instruments": [
|
|
{
|
|
"attributes": [
|
|
{
|
|
"expression": "metric, TICKER, metric1()",
|
|
"time-series": [
|
|
["2022-01-01", 10.0],
|
|
["2022-01-02", null]
|
|
]
|
|
},
|
|
{
|
|
"expression": "metric, TICKER2, metric2()",
|
|
"time-series": [
|
|
["2022-01-01", 20.0],
|
|
["2022-01-03", 25.0]
|
|
]
|
|
}
|
|
]
|
|
},
|
|
{
|
|
"attributes": [
|
|
{
|
|
"expression": "metric, TICKER3, metric3()",
|
|
"time-series": [
|
|
["2022-02-01", 30.0],
|
|
["2022-02-02", 35.0]
|
|
]
|
|
}
|
|
]
|
|
}
|
|
]
|
|
}
|
|
"#;
|
|
|
|
let response: DQTimeSeriesResponse = serde_json::from_str(json_data).unwrap();
|
|
println!("{:?}", response);
|
|
|
|
let all_timeseries = response.get_all_timeseries();
|
|
for ts in all_timeseries {
|
|
println!("{:?}", ts);
|
|
match ts.to_dataframe() {
|
|
Ok(df) => println!("{:?}", df),
|
|
Err(e) => println!("Failed to create DataFrame: {:?}", e),
|
|
}
|
|
}
|
|
}
|