This commit is contained in:
Palash Tyagi 2024-11-03 17:31:31 +00:00
parent 01fea29fde
commit 45ae88b330
8 changed files with 2815 additions and 60 deletions

2579
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,8 +1,11 @@
[package]
name = "macrosynergy_dataquery"
version = "0.1.0"
version = "0.0.1"
edition = "2021"
[dependencies]
reqwest = { version = "0.11", features = ["blocking", "json"] }
serde_json = "1.0"
serde_json = "1.0"
serde_urlencoded = "0.7"
serde = { version = "1.0", features = ["derive"] }
polars = { version = "0.44.2", features = ["temporal"] }

0
src/dqinterface.rs Normal file
View File

View File

@ -1,2 +1,4 @@
pub mod oauth_client;
pub mod requester;
pub mod timeseries;
// pub mod main;

51
src/main.rs Normal file
View File

@ -0,0 +1,51 @@
// use crate::oauth_client::OAuthClient;
// use crate::requester::Requester;
use macrosynergy_dataquery::oauth_client::OAuthClient;
use macrosynergy_dataquery::requester::Requester;
use macrosynergy_dataquery::timeseries::DQResponse;
// use macrosynergy_dataquery::timeseries::DQTimeSeries;
// use macrosynergy_dataquery::timeseries::JPMaQSIndicator;
// use macrosynergy_dataquery::timeseries::TimeSeriesList;
use std::env;
fn main() {
let client_id = env::var("DQ_CLIENT_ID").unwrap();
let client_secret = env::var("DQ_CLIENT_SECRET").unwrap();
let mut oauth_client = OAuthClient::new(client_id, client_secret);
oauth_client.fetch_token().unwrap();
let mut requester = Requester::new(oauth_client);
requester.check_connection().unwrap();
let expressions_a = vec![
"DB(JPMAQS,USD_EQXR_NSA,value)",
"DB(JPMAQS,USD_EQXR_NSA,grading)",
// ];
// let expressions_b = vec![
"DB(JPMAQS,GBP_EQXR_NSA,value)",
"DB(JPMAQS,GBP_EQXR_NSA,grading)",
];
// expressions_b.len();
let response = requester
.get_timeseries_with_defaults(expressions_a)
.unwrap();
let json_data = response.text().unwrap();
let dqresponse: DQResponse = serde_json::from_str(&json_data).unwrap();
let timeseries_list = dqresponse.get_all_timeseries();
for ts in timeseries_list {
// println!("{:?}", ts);
// ts.to_dataframe().unwrap();
println!("{:?}", ts.to_dataframe().unwrap());
}
}

View File

