working - ish

This commit is contained in:
Palash Tyagi 2024-11-07 17:10:44 +00:00
parent 4d14de8017
commit 9689c8129e
5 changed files with 70 additions and 62 deletions

20
Cargo.lock generated
View File

@ -1248,6 +1248,7 @@ dependencies = [
"serde",
"serde_json",
"serde_urlencoded",
"threadpool",
]
[[package]]
@ -1378,6 +1379,16 @@ dependencies = [
"libm",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "object"
version = "0.36.5"
@ -2995,6 +3006,15 @@ dependencies = [
"syn 2.0.86",
]
[[package]]
name = "threadpool"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
dependencies = [
"num_cpus",
]
[[package]]
name = "tinyvec"
version = "1.8.0"

View File

@ -11,3 +11,4 @@ serde = { version = "1.0", features = ["derive"] }
polars = { version = "0.44.2", features = ["temporal", "lazy"] }
anyhow = "1.0"
rand = "0.8"
threadpool = "1.8.1"

View File

@ -2,60 +2,14 @@ use macrosynergy_dataquery::oauth_client::OAuthClient;
use macrosynergy_dataquery::requester;
use macrosynergy_dataquery::requester::*;
use macrosynergy_dataquery::timeseries::*;
use rand::seq::SliceRandom; // Import the trait for `shuffle`
use std::env;
use rand::seq::SliceRandom;
fn main() {
// let client_id = env::var("DQ_CLIENT_ID").unwrap();
// let client_secret = env::var("DQ_CLIENT_SECRET").unwrap();
// let mut oauth_client = OAuthClient::new(client_id, client_secret);
// oauth_client.fetch_token().unwrap();
// let mut requester = DQRequester::new(oauth_client);
let mut requester = requester::DQRequester::new(OAuthClient::default());
requester.check_connection().unwrap();
// let expressions = vec![
// "DB(JPMAQS,USD_EQXR_NSA,value)",
// "DB(JPMAQS,USD_EQXR_NSA,grading)",
// "DB(JPMAQS,USD_EQXR_NSA,eop_lag)",
// "DB(JPMAQS,USD_EQXR_NSA,mop_lag)",
// "DB(JPMAQS,GBP_EQXR_NSA,value)",
// "DB(JPMAQS,GBP_EQXR_NSA,grading)",
// "DB(JPMAQS,GBP_EQXR_NSA,eop_lag)",
// "DB(JPMAQS,GBP_EQXR_NSA,mop_lag)",
// ];
// expressions_b.len();
// let response = requester
// .get_timeseries_with_defaults(expressions_a)
// .unwrap();
// let json_data = response.text().unwrap();
// let dqresponse: DQResponse = serde_json::from_str(&json_data).unwrap();
// for ts_group in dqresponse.get_timeseries_by_ticker() {
// let jpmi = JPMaQSIndicator::new(ts_group).unwrap();
// println!("{:?}", jpmi.as_qdf().unwrap());
// }
// get catalog
let responses = requester.get_catalogue("JPMAQS", 1000).unwrap();
// let json_data = response.text().unwrap();
// let json_data = response[0].to_string();
// println!("{}", json_data);
// // try to pull into DQResponse
// let response: DQCatalogueResponse = serde_json::from_str(&json_data).unwrap();
// println!("{:?}", response);
// // print all instrument names
// for instrument in response.instruments {
// println!("{}", instrument.instrument_name);
// }
let mut tickers_vec = vec![];
for rsp in responses {
@ -91,14 +45,17 @@ fn main() {
expressions: expressions_vec[0..20].to_vec(),
..Default::default()
};
// on the first 20 expressions
let response = requester.get_timeseries_batch(rqargs).unwrap();
let rsp = requester.get_timeseries(DQTimeseriesRequestArgs {
expressions: expressions_vec[0..100].to_vec(),
..Default::default()
});
for response in rsp {
let dq_response: DQTimeSeriesResponse =
serde_json::from_str(&response.text().unwrap()).unwrap();
for ts_group in dq_response.get_timeseries_by_ticker() {
let jpmi = JPMaQSIndicator::new(ts_group).unwrap();
println!("{:?}", jpmi.as_qdf().unwrap());
}
}
}

View File

@ -8,7 +8,7 @@ use std::time::{Duration, SystemTime};
pub const OAUTH_TOKEN_URL: &str = "https://authe.jpmchase.com/as/token.oauth2";
pub const OAUTH_RESOURCE_ID: &str = "JPMC:URI:RS-06785-DataQueryExternalApi-PROD";
#[derive(Debug, Clone)]
pub struct OAuthClient {
client_id: String,
client_secret: String,

View File

@ -1,20 +1,21 @@
use crate::oauth_client::OAuthClient;
use reqwest::blocking::Client;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use std::error::Error;
use crate::oauth_client::OAuthClient;
const API_BASE_URL: &str = "https://api-developer.jpmorgan.com/research/dataquery-authe/api/v2";
const HEARTBEAT_ENDPOINT: &str = "/services/heartbeat";
const TIMESERIES_ENDPOINT: &str = "/expressions/time-series";
const CATALOGUE_ENDPOINT: &str = "/group/instruments";
const JPMAQS_CATALOGUE_GROUP: &str = "JPMAQS";
#[derive(Debug, Clone)]
pub struct DQRequester {
oauth_client: OAuthClient,
rqclient: Client,
}
#[derive(Debug, Clone)]
pub struct DQTimeseriesRequestArgs {
pub start_date: String,
pub end_date: String,
@ -166,7 +167,7 @@ impl DQRequester {
Ok(responses)
}
pub fn get_timeseries_batch(
pub fn get_single_timeseries_batch(
&mut self,
args: DQTimeseriesRequestArgs,
) -> Result<reqwest::blocking::Response, Box<dyn Error>> {
@ -181,17 +182,46 @@ impl DQRequester {
Ok(response)
}
pub fn get_timeseries_with_defaults(
pub fn get_single_timeseries_with_defaults(
&mut self,
expressions: Vec<&str>,
) -> Result<reqwest::blocking::Response, Box<dyn Error>> {
// replace just the expressions
let args = DQTimeseriesRequestArgs {
expressions: expressions.into_iter().map(|s| s.to_string()).collect(),
..Default::default()
};
self.get_timeseries_batch(args)
self.get_single_timeseries_batch(args)
}
pub fn get_timeseries(
&mut self,
args: DQTimeseriesRequestArgs,
) -> Vec<reqwest::blocking::Response> {
let expressions_chunks: Vec<Vec<String>> = args
.expressions
.chunks(20)
.map(|chunk| chunk.to_vec())
.collect();
let mut responses: Vec<Result<reqwest::blocking::Response, Box<dyn Error>>> = Vec::new();
for expressions in expressions_chunks {
let mut args = args.clone();
args.update_expressions(expressions);
println!(
"Requesting timeseries for expressions: {:?}",
args.expressions
);
let response = self.get_single_timeseries_batch(args);
responses.push(response);
// just sleep for a bit not threadsleep
println!("Sleeping for 500ms");
std::thread::sleep(std::time::Duration::from_millis(500));
}
responses.into_iter().map(|r| r.unwrap()).collect()
}
}