Add DQRequester and DQTimeseriesRequestArgs structs; update dependencies and refactor main logic

Palash Tyagi 2024-11-07 14:31:05 +00:00
parent 693b4fad96
commit 4d14de8017
6 changed files with 127 additions and 61 deletions
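For orientation, here is a minimal sketch of the call pattern this refactor introduces, pieced together from the updated main.rs and requester.rs below. The crate name, types, and method signatures are taken from the diff; error handling is reduced to unwrap(), and the example expression is simply the first one used in main.rs.

use macrosynergy_dataquery::oauth_client::OAuthClient;
use macrosynergy_dataquery::requester::{DQRequester, DQTimeseriesRequestArgs};
use macrosynergy_dataquery::timeseries::{DQCatalogueResponse, DQTimeSeriesResponse, JPMaQSIndicator};

fn main() {
    // OAuthClient::default() reads DQ_CLIENT_ID / DQ_CLIENT_SECRET from the environment.
    let mut requester = DQRequester::new(OAuthClient::default());
    requester.check_connection().unwrap();

    // The catalogue is now paginated: one serde_json::Value per page.
    let pages = requester.get_catalogue("JPMAQS", 1000).unwrap();
    let first: DQCatalogueResponse = serde_json::from_str(&pages[0].to_string()).unwrap();
    println!("{} instruments on the first page", first.instruments.len());

    // Time series are requested in batches of at most 20 expressions.
    let args = DQTimeseriesRequestArgs {
        expressions: vec!["DB(JPMAQS,USD_EQXR_NSA,value)".to_string()],
        ..Default::default()
    };
    let response = requester.get_timeseries_batch(args).unwrap();
    let dq_response: DQTimeSeriesResponse =
        serde_json::from_str(&response.text().unwrap()).unwrap();
    for ts_group in dq_response.get_timeseries_by_ticker() {
        println!("{:?}", JPMaQSIndicator::new(ts_group).unwrap().as_qdf().unwrap());
    }
}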

Cargo.lock (generated)

@@ -1243,6 +1243,7 @@ version = "0.0.1"
dependencies = [
"anyhow",
"polars",
"rand",
"reqwest 0.11.27",
"serde",
"serde_json",

Cargo.toml

@@ -10,3 +10,4 @@ serde_urlencoded = "0.7"
serde = { version = "1.0", features = ["derive"] }
polars = { version = "0.44.2", features = ["temporal", "lazy"] }
anyhow = "1.0"
rand = "0.8"


@@ -1,36 +1,31 @@
// use crate::oauth_client::OAuthClient;
// use crate::requester::Requester;
use macrosynergy_dataquery::oauth_client::OAuthClient;
use macrosynergy_dataquery::requester::Requester;
use macrosynergy_dataquery::timeseries::DQCatalogueResponse;
use macrosynergy_dataquery::timeseries::DQTimeSeriesResponse;
// use macrosynergy_dataquery::timeseries::DQTimeSeries;
use macrosynergy_dataquery::timeseries::JPMaQSIndicator;
// use macrosynergy_dataquery::timeseries::TimeSeriesList;
use macrosynergy_dataquery::requester;
use macrosynergy_dataquery::requester::*;
use macrosynergy_dataquery::timeseries::*;
use rand::seq::SliceRandom; // Import the trait for `shuffle`
use std::env;
fn main() {
let client_id = env::var("DQ_CLIENT_ID").unwrap();
let client_secret = env::var("DQ_CLIENT_SECRET").unwrap();
// let client_id = env::var("DQ_CLIENT_ID").unwrap();
// let client_secret = env::var("DQ_CLIENT_SECRET").unwrap();
let mut oauth_client = OAuthClient::new(client_id, client_secret);
oauth_client.fetch_token().unwrap();
// let mut oauth_client = OAuthClient::new(client_id, client_secret);
// oauth_client.fetch_token().unwrap();
let mut requester = Requester::new(oauth_client);
// let mut requester = DQRequester::new(oauth_client);
let mut requester = requester::DQRequester::new(OAuthClient::default());
requester.check_connection().unwrap();
let expressions_a = vec![
"DB(JPMAQS,USD_EQXR_NSA,value)",
"DB(JPMAQS,USD_EQXR_NSA,grading)",
"DB(JPMAQS,USD_EQXR_NSA,eop_lag)",
"DB(JPMAQS,USD_EQXR_NSA,mop_lag)",
"DB(JPMAQS,GBP_EQXR_NSA,value)",
"DB(JPMAQS,GBP_EQXR_NSA,grading)",
"DB(JPMAQS,GBP_EQXR_NSA,eop_lag)",
"DB(JPMAQS,GBP_EQXR_NSA,mop_lag)",
];
// let expressions = vec![
// "DB(JPMAQS,USD_EQXR_NSA,value)",
// "DB(JPMAQS,USD_EQXR_NSA,grading)",
// "DB(JPMAQS,USD_EQXR_NSA,eop_lag)",
// "DB(JPMAQS,USD_EQXR_NSA,mop_lag)",
// "DB(JPMAQS,GBP_EQXR_NSA,value)",
// "DB(JPMAQS,GBP_EQXR_NSA,grading)",
// "DB(JPMAQS,GBP_EQXR_NSA,eop_lag)",
// "DB(JPMAQS,GBP_EQXR_NSA,mop_lag)",
// ];
// expressions_b.len();
@@ -46,14 +41,64 @@ fn main() {
// }
// get catalog
let response = requester.get_catalogue("JPMAQS").unwrap();
let responses = requester.get_catalogue("JPMAQS", 1000).unwrap();
// let json_data = response.text().unwrap();
let json_data = response[0].to_string();
println!("{}", json_data);
// let json_data = response[0].to_string();
// println!("{}", json_data);
// try to pull into DQResponse
let response: DQCatalogueResponse = serde_json::from_str(&json_data).unwrap();
// // try to pull into DQResponse
// let response: DQCatalogueResponse = serde_json::from_str(&json_data).unwrap();
println!("{:?}", response);
// println!("{:?}", response);
// // print all instrument names
// for instrument in response.instruments {
// println!("{}", instrument.instrument_name);
// }
let mut tickers_vec = vec![];
for rsp in responses {
let response: DQCatalogueResponse = serde_json::from_str(&rsp.to_string()).unwrap();
for instrument in response.instruments {
tickers_vec.push(instrument.instrument_name);
}
}
for ticker in tickers_vec.clone().iter().take(5) {
println!("{}", ticker);
}
fn make_expression(ticker: &str) -> Vec<String> {
// return metrics
return ["value", "grading", "eop_lag", "mop_lag"]
.iter()
.map(|metric| format!("DB(JPMAQS,{},{})", ticker, metric))
.collect::<Vec<String>>();
}
// make expressions for 10 ticker
let mut expressions_vec = vec![];
let mut shuffled_tickers = tickers_vec.clone();
shuffled_tickers.shuffle(&mut rand::thread_rng());
for ticker in shuffled_tickers.clone() {
for expr in make_expression(&ticker) {
expressions_vec.push(expr);
}
}
let rqargs = DQTimeseriesRequestArgs {
expressions: expressions_vec[0..20].to_vec(),
..Default::default()
};
// on the first 20 expressions
let response = requester.get_timeseries_batch(rqargs).unwrap();
let dq_response: DQTimeSeriesResponse =
serde_json::from_str(&response.text().unwrap()).unwrap();
for ts_group in dq_response.get_timeseries_by_ticker() {
let jpmi = JPMaQSIndicator::new(ts_group).unwrap();
println!("{:?}", jpmi.as_qdf().unwrap());
}
}
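The updated main only requests the first 20 expressions. Since get_timeseries_batch() rejects batches outside 1-20 expressions, pulling the whole shuffled list would need chunking; a sketch of that loop (hypothetical extension, not part of this commit, reusing the variables from the function above):

// Hypothetical extension, not in this commit: walk the whole expression list
// in batches of 20, the maximum that get_timeseries_batch() accepts.
for batch in expressions_vec.chunks(20) {
    let args = DQTimeseriesRequestArgs {
        expressions: batch.to_vec(),
        ..Default::default()
    };
    let response = requester.get_timeseries_batch(args).unwrap();
    let dq_response: DQTimeSeriesResponse =
        serde_json::from_str(&response.text().unwrap()).unwrap();
    // Handle each batch the same way as the loop above.
    for ts_group in dq_response.get_timeseries_by_ticker() {
        let jpmi = JPMaQSIndicator::new(ts_group).unwrap();
        println!("{:?}", jpmi.as_qdf().unwrap());
    }
}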


@@ -2,7 +2,7 @@ use reqwest::blocking::Client;
use reqwest::Error as ReqwestError;
use serde_json::Value;
use std::collections::HashMap;
// use std::env;
use std::env;
use std::error::Error;
use std::time::{Duration, SystemTime};
@@ -72,7 +72,7 @@ impl OAuthClient {
pub fn get_headers(&mut self) -> Result<HashMap<String, String>, Box<dyn Error>> {
if self.is_token_expired() {
println!("Token has expired. Fetching a new token...");
// println!("Token has expired. Fetching a new token...");
self.fetch_token()?;
}
let mut headers = HashMap::new();
@@ -85,3 +85,17 @@ impl OAuthClient {
Ok(headers)
}
}
impl Default for OAuthClient {
fn default() -> Self {
OAuthClient {
client_id: env::var("DQ_CLIENT_ID".to_string()).unwrap().to_string(),
client_secret: env::var("DQ_CLIENT_SECRET".to_string())
.unwrap()
.to_string(),
token_url: OAUTH_TOKEN_URL.to_string(),
resource_id: OAUTH_RESOURCE_ID.to_string(),
access_token: None,
expires_at: None,
}
}
}
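This Default impl unwraps both environment lookups, so OAuthClient::default() panics if DQ_CLIENT_ID or DQ_CLIENT_SECRET is unset. A fallible constructor is one possible alternative; a sketch (hypothetical from_env helper, not part of this commit):

// Hypothetical alternative, not in this commit: surface missing credentials
// as an error instead of panicking inside Default.
impl OAuthClient {
    pub fn from_env() -> Result<Self, std::env::VarError> {
        Ok(OAuthClient {
            client_id: std::env::var("DQ_CLIENT_ID")?,
            client_secret: std::env::var("DQ_CLIENT_SECRET")?,
            token_url: OAUTH_TOKEN_URL.to_string(),
            resource_id: OAUTH_RESOURCE_ID.to_string(),
            access_token: None,
            expires_at: None,
        })
    }
}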


@@ -10,12 +10,12 @@ const TIMESERIES_ENDPOINT: &str = "/expressions/time-series";
const CATALOGUE_ENDPOINT: &str = "/group/instruments";
const JPMAQS_CATALOGUE_GROUP: &str = "JPMAQS";
pub struct Requester {
pub struct DQRequester {
oauth_client: OAuthClient,
rqclient: Client,
}
pub struct TimeseriesRequestArgs {
pub struct DQTimeseriesRequestArgs {
pub start_date: String,
pub end_date: String,
pub calendar: String,
@@ -25,7 +25,7 @@ pub struct TimeseriesRequestArgs {
pub expressions: Vec<String>,
}
impl TimeseriesRequestArgs {
impl DQTimeseriesRequestArgs {
pub fn new(
start_date: &str,
end_date: &str,
@@ -35,7 +35,7 @@ impl TimeseriesRequestArgs {
nan_treatment: &str,
expressions: Vec<String>,
) -> Self {
TimeseriesRequestArgs {
DQTimeseriesRequestArgs {
start_date: start_date.to_string(),
end_date: end_date.to_string(),
calendar: calendar.to_string(),
@@ -68,9 +68,9 @@ impl TimeseriesRequestArgs {
}
}
impl Default for TimeseriesRequestArgs {
impl Default for DQTimeseriesRequestArgs {
fn default() -> Self {
TimeseriesRequestArgs {
DQTimeseriesRequestArgs {
start_date: "1990-01-01".to_string(),
end_date: "TODAY+2D".to_string(),
calendar: "CAL_ALLDAYS".to_string(),
@@ -82,9 +82,9 @@ impl Default for TimeseriesRequestArgs {
}
}
impl Requester {
impl DQRequester {
pub fn new(oauth_client: OAuthClient) -> Self {
Requester {
DQRequester {
oauth_client,
rqclient: Client::new(),
}
@@ -129,11 +129,17 @@ impl Requester {
pub fn get_catalogue(
&mut self,
catalogue_group: &str,
page_size: u32,
) -> Result<Vec<serde_json::Value>, Box<dyn Error>> {
let mut responses: Vec<serde_json::Value> = Vec::new();
if page_size < 1 || page_size > 1000 {
return Err("Page size must be between 1 and 1000".into());
}
let mut next_page = Some(format!(
"{}?group-id={}",
CATALOGUE_ENDPOINT, catalogue_group
"{}?group-id={}&limit={}",
CATALOGUE_ENDPOINT, catalogue_group, page_size
));
while let Some(endpoint) = next_page {
@@ -144,20 +150,17 @@ impl Requester {
return Err(Box::new(response.error_for_status().unwrap_err()));
}
// Parse the response body as JSON
let json: serde_json::Value = response.json()?;
// Add the JSON response to the vector
responses.push(json.clone());
// Check for the next link
next_page = json["links"].as_array().and_then(|links| {
let links = json.get("links");
let links_array = links.and_then(|links| links.as_array());
let next_link = links_array.and_then(|links| {
links
.iter()
.find(|link| link["rel"] == "next")
.and_then(|link| link["href"].as_str())
.map(|s| s.to_string())
.find_map(|link| link.get("next")?.as_str().map(|s| s.to_string()))
});
next_page = next_link;
}
Ok(responses)
@@ -165,7 +168,7 @@ impl Requester {
pub fn get_timeseries_batch(
&mut self,
args: TimeseriesRequestArgs,
args: DQTimeseriesRequestArgs,
) -> Result<reqwest::blocking::Response, Box<dyn Error>> {
if args.expressions.len() < 1 || args.expressions.len() > 20 {
return Err("Number of expressions must be between 1 and 20".into());
@@ -183,7 +186,7 @@ impl Requester {
expressions: Vec<&str>,
) -> Result<reqwest::blocking::Response, Box<dyn Error>> {
// replace just the expressions
let args = TimeseriesRequestArgs {
let args = DQTimeseriesRequestArgs {
expressions: expressions.into_iter().map(|s| s.to_string()).collect(),
..Default::default()
};
@@ -200,10 +203,12 @@ fn main() {
let mut oauth_client = OAuthClient::new(client_id, client_secret);
oauth_client.fetch_token().unwrap();
let mut requester = Requester::new(oauth_client);
let mut requester = DQRequester::new(oauth_client);
requester.check_connection().unwrap();
let response = requester.get_catalogue(JPMAQS_CATALOGUE_GROUP).unwrap();
let response = requester
.get_catalogue(JPMAQS_CATALOGUE_GROUP, 1000)
.unwrap();
let json_data = response[0].to_string();
println!("{}", json_data);
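Because get_catalogue() now returns one serde_json::Value per page, a caller that wants the whole catalogue has to fold the pages back together; a sketch of doing so (not part of this commit, assuming DQCatalogueResponse is in scope as in main.rs):

// Sketch, not in this commit: count instruments across all catalogue pages.
let pages = requester.get_catalogue(JPMAQS_CATALOGUE_GROUP, 1000).unwrap();
let total: usize = pages
    .iter()
    .map(|page| {
        let cat: DQCatalogueResponse =
            serde_json::from_str(&page.to_string()).unwrap();
        cat.instruments.len()
    })
    .sum();
println!("{} instruments across {} pages", total, pages.len());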


@@ -16,18 +16,18 @@ pub struct DQTimeSeriesResponse {
#[derive(Deserialize, Debug)]
pub struct DQCatalogueResponse {
links: Vec<HashMap<String, String>>,
items: u32,
instruments: Vec<DQCatalogueInstrument>,
pub links: Vec<HashMap<String, Option<String>>>,
pub items: u32,
pub instruments: Vec<DQCatalogueInstrument>,
}
#[derive(Deserialize, Debug)]
pub struct DQCatalogueInstrument {
#[serde(rename = "instrument-id")]
instrument_id: String,
pub instrument_id: String,
#[serde(rename = "instrument-name")]
instrument_name: String,
item: u32,
pub instrument_name: String,
pub item: u32,
}
/// Representation of DQResponse.Instrument