@ -7,11 +7,13 @@ use std::error::Error;
use std::time::{Duration, SystemTime};
pub const OAUTH_TOKEN_URL: &str = "https://authe.jpmchase.com/as/token.oauth2";
pub const OAUTH_RESOURCE_ID: &str = "JPMC:URI:RS-06785-DataQueryExternalApi-PROD";
pub struct OAuthClient {
client_id: String,
client_secret: String,
token_url: String,
resource_id: String,
access_token: Option<String>,
expires_at: Option<SystemTime>, // Stores the token's expiration time
}
@ -22,6 +24,7 @@ impl OAuthClient {
client_id,
client_secret,
token_url: OAUTH_TOKEN_URL.to_string(),
resource_id: OAUTH_RESOURCE_ID.to_string(),
access_token: None,
expires_at: None, // Initially None until a token is fetched
}
@ -35,24 +38,20 @@ impl OAuthClient {
params.insert("grant_type", "client_credentials");
params.insert("client_id", &self.client_id);
params.insert("client_secret", &self.client_secret);
params.insert("aud", &self.resource_id);
// Make the POST request to retrieve the token
let response = client.post(&self.token_url).form(&params).send()?;
// Check if the request was successful
if response.status().is_success() {
// Parse the JSON response to get the access token and expiration time
let json: Value = response.json()?;
if let Some(token) = json["access_token"].as_str() {
self.access_token = Some(token.to_string());
println!("Access token retrieved: {}", token);
// Set the expiration time
println!("Access token retrieved; token length: {}", token.len());
if let Some(expires_in) = json["expires_in"].as_u64() {
self.expires_at = Some(SystemTime::now() + Duration::from_secs(expires_in));
} else {
self.expires_at = Some(SystemTime::now() + Duration::from_secs(3600));
// Default to 1 hour if `expires_in` is missing
}
} else {
eprintln!("Failed to retrieve access token from response.");
@ -64,7 +63,6 @@ impl OAuthClient {
Ok(())
}
// Method to check if the token has expired
fn is_token_expired(&self) -> bool {
match self.expires_at {
Some(expiration) => SystemTime::now() >= expiration,
@ -72,15 +70,11 @@ impl OAuthClient {
}
}
// Method to get headers with authorization, renewing the token if expired
pub fn get_headers(&mut self) -> Result<HashMap<String, String>, Box<dyn Error>> {
// Check if the token is expired; if it is, fetch a new one
if self.is_token_expired() {
println!("Token has expired. Fetching a new token...");
self.fetch_token()?;
}
// Construct the headers
let mut headers = HashMap::new();
if let Some(token) = &self.access_token {
headers.insert("Authorization".to_string(), format!("Bearer {}", token));

View File

@ -1,52 +1,113 @@
use reqwest::blocking::Client;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use std::collections::HashMap;
// use std::collections::HashMap;
use std::error::Error;
use crate::oauth_client::OAuthClient;
// use crate::timeseries::TimeSeries;
const API_BASE_URL: &str = "https://api-developer.jpmorgan.com/research/dataquery-authe/api/v2";
const HEARTBEAT_ENDPOINT: &str = "/services/heartbeat";
const TIMESERIES_ENDPOINT: &str = "/expressions/time-series";
pub struct Requester {
oauth_client: OAuthClient,
client: Client,
rqclient: Client,
}
impl Requester {
pub fn new(oauth_client: OAuthClient) -> Self {
Requester {
oauth_client,
client: Client::new(),
rqclient: Client::new(),
}
}
fn _request(
&mut self,
method: reqwest::Method,
endpoint: &str,
) -> Result<reqwest::blocking::Response, Box<dyn Error>> {
let headers_map = self.oauth_client.get_headers()?;
let mut headers = HeaderMap::new();
for (key, value) in headers_map {
let header_name = HeaderName::from_bytes(key.as_bytes())?;
let header_value = HeaderValue::from_str(&value)?;
headers.insert(header_name, header_value);
}
// Construct the full URL
let url = format!("{}{}", API_BASE_URL, endpoint);
let response = self
.rqclient
.request(method, &url)
.headers(headers)
.send()?;
if response.status().is_success() {
Ok(response)
} else {
Err(Box::new(response.error_for_status().unwrap_err()))
}
}
pub fn check_connection(&mut self) -> Result<(), Box<dyn Error>> {
// Get headers with authorization from OAuthClient
let headers_map = self.oauth_client.get_headers()?;
let response = self._request(reqwest::Method::GET, HEARTBEAT_ENDPOINT)?;
// Convert HashMap<String, String> to HeaderMap
let mut headers = HeaderMap::new();
for (key, value) in headers_map {
headers.insert(
HeaderName::from_bytes(key.as_bytes())?,
HeaderValue::from_str(&value)?,
);
println!("Connection is successful: {}", response.status());
Ok(())
}
pub fn get_timeseries_batch(
&mut self,
start_date: &str,
end_date: &str,
calendar: &str,
frequency: &str,
conversion: &str,
nan_treatment: &str,
expressions: Vec<&str>,
) -> Result<reqwest::blocking::Response, Box<dyn Error>> {
// Construct the parameters
let mut params = vec![
("format", "JSON"),
("start-date", start_date),
("end-date", end_date),
("calendar", calendar),
("frequency", frequency),
("conversion", conversion),
("nan_treatment", nan_treatment),
("data", "NO_REFERENCE_DATA"),
];
if (20 < expressions.len()) || (expressions.len() < 1) {
return Err("Number of expressions must be between 1 and 20".into());
}
// Add expressions to the parameters
for expression in expressions {
params.push(("expressions", expression));
}
// Construct the full URL
let url = format!("{}{}", API_BASE_URL, HEARTBEAT_ENDPOINT);
let query_string = serde_urlencoded::to_string(&params)?;
let endpoint = format!("{}?{}", TIMESERIES_ENDPOINT, query_string);
let response = self._request(reqwest::Method::GET, &endpoint)?;
// Make the GET request with the authorization headers
let response = self.client.get(&url).headers(headers).send()?;
// Check the status of the response
if response.status().is_success() {
println!("Connection is successful: {}", response.status());
Ok(())
} else {
eprintln!("Failed to connect: {}", response.status());
Err(Box::new(response.error_for_status().unwrap_err()))
}
Ok(response)
}
pub fn get_timeseries_with_defaults(
&mut self,
expressions: Vec<&str>,
) -> Result<reqwest::blocking::Response, Box<dyn Error>> {
self.get_timeseries_batch(
// start_date,
// end_date,
"2024-10-20",
"TODAY",
// "CAL_WEEKDAYS",
"CAL_ALLDAYS",
"FREQ_DAY",
"CONV_LASTBUS_ABS",
"NA_NOTHING",
expressions,
)
}
}

109
src/timeseries.rs Normal file
View File

@ -0,0 +1,109 @@
use serde::Deserialize;
use polars::prelude::*;
use polars::error::PolarsError;
use polars::export::chrono::{Datelike, NaiveDate};
#[derive(Deserialize, Debug)]
pub struct DQResponse {
instruments: Vec<Instrument>,
}
#[derive(Deserialize, Debug)]
struct Instrument {
attributes: Vec<Attribute>,
}
#[derive(Deserialize, Debug)]
struct Attribute {
expression: String,
#[serde(rename = "time-series")]
time_series: Vec<(String, Option<f64>)>,
}
#[derive(Debug)]
pub struct DQTimeSeries {
expression: String,
time_series: Vec<(String, Option<f64>)>,
}
impl DQTimeSeries {
pub fn to_dataframe(&self) -> Result<DataFrame, PolarsError> {
let dates: Vec<NaiveDate> = self.time_series.iter()
.map(|(date_str, _)| NaiveDate::parse_from_str(date_str, "%Y%m%d").unwrap())
.collect();
let values: Vec<Option<f64>> = self.time_series.iter()
.map(|(_, value)| *value)
.collect();
let date_series = Series::new("date".into(), &dates);
let value_series = Float64Chunked::new("value".into(), &values);
df!(
"real_date" => date_series,
self.expression.clone() => value_series
)
// Ok(df)
}
}
impl DQResponse {
pub fn get_all_timeseries(&self) -> Vec<DQTimeSeries> {
self.instruments.iter().flat_map(|instrument| {
instrument.attributes.iter().map(|attribute| DQTimeSeries {
expression: attribute.expression.clone(),
time_series: attribute.time_series.clone(),
})
}).collect()
}
}
fn main() {
let json_data = r#"
{
"instruments": [
{
"attributes": [
{
"expression": "metric, TICKER, metric1()",
"time-series": [
["2022-01-01", 10.0],
["2022-01-02", null]
]
},
{
"expression": "metric, TICKER2, metric2()",
"time-series": [
["2022-01-01", 20.0],
["2022-01-03", 25.0]
]
}
]
},
{
"attributes": [
{
"expression": "metric, TICKER3, metric3()",
"time-series": [
["2022-02-01", 30.0],
["2022-02-02", 35.0]
]
}
]
}
]
}
"#;
let response : DQResponse = serde_json::from_str(json_data).unwrap();
println!("{:?}", response);
let all_timeseries = response.get_all_timeseries();
for ts in all_timeseries {
println!("{:?}", ts);
match ts.to_dataframe() {
Ok(df) => println!("{:?}", df),
Err(e) => println!("Failed to create DataFrame: {:?}", e),
}
}
}