Palash Tyagi 2024-11-08 09:32:45 +00:00
parent 63c1cf64b2
commit 9c56839b9e
7 changed files with 211 additions and 88 deletions

.devcontainer/devcontainer.json

@ -0,0 +1,34 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/rust
{
    "name": "Rust",
    // Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
    "image": "mcr.microsoft.com/devcontainers/rust:1-1-bullseye",
    "features": {
        "ghcr.io/devcontainers/features/rust:1": {}
    }
    // Use 'mounts' to make the cargo cache persistent in a Docker Volume.
    // "mounts": [
    //     {
    //         "source": "devcontainer-cargo-cache-${devcontainerId}",
    //         "target": "/usr/local/cargo",
    //         "type": "volume"
    //     }
    // ]
    // Features to add to the dev container. More info: https://containers.dev/features.
    // "features": {},
    // Use 'forwardPorts' to make a list of ports inside the container available locally.
    // "forwardPorts": [],
    // Use 'postCreateCommand' to run commands after the container is created.
    // "postCreateCommand": "rustc --version",
    // Configure tool-specific properties.
    // "customizations": {},
    // Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
    // "remoteUser": "root"
}

.gitignore (vendored)

@ -1 +1,4 @@
.env
/target

Cargo.lock (generated)

@ -75,12 +75,6 @@ dependencies = [
"libc",
]
[[package]]
name = "anyhow"
version = "1.0.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775"
[[package]]
name = "argminmax"
version = "0.6.2"
@ -1161,7 +1155,6 @@ dependencies = [
name = "macrosynergy_dataquery"
version = "0.0.1"
dependencies = [
"anyhow",
"log",
"polars",
"rand",

Cargo.toml

@ -8,8 +8,11 @@ reqwest = { version = "0.12.9", features = ["blocking", "json"] }
serde_json = "1.0"
serde_urlencoded = "0.7"
serde = { version = "1.0", features = ["derive"] }
# polars = { version = "0.44.2", features = ["temporal", "lazy"] }
polars = { version = "0.44.2", features = ["lazy"] }
# anyhow = "1.0.92"
rand = "0.8"
threadpool = "1.8.1"
log = "0.4.22"
# dotenv = "0.15.0"
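This commit swaps `anyhow` out of the dependency tree; error handling in the crate instead goes through the standard library's `Box<dyn Error>`, as `requester.rs` below does. A minimal sketch of that pattern (the `validate_page_size` helper is illustrative, not part of the crate; only the error message is taken from `requester.rs`):

```rust
use std::error::Error;

// Illustrative helper mirroring the `Result<_, Box<dyn Error>>` style that
// DQRequester uses instead of `anyhow::Result`.
fn validate_page_size(page_size: u32) -> Result<u32, Box<dyn Error>> {
    if !(1..=1000).contains(&page_size) {
        // A `&str` converts into a boxed error via `.into()`.
        return Err("Page size must be between 1 and 1000".into());
    }
    Ok(page_size)
}

fn main() -> Result<(), Box<dyn Error>> {
    let size = validate_page_size(1000)?;
    println!("page size: {}", size);
    Ok(())
}
```

Dropping `anyhow` removes a dependency at the cost of its context-chaining; `Box<dyn Error>` still propagates cleanly with `?`.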

src/main.rs

@ -1,25 +1,22 @@
// use log;
use macrosynergy_dataquery::oauth_client::OAuthClient;
use macrosynergy_dataquery::requester;
use macrosynergy_dataquery::requester::*;
use macrosynergy_dataquery::timeseries::JPMaQSIndicator;
// use macrosynergy_dataquery::timeseries::*;
use rand::seq::SliceRandom;
// use std::collections::HashSet;
// use dotenv::dotenv;
use std::time::Instant;
fn main() {
// dotenv().ok();
let mut requester = requester::DQRequester::new(OAuthClient::default());
requester.check_connection().unwrap();
// let responses = requester.get_catalogue("JPMAQS", 1000).unwrap();
let dq_catalogue = requester.get_catalogue("JPMAQS", 1000).unwrap();
// let mut tickers_vec = vec![];
// for rsp in responses {
//     let response: DQCatalogueResponse = serde_json::from_str(&rsp.to_string()).unwrap();
//     for instrument in response.instruments {
//         tickers_vec.push(instrument.instrument_name);
//     }
// }
let tickers_vec = dq_catalogue.all_instruments;
for ticker in tickers_vec.clone().iter().take(5) {
println!("{}", ticker);
@ -34,45 +31,43 @@ fn main() {
}
// make expressions for the shuffled tickers
// let mut expressions_vec = vec![];
let mut shuffled_tickers = tickers_vec.clone();
shuffled_tickers.shuffle(&mut rand::thread_rng());
// for ticker in shuffled_tickers.clone() {
//     for expr in make_expression(&ticker) {
//         expressions_vec.push(expr);
//     }
// }
// let rsp = requester.get_timeseries(DQTimeseriesRequestArgs {
// expressions: expressions_vec[0..40].to_vec(),
// ..Default::default()
// });
let expressions_vec: Vec<String> = shuffled_tickers
.iter()
.flat_map(|ticker| make_expression(ticker))
.collect();
// for response in rsp {
// let dq_response: DQTimeSeriesResponse =
// serde_json::from_str(&response.text().unwrap()).unwrap();
// for ts_group in dq_response.get_timeseries_by_ticker() {
// let jpmi = JPMaQSIndicator::new(ts_group).unwrap();
// println!("{:?}", jpmi.as_qdf().unwrap());
// }
// }
// fetch first 1000 expressions
let start = Instant::now();
// let responses = requester
let dqts_vec = requester
.get_timeseries(DQTimeseriesRequestArgs {
//     expressions: expressions_vec[0..50].to_vec(),
expressions: expressions_vec[0..1000].to_vec(),
..Default::default()
})
.unwrap();
// for response in responses {
//     let dq_response: DQTimeSeriesResponse =
//         serde_json::from_str(&response.text().unwrap()).unwrap();
//     for ts_group in dq_response.get_timeseries_by_ticker() {
//         let jpmi = JPMaQSIndicator::new(ts_group).unwrap();
//         println!("{:?}", jpmi.as_qdf().unwrap());
//     }
// }
println!("Time elapsed in download step: {:?}", start.elapsed());
// let mut df_deets: Vec<usize> = Vec::new();
let mut df_deets: Vec<(String, usize)> = Vec::new();
let start = Instant::now();
for dqts in dqts_vec.iter() {
for tsv in dqts.get_timeseries_by_ticker() {
// df_deets.push(tsv.len());
let jpmaqs_indicator = JPMaQSIndicator::new(tsv).unwrap();
df_deets.push((jpmaqs_indicator.ticker, jpmaqs_indicator.df.size()));
// for ts in tsv {
// let df = ts.to_dataframe().unwrap();
// // println!("{}: {}", ts.get_metric().unwrap(), df.size());
// df_deets.push((ts.get_metric().unwrap(), df.size()));
// // if len df_deets mod 10 print
// if df_deets.len() % 10 == 0 {
// println!("{}: {:?}", df_deets.len(), start.elapsed());
// }
// }
}
}
println!("Time elapsed in processing step: {:?}", start.elapsed());
}
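`make_expression` is defined outside this diff. Judging from the ticker extraction in `timeseries.rs`, which reads the second comma-separated field of an expression, expressions follow the `DB(JPMAQS,<ticker>,<metric>)` shape. A hypothetical sketch consistent with that parsing (the metric names are assumptions, not from the commit):

```rust
// Hypothetical stand-in for the crate's make_expression: one expression per
// metric, in the DB(JPMAQS,<ticker>,<metric>) shape implied by the parsing
// in timeseries.rs.
fn make_expression(ticker: &str) -> Vec<String> {
    ["value", "grading", "eop_lag", "mop_lag"]
        .iter()
        .map(|metric| format!("DB(JPMAQS,{},{})", ticker, metric))
        .collect()
}

// Mirrors the crate's ticker extraction: the second comma-separated field.
fn get_ticker(expression: &str) -> Option<&str> {
    expression.split(',').nth(1)
}

fn main() {
    for expr in make_expression("USD_EQXR_NSA") {
        println!("{} -> ticker {:?}", expr, get_ticker(&expr));
    }
}
```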

src/requester.rs

@ -1,13 +1,20 @@
use crate::oauth_client::OAuthClient;
use crate::timeseries::DQCatalogueResponse;
use crate::timeseries::DQCatalogueSingleResponse;
use crate::timeseries::DQTimeSeriesResponse;
use reqwest;
use reqwest::blocking::Client;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use std::error::Error;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
const API_BASE_URL: &str = "https://api-developer.jpmorgan.com/research/dataquery-authe/api/v2";
const HEARTBEAT_ENDPOINT: &str = "/services/heartbeat";
const TIMESERIES_ENDPOINT: &str = "/expressions/time-series";
const CATALOGUE_ENDPOINT: &str = "/group/instruments";
// const JPMAQS_CATALOGUE_GROUP: &str = "JPMAQS";
#[derive(Debug, Clone)]
pub struct DQRequester {
@ -131,8 +138,9 @@ impl DQRequester {
&mut self,
catalogue_group: &str,
page_size: u32,
// ) -> Result<Vec<serde_json::Value>, Box<dyn Error>> {
// let mut responses: Vec<serde_json::Value> = Vec::new();
// ) -> Result<Vec<DQCatalogueSingleResponse>, Box<dyn Error>> {
) -> Result<DQCatalogueResponse, Box<dyn Error>> {
let mut responses: Vec<DQCatalogueSingleResponse> = Vec::new();
if page_size < 1 || page_size > 1000 {
return Err("Page size must be between 1 and 1000".into());
@ -152,7 +160,9 @@ impl DQRequester {
}
let json: serde_json::Value = response.json()?;
// responses.push(json.clone());
let dq_catalogue_response: DQCatalogueSingleResponse =
serde_json::from_str(&json.to_string())?;
responses.push(dq_catalogue_response);
let links = json.get("links");
let links_array = links.and_then(|links| links.as_array());
@ -164,7 +174,7 @@ impl DQRequester {
next_page = next_link;
}
// Ok(responses)
Ok(DQCatalogueResponse::new(responses))
}
pub fn _fetch_single_timeseries_batch(
@ -216,34 +226,78 @@ impl DQRequester {
&mut self,
args: DQTimeseriesRequestArgs,
max_retries: u32,
// ) -> Result<Vec<reqwest::blocking::Response>, Box<dyn Error>> {
) -> Result<Vec<DQTimeSeriesResponse>, Box<dyn Error>> {
let expression_batches: Vec<Vec<String>> = args
.expressions
.chunks(20)
.map(|chunk| chunk.to_vec())
.collect();
// let mut okay_responses: Vec<reqwest::blocking::Response> = Vec::new();
// let mut failed_batches: Vec<Vec<String>> = Vec::new();
let okay_responses = Arc::new(Mutex::new(Vec::new()));
let failed_batches = Arc::new(Mutex::new(Vec::new()));
let client = Arc::new(Mutex::new(self.clone()));
let mut handles = vec![];
let total_batches = expression_batches.len();
let mut curr_batch = 0;
for expr_batch in expression_batches {
curr_batch += 1;
let mut args = args.clone();
args.update_expressions(expr_batch.clone());
// let response = self._fetch_single_timeseries_batch(args);
// std::thread::sleep(std::time::Duration::from_millis(200));
// match response {
//     Ok(r) => {
//         okay_responses.push(r);
let okay_responses = Arc::clone(&okay_responses);
let failed_batches = Arc::clone(&failed_batches);
let client = Arc::clone(&client);
// log progress for each batch
log::info!("Processed {} out of {} batches", curr_batch, total_batches);
thread::sleep(Duration::from_millis(200));
let handle = thread::spawn(move || {
let response = client.lock().unwrap()._fetch_single_timeseries_batch(args);
log::info!("Got batch: {:?}", expr_batch);
match response {
Ok(r) => {
// Attempt to parse the response text
match serde_json::from_str::<DQTimeSeriesResponse>(&r.text().unwrap()) {
Ok(dq_response) => {
okay_responses.lock().unwrap().push(dq_response);
log::info!("Got batch: {:?}", expr_batch);
}
Err(e) => {
// If parsing fails, treat this as a batch failure
failed_batches.lock().unwrap().push(expr_batch.clone());
log::error!(
"Failed to parse timeseries: {:?} : {:?}",
expr_batch,
e
);
}
}
}
Err(e) => {
// Handle _fetch_single_timeseries_batch error
failed_batches.lock().unwrap().push(expr_batch.clone());
log::error!("Failed to get batch: {:?} : {:?}", expr_batch, e);
}
}
// Err(e) => {
//     failed_batches.push(expr_batch.clone());
//     log::error!("Failed to get batch: {:?} : {:?}", expr_batch, e);
// }
// }
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let mut okay_responses = Arc::try_unwrap(okay_responses)
.unwrap()
.into_inner()
.unwrap();
let failed_batches = Arc::try_unwrap(failed_batches)
.unwrap()
.into_inner()
.unwrap();
if !failed_batches.is_empty() && max_retries == 0 {
return Err("Max retries reached".into());
}
@ -257,6 +311,7 @@ impl DQRequester {
self._fetch_timeseries_recursive(new_args, max_retries - 1)?;
okay_responses.append(&mut retry_responses);
}
log::info!("Returning {} responses", okay_responses.len());
Ok(okay_responses)
}
@ -264,7 +319,7 @@ impl DQRequester {
pub fn get_timeseries(
&mut self,
args: DQTimeseriesRequestArgs,
// ) -> Result<Vec<reqwest::blocking::Response>, Box<dyn Error>> {
) -> Result<Vec<DQTimeSeriesResponse>, Box<dyn Error>> {
let max_retries = 5;
println!(
"Invoking recursive function for {:?} expressions",
@ -285,12 +340,11 @@ fn main() {
let mut requester = DQRequester::new(oauth_client);
requester.check_connection().unwrap();
// let response = requester
// .get_catalogue(JPMAQS_CATALOGUE_GROUP, 1000)
// .unwrap();
// let json_data = response[0].to_string();
// println!("{}", json_data);
// let json_data = response
// try to pull into
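The rewrite of `_fetch_timeseries_recursive` above fans each 20-expression batch out to its own thread, accumulates parsed responses and failed batches behind `Arc<Mutex<...>>`, and recovers the plain `Vec`s with `Arc::try_unwrap` once every handle has been joined. A self-contained sketch of that fan-out/join pattern, with a stub in place of `_fetch_single_timeseries_batch`:

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Stub standing in for _fetch_single_timeseries_batch: even-sized batches succeed.
fn fetch_batch(batch: &[String]) -> Result<String, String> {
    if batch.len() % 2 == 0 {
        Ok(format!("response for {:?}", batch))
    } else {
        Err(format!("failed batch {:?}", batch))
    }
}

fn main() {
    let batches: Vec<Vec<String>> = (0..4)
        .map(|i| (0..=i).map(|j| format!("EXPR_{}_{}", i, j)).collect())
        .collect();

    // Shared accumulators, as in the commit.
    let okay = Arc::new(Mutex::new(Vec::new()));
    let failed = Arc::new(Mutex::new(Vec::new()));
    let mut handles = vec![];

    for batch in batches {
        let okay = Arc::clone(&okay);
        let failed = Arc::clone(&failed);
        // Stagger thread starts, like the commit's 200 ms sleep.
        thread::sleep(Duration::from_millis(200));
        handles.push(thread::spawn(move || match fetch_batch(&batch) {
            Ok(r) => okay.lock().unwrap().push(r),
            Err(_) => failed.lock().unwrap().push(batch),
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }

    // Safe once all threads are joined: these are the last Arc references.
    let okay = Arc::try_unwrap(okay).unwrap().into_inner().unwrap();
    let failed = Arc::try_unwrap(failed).unwrap().into_inner().unwrap();
    println!("{} ok, {} failed", okay.len(), failed.len());
}
```

One thread per batch plus a 200 ms stagger acts as a crude rate limiter; the `threadpool` crate already in `Cargo.toml` would bound concurrency more directly. Note also that the commit locks the cloned `DQRequester` for the whole fetch, which serializes the requests themselves.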

src/timeseries.rs

@ -16,6 +16,33 @@ pub struct DQTimeSeriesResponse {
#[derive(Deserialize, Debug)]
pub struct DQCatalogueResponse {
pub items: u32,
pub catalogue_responses: Vec<DQCatalogueSingleResponse>,
pub all_instruments: Vec<String>,
}
impl DQCatalogueResponse {
pub fn new(catalogue_responses: Vec<DQCatalogueSingleResponse>) -> Self {
let all_instruments: Vec<String> = catalogue_responses
.iter()
.flat_map(|response| {
response
.instruments
.iter()
.map(|instrument| instrument.instrument_name.clone())
})
.collect();
DQCatalogueResponse {
items: all_instruments.len() as u32,
catalogue_responses,
all_instruments,
}
}
}
#[derive(Deserialize, Debug)]
pub struct DQCatalogueSingleResponse {
pub links: Vec<HashMap<String, Option<String>>>,
pub items: u32,
pub instruments: Vec<DQCatalogueInstrument>,
@ -108,6 +135,7 @@ impl DQTimeSeries {
}
impl DQTimeSeriesResponse {
/// Return a list of all expressions in the response
pub fn list_expressions(&self) -> Vec<String> {
self.instruments
.iter()
@ -135,21 +163,34 @@ impl DQTimeSeriesResponse {
/// Return a list of lists of DQTimeSeries, where each list contains all the timeseries for each ticker
pub fn get_timeseries_by_ticker(&self) -> Vec<Vec<DQTimeSeries>> {
// create a hashmap where the key is the ticker and the value is a list of timeseries
// let mut timeseries_by_ticker: HashMap<String, Vec<DQTimeSeries>> = HashMap::new();
// for instrument in &self.instruments {
// for attribute in &instrument.attributes {
// let ticker = attribute.expression.split(',').nth(1).unwrap().to_string();
// let timeseries = DQTimeSeries {
// expression: attribute.expression.clone(),
// time_series: attribute.time_series.clone(),
// };
// timeseries_by_ticker
// .entry(ticker)
// .or_insert_with(Vec::new)
// .push(timeseries);
// }
// }
let timeseries_by_ticker = self
    .instruments
    .iter()
    .flat_map(|instrument| {
        instrument.attributes.iter().map(|attribute| DQTimeSeries {
            expression: attribute.expression.clone(),
            time_series: attribute.time_series.clone(),
        })
    })
    .fold(HashMap::new(), |mut acc, ts| {
        let ticker = ts.get_ticker().unwrap();
        acc.entry(ticker).or_insert_with(Vec::new).push(ts);
        acc
    });
timeseries_by_ticker.into_iter().map(|(_, v)| v).collect()
}
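The `flat_map`/`fold` pipeline above matches the behavior of the commented-out loop: flatten every attribute into a `DQTimeSeries`, then fold into a `HashMap` keyed by ticker. The same grouping idea over plain strings (tickers and metrics are illustrative):

```rust
use std::collections::HashMap;

fn main() {
    // Expressions in the DB(JPMAQS,<ticker>,<metric>) shape; the ticker is
    // the second comma-separated field, as in the parsing above.
    let expressions = [
        "DB(JPMAQS,USD_EQXR_NSA,value)",
        "DB(JPMAQS,USD_EQXR_NSA,grading)",
        "DB(JPMAQS,GBP_EQXR_NSA,value)",
    ];

    // fold accumulates a HashMap<ticker, Vec<expression>>, mirroring
    // get_timeseries_by_ticker.
    let by_ticker: HashMap<String, Vec<String>> =
        expressions.iter().fold(HashMap::new(), |mut acc, expr| {
            let ticker = expr.split(',').nth(1).unwrap().to_string();
            acc.entry(ticker).or_insert_with(Vec::new).push(expr.to_string());
            acc
        });

    for (ticker, exprs) in &by_ticker {
        println!("{}: {} series", ticker, exprs.len());
    }
}
```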