use crate::download::oauth_client::OAuthClient; use crate::download::parreq::ParallelRequester; use crate::download::timeseries::DQCatalogueResponse; use crate::download::timeseries::DQCatalogueSingleResponse; use crate::download::timeseries::DQTimeSeriesResponse; use crate::download::timeseries::DQTimeseriesRequestArgs; use crate::download::timeseries::JPMaQSIndicator; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use reqwest; use reqwest::blocking::Client; use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use std::error::Error; const API_BASE_URL: &str = "https://api-developer.jpmorgan.com/research/dataquery-authe/api/v2"; const HEARTBEAT_ENDPOINT: &str = "/services/heartbeat"; const TIMESERIES_ENDPOINT: &str = "/expressions/time-series"; const CATALOGUE_ENDPOINT: &str = "/group/instruments"; const API_DELAY_MILLIS: u64 = 200; // const MAX_THREAD_WORKERS: usize = 100; // const JPMAQS_CATALOGUE_GROUP: &str = "JPMAQS"; #[derive(Debug, Clone)] pub struct DQRequester { oauth_client: OAuthClient, rqclient: Client, } impl Default for DQRequester { fn default() -> Self { DQRequester { oauth_client: OAuthClient::default(), rqclient: Client::new(), } } } impl DQRequester { pub fn new(oauth_client: OAuthClient) -> Self { DQRequester { oauth_client, rqclient: Client::new(), } } fn _request( &mut self, method: reqwest::Method, endpoint: &str, ) -> Result> { let headers_map = self.oauth_client.get_headers()?; let mut headers = HeaderMap::new(); for (key, value) in headers_map { let header_name = HeaderName::from_bytes(key.as_bytes())?; let header_value = HeaderValue::from_str(&value)?; headers.insert(header_name, header_value); } // Construct the full URL let url = format!("{}{}", API_BASE_URL, endpoint); let response = self .rqclient .request(method, &url) .headers(headers) .send()?; if response.status().is_success() { Ok(response) } else { Err(Box::new(response.error_for_status().unwrap_err())) } } pub fn check_connection(&mut self) -> Result<(), Box> { let response = self._request(reqwest::Method::GET, HEARTBEAT_ENDPOINT)?; println!("Connection is successful: {}", response.status()); Ok(()) } pub fn get_catalogue( &mut self, catalogue_group: &str, page_size: u32, // ) -> Result, Box> { ) -> Result> { let mut responses: Vec = Vec::new(); if page_size < 1 || page_size > 1000 { return Err("Page size must be between 1 and 1000".into()); } let mut next_page = Some(format!( "{}?group-id={}&limit={}", CATALOGUE_ENDPOINT, catalogue_group, page_size )); while let Some(endpoint) = next_page { std::thread::sleep(std::time::Duration::from_millis(API_DELAY_MILLIS)); let response = self._request(reqwest::Method::GET, &endpoint)?; if !response.status().is_success() { return Err(Box::new(response.error_for_status().unwrap_err())); } let json: serde_json::Value = response.json()?; let dq_catalogue_response: DQCatalogueSingleResponse = serde_json::from_str(&json.to_string())?; responses.push(dq_catalogue_response); let links = json.get("links"); let links_array = links.and_then(|links| links.as_array()); let next_link = links_array.and_then(|links| { links .iter() .find_map(|link| link.get("next")?.as_str().map(|s| s.to_string())) }); next_page = next_link; } Ok(DQCatalogueResponse::new(responses)) } pub fn _fetch_single_timeseries_batch( &mut self, args: DQTimeseriesRequestArgs, ) -> Result> { log::info!( "Fetching timeseries batch with {} expressions", args.expressions.len() ); if args.expressions.len() < 1 || args.expressions.len() > 20 { return Err("Number of expressions must be between 1 and 20".into()); } let query_string = args.as_query_string(); let endpoint = format!("{}?{}", TIMESERIES_ENDPOINT, query_string); let response = self._request(reqwest::Method::GET, &endpoint)?; log::info!("Got response: {:?}", response.status()); Ok(response) } pub fn get_timeseries( &mut self, args: DQTimeseriesRequestArgs, ) -> Result, Box> { let max_retries = 5; println!( "Invoking ParallelRequester for {:?} expressions", args.expressions.len() ); let mut pq = ParallelRequester::new(self.oauth_client.clone()); let response_texts = match pq.request_expressions(args, max_retries) { Ok(r) => r, Err(e) => return Err(e), }; let dqts_vec: Vec = parse_response_texts(response_texts); Ok(dqts_vec) } pub fn get_timeseries_as_jpmaqs_indicators( &mut self, args: DQTimeseriesRequestArgs, ) -> Result, Box> { let max_retries = 5; println!( "Invoking ParallelRequester for {:?} expressions", args.expressions.len() ); let mut pq = ParallelRequester::new(self.oauth_client.clone()); let start = std::time::Instant::now(); let mut response_texts = match pq.request_expressions(args, max_retries) { Ok(r) => r, Err(e) => return Err(e), }; println!( "Time elapsed for pq.request_expressions: {:?}", start.elapsed() ); // sleep for 10 seconds println!("Pausing for 10 seconds"); std::thread::sleep(std::time::Duration::from_secs(10)); println!("Resuming - parsing response texts to JPMaQSIndicators"); let jpmaqs_indicators: Vec = parse_response_texts_to_jpmaqs_indicators(&mut response_texts); // Sleep for 10 seconds println!("Pausing for 10 seconds"); std::thread::sleep(std::time::Duration::from_secs(10)); println!("Resuming"); Ok(jpmaqs_indicators) } } fn parse_response_texts(response_texts: Vec) -> Vec { response_texts .into_par_iter() .filter_map(|rtext| { // Attempt to deserialize and immediately release rtext if successful match serde_json::from_str::(&rtext) { Ok(dqts) => Some(dqts), Err(err) => { eprintln!("Failed to deserialize response: {}", err); None } } }) .collect() } fn parse_response_texts_to_jpmaqs_indicators( response_texts: &mut Vec, ) -> Vec { // Create an empty hashmap of jpmaqs indicators let mut jpmaqs_indicators_map: std::collections::HashMap = std::collections::HashMap::new(); // Iterate over the response texts by taking and consuming each element while let Some(response_text) = response_texts.pop() { // Attempt to deserialize the response text let json_res = serde_json::from_str::(&response_text); // Free the memory occupied by response_text immediately after parsing drop(response_text); let dqts_res = match json_res { Ok(dqts) => dqts, Err(err) => { eprintln!("Failed to deserialize response: {}", err); continue; } }; let grouped_by_ticker = dqts_res.consume_to_grouped_by_ticker(); for ts_vec in grouped_by_ticker { let curr_ticker = ts_vec[0].get_ticker().unwrap(); if let Some(existing_jpmaqs_indicator) = jpmaqs_indicators_map.get_mut(&curr_ticker) { for tsv in ts_vec { match existing_jpmaqs_indicator.add_timeseries(tsv) { Ok(_) => {} Err(e) => { eprintln!("Failed to add timeseries: {}", e); } } } } else { let jpmaqs_indicator = JPMaQSIndicator::new(ts_vec); jpmaqs_indicators_map.insert(curr_ticker.into(), jpmaqs_indicator.unwrap()); } } } println!("Number of responses left: {}", response_texts.len()); jpmaqs_indicators_map.into_iter().map(|(_, v)| v).collect() } #[allow(dead_code)] fn main() { let client_id = std::env::var("DQ_CLIENT_ID").unwrap(); let client_secret = std::env::var("DQ_CLIENT_SECRET").unwrap(); let mut oauth_client = OAuthClient::new(client_id, client_secret); oauth_client.fetch_token().unwrap(); let mut requester = DQRequester::new(oauth_client); requester.check_connection().unwrap(); // let response = requester // .get_catalogue(JPMAQS_CATALOGUE_GROUP, 1000) // .unwrap(); // let json_data = response // try to pull into // let expressions_a = vec![ // "DB(JPMAQS,USD_EQXR_NSA,value)", // "DB(JPMAQS,USD_EQXR_NSA,grading)", // "DB(JPMAQS,USD_EQXR_NSA,eop_lag)", // "DB(JPMAQS,USD_EQXR_NSA,mop_lag)", // "DB(JPMAQS,GBP_EQXR_NSA,value)", // "DB(JPMAQS,GBP_EQXR_NSA,grading)", // "DB(JPMAQS,GBP_EQXR_NSA,eop_lag)", // "DB(JPMAQS,GBP_EQXR_NSA,mop_lag)", // ]; // let response = requester // .get_timeseries_with_defaults(expressions_a.iter().map(|s| *s).collect()) // .unwrap(); // let json_data = response.text().unwrap(); // println!("{}", json_data); }