catalogue working

Palash Tyagi 2024-11-07 00:05:09 +00:00
parent 9efd434210
commit 693b4fad96
3 changed files with 296 additions and 118 deletions

View File

@@ -3,7 +3,8 @@
 use macrosynergy_dataquery::oauth_client::OAuthClient;
 use macrosynergy_dataquery::requester::Requester;
-use macrosynergy_dataquery::timeseries::DQResponse;
+use macrosynergy_dataquery::timeseries::DQCatalogueResponse;
+use macrosynergy_dataquery::timeseries::DQTimeSeriesResponse;
 // use macrosynergy_dataquery::timeseries::DQTimeSeries;
 use macrosynergy_dataquery::timeseries::JPMaQSIndicator;
 // use macrosynergy_dataquery::timeseries::TimeSeriesList;
@@ -23,30 +24,36 @@ fn main() {
     let expressions_a = vec![
         "DB(JPMAQS,USD_EQXR_NSA,value)",
         "DB(JPMAQS,USD_EQXR_NSA,grading)",
-        // ];
-        // let expressions_b = vec![
+        "DB(JPMAQS,USD_EQXR_NSA,eop_lag)",
+        "DB(JPMAQS,USD_EQXR_NSA,mop_lag)",
         "DB(JPMAQS,GBP_EQXR_NSA,value)",
         "DB(JPMAQS,GBP_EQXR_NSA,grading)",
+        "DB(JPMAQS,GBP_EQXR_NSA,eop_lag)",
+        "DB(JPMAQS,GBP_EQXR_NSA,mop_lag)",
     ];
     // expressions_b.len();
-    let response = requester
-        .get_timeseries_with_defaults(expressions_a)
-        .unwrap();
-    let json_data = response.text().unwrap();
-    // let dqresponse: DQResponse = serde_json::from_str(&json_data).unwrap();
-    // for ts_group in dqresponse.get_timeseries_by_ticker() {
-    //     let jpmi = JPMaQSIndicator::new(ts_group).unwrap();
-    //     println!("{:?}", jpmi.as_qdf().unwrap());
-    // }
-    let dqresponse: DQResponse = serde_json::from_str(&json_data).unwrap();
-    let timeseries_list = dqresponse.get_all_timeseries();
-    for ts in timeseries_list {
-        // println!("{:?}", ts);
-        // ts.to_dataframe().unwrap();
-        println!("{:?}", ts.to_dataframe().unwrap());
-    }
-    for ts_group in dqresponse.get_timeseries_by_ticker() {
-        let jpmi = JPMaQSIndicator::new(ts_group).unwrap();
-        println!("{:?}", jpmi.as_qdf().unwrap());
-    }
+    // let response = requester
+    //     .get_timeseries_with_defaults(expressions_a)
+    //     .unwrap();
+    // let json_data = response.text().unwrap();
+
+    // get catalog
+    let response = requester.get_catalogue("JPMAQS").unwrap();
+    // let json_data = response.text().unwrap();
+    let json_data = response[0].to_string();
+    println!("{}", json_data);
+
+    // try to pull into DQResponse
+    let response: DQCatalogueResponse = serde_json::from_str(&json_data).unwrap();
+    println!("{:?}", response);
 }
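Note: the DQCatalogueResponse struct added in timeseries.rs (third file below) expects each catalogue page to deserialize from JSON shaped roughly as follows. This is an illustrative sketch only; the field names mirror the serde renames in the new structs, but the values are placeholders, not a real DataQuery payload.

// Hypothetical catalogue page; values are made up for illustration.
let page = serde_json::json!({
    "links": [
        { "rel": "self", "href": "/group/instruments?group-id=JPMAQS" },
        { "rel": "next", "href": "/group/instruments?group-id=JPMAQS&page=2" }
    ],
    "items": 1,
    "instruments": [
        { "instrument-id": "ABC123", "instrument-name": "USD_EQXR_NSA", "item": 1 }
    ]
});
let parsed: DQCatalogueResponse = serde_json::from_value(page).unwrap();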

View File

@@ -7,13 +7,14 @@ use crate::oauth_client::OAuthClient;
 const API_BASE_URL: &str = "https://api-developer.jpmorgan.com/research/dataquery-authe/api/v2";
 const HEARTBEAT_ENDPOINT: &str = "/services/heartbeat";
 const TIMESERIES_ENDPOINT: &str = "/expressions/time-series";
+const CATALOGUE_ENDPOINT: &str = "/group/instruments";
+const JPMAQS_CATALOGUE_GROUP: &str = "JPMAQS";
 
 pub struct Requester {
     oauth_client: OAuthClient,
     rqclient: Client,
 }
 
-#[derive(Default)]
 pub struct TimeseriesRequestArgs {
     pub start_date: String,
     pub end_date: String,
@@ -26,22 +27,57 @@ pub struct TimeseriesRequestArgs {
 impl TimeseriesRequestArgs {
     pub fn new(
-        start_date: Option<&str>,
-        end_date: Option<&str>,
-        calendar: Option<&str>,
-        frequency: Option<&str>,
-        conversion: Option<&str>,
-        nan_treatment: Option<&str>,
-        expressions: Vec<&str>,
+        start_date: &str,
+        end_date: &str,
+        calendar: &str,
+        frequency: &str,
+        conversion: &str,
+        nan_treatment: &str,
+        expressions: Vec<String>,
     ) -> Self {
         TimeseriesRequestArgs {
-            start_date: start_date.unwrap_or("2024-10-20").to_string(),
-            end_date: end_date.unwrap_or("TODAY").to_string(),
-            calendar: calendar.unwrap_or("CAL_ALLDAYS").to_string(),
-            frequency: frequency.unwrap_or("FREQ_DAY").to_string(),
-            conversion: conversion.unwrap_or("CONV_LASTBUS_ABS").to_string(),
-            nan_treatment: nan_treatment.unwrap_or("NA_NOTHING").to_string(),
-            expressions: expressions.into_iter().map(|s| s.to_string()).collect(),
+            start_date: start_date.to_string(),
+            end_date: end_date.to_string(),
+            calendar: calendar.to_string(),
+            frequency: frequency.to_string(),
+            conversion: conversion.to_string(),
+            nan_treatment: nan_treatment.to_string(),
+            expressions,
+        }
+    }
+
+    pub fn update_expressions(&mut self, expressions: Vec<String>) {
+        self.expressions = expressions;
+    }
+
+    pub fn as_query_string(&self) -> String {
+        let mut params = vec![
+            ("format", "JSON"),
+            ("start-date", &self.start_date),
+            ("end-date", &self.end_date),
+            ("calendar", &self.calendar),
+            ("frequency", &self.frequency),
+            ("conversion", &self.conversion),
+            ("nan_treatment", &self.nan_treatment),
+            ("data", "NO_REFERENCE_DATA"),
+        ];
+        for expression in &self.expressions {
+            params.push(("expressions", expression));
+        }
+        serde_urlencoded::to_string(&params).unwrap()
+    }
+}
+
+impl Default for TimeseriesRequestArgs {
+    fn default() -> Self {
+        TimeseriesRequestArgs {
+            start_date: "1990-01-01".to_string(),
+            end_date: "TODAY+2D".to_string(),
+            calendar: "CAL_ALLDAYS".to_string(),
+            frequency: "FREQ_DAY".to_string(),
+            conversion: "CONV_LASTBUS_ABS".to_string(),
+            nan_treatment: "NA_NOTHING".to_string(),
+            expressions: Vec::new(),
         }
     }
 }
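A minimal usage sketch of the new Default and as_query_string pair, assuming the API shown in this hunk; the expression is taken from the example file above, and the commented query string is abbreviated:

// Build args from the defaults, then swap in the expressions of interest.
let mut args = TimeseriesRequestArgs::default();
args.update_expressions(vec!["DB(JPMAQS,USD_EQXR_NSA,value)".to_string()]);

// serde_urlencoded percent-encodes each pair, repeating the `expressions`
// key once per expression. Abbreviated result:
// format=JSON&start-date=1990-01-01&end-date=TODAY%2B2D&calendar=CAL_ALLDAYS&...
let query = args.as_query_string();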
@@ -90,6 +126,43 @@ impl Requester {
         Ok(())
     }
 
+    pub fn get_catalogue(
+        &mut self,
+        catalogue_group: &str,
+    ) -> Result<Vec<serde_json::Value>, Box<dyn Error>> {
+        let mut responses: Vec<serde_json::Value> = Vec::new();
+        let mut next_page = Some(format!(
+            "{}?group-id={}",
+            CATALOGUE_ENDPOINT, catalogue_group
+        ));
+
+        while let Some(endpoint) = next_page {
+            std::thread::sleep(std::time::Duration::from_millis(200));
+            let response = self._request(reqwest::Method::GET, &endpoint)?;
+            if !response.status().is_success() {
+                return Err(Box::new(response.error_for_status().unwrap_err()));
+            }
+
+            // Parse the response body as JSON
+            let json: serde_json::Value = response.json()?;
+            // Add the JSON response to the vector
+            responses.push(json.clone());
+
+            // Check for the next link
+            next_page = json["links"].as_array().and_then(|links| {
+                links
+                    .iter()
+                    .find(|link| link["rel"] == "next")
+                    .and_then(|link| link["href"].as_str())
+                    .map(|s| s.to_string())
+            });
+        }
+
+        Ok(responses)
+    }
+
     pub fn get_timeseries_batch(
         &mut self,
         args: TimeseriesRequestArgs,
@@ -98,24 +171,7 @@ impl Requester {
             return Err("Number of expressions must be between 1 and 20".into());
         }
 
-        // Construct the parameters
-        let mut params = vec![
-            ("format", "JSON"),
-            ("start-date", &args.start_date),
-            ("end-date", &args.end_date),
-            ("calendar", &args.calendar),
-            ("frequency", &args.frequency),
-            ("conversion", &args.conversion),
-            ("nan_treatment", &args.nan_treatment),
-            ("data", "NO_REFERENCE_DATA"),
-        ];
-
-        // Add expressions to the parameters
-        for expression in &args.expressions {
-            params.push(("expressions", expression));
-        }
-
-        let query_string = serde_urlencoded::to_string(&params)?;
+        let query_string = args.as_query_string();
 
         let endpoint = format!("{}?{}", TIMESERIES_ENDPOINT, query_string);
         let response = self._request(reqwest::Method::GET, &endpoint)?;
@@ -126,16 +182,49 @@ impl Requester {
         &mut self,
         expressions: Vec<&str>,
     ) -> Result<reqwest::blocking::Response, Box<dyn Error>> {
-        let args = TimeseriesRequestArgs::new(
-            "1990-01-01".into(),
-            "TODAY+1".into(),
-            "CAL_ALLDAYS".into(),
-            "FREQ_DAY".into(),
-            "CONV_LASTBUS_ABS".into(),
-            "NA_NOTHING".into(),
-            expressions,
-        );
+        // replace just the expressions
+        let args = TimeseriesRequestArgs {
+            expressions: expressions.into_iter().map(|s| s.to_string()).collect(),
+            ..Default::default()
+        };
         self.get_timeseries_batch(args)
     }
 }
+
+#[allow(dead_code)]
+fn main() {
+    let client_id = std::env::var("DQ_CLIENT_ID").unwrap();
+    let client_secret = std::env::var("DQ_CLIENT_SECRET").unwrap();
+    let mut oauth_client = OAuthClient::new(client_id, client_secret);
+    oauth_client.fetch_token().unwrap();
+    let mut requester = Requester::new(oauth_client);
+    requester.check_connection().unwrap();
+
+    let response = requester.get_catalogue(JPMAQS_CATALOGUE_GROUP).unwrap();
+    let json_data = response[0].to_string();
+    println!("{}", json_data);
+
+    // try to pull into
+    // let expressions_a = vec![
+    //     "DB(JPMAQS,USD_EQXR_NSA,value)",
+    //     "DB(JPMAQS,USD_EQXR_NSA,grading)",
+    //     "DB(JPMAQS,USD_EQXR_NSA,eop_lag)",
+    //     "DB(JPMAQS,USD_EQXR_NSA,mop_lag)",
+    //     "DB(JPMAQS,GBP_EQXR_NSA,value)",
+    //     "DB(JPMAQS,GBP_EQXR_NSA,grading)",
+    //     "DB(JPMAQS,GBP_EQXR_NSA,eop_lag)",
+    //     "DB(JPMAQS,GBP_EQXR_NSA,mop_lag)",
+    // ];
+    // let response = requester
+    //     .get_timeseries_with_defaults(expressions_a.iter().map(|s| *s).collect())
+    //     .unwrap();
+    // let json_data = response.text().unwrap();
+    // println!("{}", json_data);
+}
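get_catalogue returns one serde_json::Value per page, following the rel="next" link until none remains. A hedged sketch of flattening the instrument names across pages; the "instruments" and "instrument-name" keys are assumed to match the catalogue structs in timeseries.rs below:

// Collect every instrument name across all returned pages.
let pages = requester.get_catalogue(JPMAQS_CATALOGUE_GROUP).unwrap();
let tickers: Vec<String> = pages
    .iter()
    .flat_map(|page| page["instruments"].as_array().into_iter().flatten())
    .filter_map(|inst| inst["instrument-name"].as_str().map(String::from))
    .collect();
println!("catalogue holds {} instruments", tickers.len());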

View File

@@ -1,23 +1,42 @@
 use polars::error::PolarsError;
-use polars::export::chrono::{NaiveDate, NaiveDateTime};
+use polars::export::chrono::NaiveDate;
 use polars::prelude::*;
 use polars::series::Series;
 use serde::Deserialize;
 use std::collections::HashMap;
 use std::collections::HashSet;
 use std::error::Error;
+use std::fs::File;
 
+/// Response from the DataQuery API
 #[derive(Deserialize, Debug)]
-pub struct DQResponse {
+pub struct DQTimeSeriesResponse {
     instruments: Vec<Instrument>,
 }
 
+#[derive(Deserialize, Debug)]
+pub struct DQCatalogueResponse {
+    links: Vec<HashMap<String, String>>,
+    items: u32,
+    instruments: Vec<DQCatalogueInstrument>,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct DQCatalogueInstrument {
+    #[serde(rename = "instrument-id")]
+    instrument_id: String,
+    #[serde(rename = "instrument-name")]
+    instrument_name: String,
+    item: u32,
+}
+
+/// Representation of DQResponse.Instrument
 #[derive(Deserialize, Debug)]
 struct Instrument {
     attributes: Vec<Attribute>,
 }
 
+/// Representation of DQResponse.Instrument.Attribute
 #[derive(Deserialize, Debug)]
 struct Attribute {
     expression: String,
@@ -25,6 +44,7 @@ struct Attribute {
     time_series: Vec<(String, Option<f64>)>,
 }
 
+/// Representation of a single time series
 #[derive(Debug)]
 pub struct DQTimeSeries {
     expression: String,
@@ -87,7 +107,7 @@ impl DQTimeSeries {
     }
 }
 
-impl DQResponse {
+impl DQTimeSeriesResponse {
     /// Return a list of all DQTimeSeries in the response
     pub fn get_all_timeseries(&self) -> Vec<DQTimeSeries> {
         self.instruments
@@ -138,73 +158,135 @@ impl JPMaQSIndicator {
             .map(|ts| ts.get_metric().unwrap())
             .collect::<Vec<String>>();
 
-        let mut all_dates = timeseries_list
-            .iter()
-            .flat_map(|ts| ts.time_series.iter().map(|(date, _)| date.to_string()))
-            .collect::<HashSet<String>>()
-            .into_iter()
-            .collect::<Vec<String>>();
-
-        let sorted_dates = all_dates
-            .iter()
-            .map(|s| s.to_string())
-            .collect::<Vec<String>>();
-
-        // create a vector of vectors with the values for each metric
-        let mut values_vec_vec = Vec::with_capacity(metrics.len() + 1);
-
-        // add real_date column
-        for i_metric in 0..metrics.len() {
-            let metric = &metrics[i_metric];
-            let mut metric_values = Vec::with_capacity(sorted_dates.len());
-            // when a date is missing, we need to insert a null value
-            for date in &sorted_dates {
-                let value = timeseries_list
-                    .iter()
-                    .find(|ts| ts.get_metric().unwrap() == *metric)
-                    .and_then(|ts| {
-                        ts.time_series
-                            .iter()
-                            .find(|(d, _)| d == date)
-                            .map(|(_, v)| *v)
-                    })
-                    .unwrap_or(None);
-                metric_values.push(value);
-            }
-            values_vec_vec.push(metric_values);
-        }
-
-        let mut columns: Vec<Column> = values_vec_vec
-            .into_iter()
-            .enumerate()
-            .map(|(i, values)| {
-                let colx = Column::new(metrics[i].clone().into(), values);
-                colx
-            })
-            .collect();
-
-        let date_col = Column::new("real_date".into(), sorted_dates);
-        columns.insert(0, date_col);
-        let df = DataFrame::new(columns).unwrap();
+        let output_df = timeseries_list_to_dataframe(timeseries_list, true)?;
 
         Ok(JPMaQSIndicator {
-            df,
-            ticker,
-            metrics,
+            df: output_df,
+            ticker: ticker,
+            metrics: metrics,
         })
     }
 
-    pub fn as_qdf(&self) -> Result<DataFrame, PolarsError> {
+    pub fn as_qdf(&self) -> Result<DataFrame, Box<dyn Error>> {
         let mut qdf = self.df.clone();
         let (cid, xcat) = match self.ticker.split_once('_') {
             Some((cid, xcat)) => (cid, xcat),
-            None => return Err(PolarsError::ComputeError("Invalid ticker format".into())),
+            None => return Err(format!("Invalid ticker format; got '{}'", self.ticker).into()),
         };
         qdf.with_column(Series::new("cid".into(), vec![cid; qdf.height()]))?;
         qdf.with_column(Series::new("xcat".into(), vec![xcat; qdf.height()]))?;
+        sort_qdf_columns(&mut qdf)?;
         Ok(qdf)
     }
+
+    pub fn save_qdf_to_csv(&self, filename: &str) -> Result<(), Box<dyn Error>> {
+        save_qdf_to_csv(&mut self.as_qdf()?, filename)
+    }
 }
+
+fn timeseries_list_to_dataframe(
+    timeseries_list: Vec<DQTimeSeries>,
+    dropna: bool,
+) -> Result<DataFrame, Box<dyn Error>> {
+    let mut output_df = DataFrame::new(vec![]).expect("Failed to create DataFrame");
+
+    if let Some((first, rest)) = timeseries_list.split_first() {
+        // Convert the first timeseries to DataFrame and clone it to avoid modifying the original
+        let mut result_df = {
+            let mut df = first
+                .to_dataframe()
+                .expect("Failed to convert first timeseries to DataFrame");
+            let curr_metric = first.get_metric().expect("Failed to get metric");
+            let column_name = df.get_column_names()[1].to_string();
+            df.rename(&column_name, curr_metric.into())
+                .expect("Failed to rename column");
+            df.clone()
+        };
+
+        // Iterate over the remaining timeseries
+        for ts in rest {
+            // Convert the current timeseries to DataFrame
+            let mut df = ts
+                .to_dataframe()
+                .expect("Failed to convert timeseries to DataFrame");
+
+            // Rename the metric column to the metric of the relevant DataFrame
+            let curr_metric = ts.get_metric().expect("Failed to get metric");
+            let column_name = df.get_column_names()[1].to_string();
+            df.rename(&column_name, curr_metric.into())
+                .expect("Failed to rename column");
+
+            // Perform a left join on the 'real_date' column
+            result_df = result_df
+                .left_join(&df, ["real_date"], ["real_date"])
+                .expect("Left join failed");
+        }
+        output_df = result_df.clone();
+    } else {
+        println!("No timeseries provided.");
+    }
+
+    // drop rows where all values are NA
+    if dropna {
+        output_df = output_df
+            .lazy()
+            .drop_nulls(None)
+            .filter(all_horizontal([all().is_not_null()])?)
+            .collect()
+            .expect("Failed to drop NA rows");
+    }
+
+    Ok(output_df)
+}
+
+fn sort_qdf_columns(qdf: &mut DataFrame) -> Result<(), Box<dyn Error>> {
+    let index_columns = ["real_date", "cid", "xcat"];
+    let known_metrics = ["value", "grading", "eop_lag", "mop_lag"];
+
+    let df_columns = qdf
+        .get_column_names()
+        .into_iter()
+        .map(|s| s.clone().into_string())
+        .collect::<Vec<String>>();
+
+    let mut unknown_metrics: Vec<String> = df_columns
+        .iter()
+        .filter(|&m| !known_metrics.contains(&m.as_str()))
+        .filter(|&m| !index_columns.contains(&m.as_str()))
+        .cloned()
+        .collect();
+
+    let mut new_columns: Vec<String> = vec![];
+    new_columns.extend(index_columns.iter().map(|s| s.to_string()));
+    for &colname in &known_metrics {
+        if df_columns.contains(&colname.into()) {
+            new_columns.push(colname.to_string());
+        }
+    }
+    unknown_metrics.sort();
+    new_columns.extend(unknown_metrics);
+
+    *qdf = qdf
+        .select(new_columns.clone())
+        .expect("Failed to select columns");
+    Ok(())
+}
+
+fn save_qdf_to_csv(qdf: &mut DataFrame, filename: &str) -> Result<(), Box<dyn Error>> {
+    let file = File::create(filename)?;
+    // Write the DataFrame to a CSV file
+    let mut csv_writer = CsvWriter::new(file);
+    csv_writer.finish(qdf)?;
+    Ok(())
+}
+
+#[allow(dead_code)]
 fn main() {
     let json_data = r#"
     {
@@ -242,7 +324,7 @@ fn main() {
     }
     "#;
 
-    let response: DQResponse = serde_json::from_str(json_data).unwrap();
+    let response: DQTimeSeriesResponse = serde_json::from_str(json_data).unwrap();
     println!("{:?}", response);
 
     let all_timeseries = response.get_all_timeseries();
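For reference, a sketch of the column order sort_qdf_columns is meant to produce; the "zscore" column is hypothetical, standing in for any unknown metric:

use polars::prelude::*;

// Hypothetical QDF with shuffled columns and one unknown metric.
let mut qdf = df!(
    "real_date" => ["2024-01-01"],
    "value" => [1.0],
    "cid" => ["USD"],
    "xcat" => ["EQXR_NSA"],
    "zscore" => [0.5],
    "grading" => [1.0]
)
.unwrap();
sort_qdf_columns(&mut qdf).unwrap();
// Columns now: real_date, cid, xcat, value, grading, zscore
// (index columns first, then known metrics in fixed order, then unknown metrics sorted)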