commit c9971dc2e3
parent 6bb51905b8
Author: Palash Tyagi
Date: 2024-11-12 22:27:41 +00:00

5 changed files with 53 additions and 119 deletions

View File

@@ -30,7 +30,7 @@ impl OAuthClient {
         }
     }
 
-    pub fn fetch_token(&mut self) -> Result<(), ReqwestError> {
+    fn fetch_token(&mut self) -> Result<(), ReqwestError> {
         let client = Client::new();
 
         // Set up the form parameters for the request
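The only change in this hunk is visibility: fetch_token drops its pub and becomes an implementation detail of the module. The body is not shown beyond the Client::new() line; for readers without the full file, a minimal sketch of the usual OAuth client-credentials exchange it performs, using reqwest's blocking API with a hypothetical endpoint and assumed field names, looks like this:

// Illustrative client-credentials exchange; the token endpoint, field names,
// and response shape are assumptions, not taken from this repository.
// Requires reqwest with the "blocking" and "json" features plus serde_json.
use std::collections::HashMap;

fn fetch_token_sketch(client_id: &str, client_secret: &str) -> Result<String, reqwest::Error> {
    let client = reqwest::blocking::Client::new();
    let mut params = HashMap::new();
    params.insert("grant_type", "client_credentials");
    params.insert("client_id", client_id);
    params.insert("client_secret", client_secret);

    // POST the form parameters and read the access token out of the JSON body.
    let resp: serde_json::Value = client
        .post("https://example.com/oauth/token") // hypothetical endpoint
        .form(&params)
        .send()?
        .json()?;
    Ok(resp["access_token"].as_str().unwrap_or_default().to_owned())
}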

View File

@@ -12,20 +12,26 @@ use tokio::task;
 const API_BASE_URL: &str = "https://api-developer.jpmorgan.com/research/dataquery-authe/api/v2";
 const TIMESERIES_ENDPOINT: &str = "/expressions/time-series";
 
+const DQ_THREAD_EXPIRY_SECONDS: u64 = 300;
+const DQ_THREAD_SLEEP_MILLIS: u64 = 200;
+const DQ_MAX_THREADS: usize = 250;
+
 #[derive(Clone)]
 pub struct ParallelRequester {
     oauth_client: Arc<Mutex<OAuthClient>>,
     max_threads: usize,
 }
 
+/// Internal implementation of a struct for making parallel requests to the DataQuery API.
 impl ParallelRequester {
     pub fn new(oauth_client: OAuthClient) -> Self {
         ParallelRequester {
             oauth_client: Arc::new(Mutex::new(oauth_client)),
-            max_threads: 250,
+            max_threads: DQ_MAX_THREADS,
         }
     }
 
+    /// Builds the headers for the request to the DataQuery API.
     fn build_headers(&self) -> Result<HeaderMap, Box<dyn Error>> {
         let headers_map = self.oauth_client.lock().unwrap().get_headers()?;
         let mut headers = HeaderMap::new();
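This hunk hoists the magic numbers into named constants, and the #[derive(Clone)] on ParallelRequester stays cheap because the only non-trivial field is an Arc<Mutex<OAuthClient>>: cloning copies the Arc handle, so every clone shares one client and one lock. A self-contained sketch of that sharing behaviour, with a String standing in for OAuthClient:

// A String stands in for OAuthClient; the point is that Clone on the wrapper
// only bumps the Arc refcount, so all clones share one value and one lock.
use std::sync::{Arc, Mutex};

#[derive(Clone)]
struct SharedClient {
    inner: Arc<Mutex<String>>,
}

fn main() {
    let a = SharedClient {
        inner: Arc::new(Mutex::new(String::from("token"))),
    };
    let b = a.clone(); // cheap: refcount bump, not a deep copy

    *b.inner.lock().unwrap() = String::from("refreshed");
    // Both handles observe the update because they share the same Mutex.
    assert_eq!(*a.inner.lock().unwrap(), "refreshed");
}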
@@ -37,6 +43,7 @@ impl ParallelRequester {
         Ok(headers)
     }
 
+    /// Makes an asynchronous request to the DataQuery API.
     async fn _request_async(&self, endpoint: &str) -> Result<String, Box<dyn Error>> {
         let headers = self.build_headers()?;
         let url = format!("{}{}", API_BASE_URL, endpoint);
@@ -55,7 +62,7 @@ impl ParallelRequester {
         }
     }
 
-    pub fn request_expressions(
+    fn _request_expressions(
         &mut self,
         args: DQTimeseriesRequestArgs,
         max_retries: u32,
@@ -89,20 +96,25 @@ impl ParallelRequester {
             args_clone.update_expressions(batch.clone());
             let ep = format!("{}?{}", TIMESERIES_ENDPOINT, args_clone.as_query_string());
             let permit = semaphore.clone().acquire_owned().await.unwrap();
-            // println!("Requesting batch {} of {}", curr_batch + 1, total_batches);
             if curr_batch % 10 == 0 {
                 println!("Requesting batch {} of {}", curr_batch, total_batches);
             }
 
             let task = task::spawn(async move {
                 let _permit = permit; // Keep the permit alive until the end of the task
-                let res_str = self_arc._request_async(&ep).await;
-                match res_str {
-                    Ok(text) => {
+                let res_future = self_arc._request_async(&ep);
+                let res = tokio::time::timeout(
+                    Duration::from_secs(DQ_THREAD_EXPIRY_SECONDS),
+                    res_future,
+                )
+                .await;
+                match res {
+                    Ok(Ok(text)) => {
                         okay_response_texts.lock().unwrap().push(text);
-                        println!("Batch {} of {} successful", curr_batch, total_batches);
                     }
-                    Err(_) => {
+                    _ => {
                         failed_batches.lock().unwrap().push(batch);
                     }
                 }
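tokio::time::timeout wraps the request future and yields a nested Result: the outer layer reports whether the deadline elapsed, the inner one whether the request itself succeeded. That is why the success arm is Ok(Ok(text)) and a single _ arm can route both timeouts and request errors into failed_batches. A minimal sketch of the shape, with illustrative names:

// The three possible outcomes of timeout-wrapping a fallible future.
// Illustrative names; requires tokio with "time", "rt", and "macros" features.
use std::time::Duration;

async fn fetch() -> Result<String, std::io::Error> {
    Ok(String::from("response"))
}

#[tokio::main]
async fn main() {
    match tokio::time::timeout(Duration::from_secs(300), fetch()).await {
        Ok(Ok(text)) => println!("success: {text}"),    // finished, and succeeded
        Ok(Err(e)) => eprintln!("request failed: {e}"), // finished, but errored
        Err(_elapsed) => eprintln!("timed out"),        // deadline expired first
    }
}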
@@ -111,11 +123,16 @@ impl ParallelRequester {
             tasks.push(task);
             // Delay before starting the next task
-            tokio::time::sleep(Duration::from_millis(250)).await;
+            tokio::time::sleep(Duration::from_millis(DQ_THREAD_SLEEP_MILLIS)).await;
             curr_batch += 1;
         }
 
-        future::join_all(tasks).await;
+        // Await all tasks, ensuring forced termination for any remaining tasks
+        let _ = tokio::time::timeout(
+            Duration::from_secs(DQ_THREAD_EXPIRY_SECONDS),
+            future::join_all(tasks),
+        )
+        .await;
     });
 
     // Retry failed batches if any, respecting the max_retries limit
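One caveat on the new comment: timing out future::join_all only bounds the wait. It does not forcibly terminate the spawned tasks, which keep running detached once the join future is dropped. If hard cancellation is wanted, abort handles have to be kept beforehand, as in this sketch (illustrative; JoinHandle::abort_handle needs a reasonably recent tokio):

// Bounding the wait on join_all. The timeout drops the join future, but the
// spawned tasks keep running unless aborted via their handles.
use std::time::Duration;

#[tokio::main]
async fn main() {
    let tasks: Vec<_> = (0..8u64)
        .map(|i| {
            tokio::spawn(async move {
                tokio::time::sleep(Duration::from_secs(i)).await;
            })
        })
        .collect();

    // Keep abort handles so stragglers can be cancelled after the deadline.
    let abort_handles: Vec<_> = tasks.iter().map(|t| t.abort_handle()).collect();

    if tokio::time::timeout(Duration::from_secs(300), futures::future::join_all(tasks))
        .await
        .is_err()
    {
        for handle in abort_handles {
            handle.abort(); // hard-cancel whatever is still running
        }
    }
}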
@@ -126,7 +143,7 @@ impl ParallelRequester {
             failed_batches.into_iter().flatten().collect();
         new_args.update_expressions(flattened_failed_batches);
 
-        let retried_responses = self.request_expressions(new_args, max_retries - 1)?;
+        let retried_responses = self._request_expressions(new_args, max_retries - 1)?;
         okay_response_texts
             .lock()
             .unwrap()
@@ -139,4 +156,14 @@ impl ParallelRequester {
         let final_responses = okay_response_texts.lock().unwrap().clone();
         Ok(final_responses)
     }
+
+    /// Makes parallel requests to the DataQuery API for the given expressions.
+    /// The returned responses are collected and returned as a vector of strings (response texts).
+    pub fn request_expressions(
+        &mut self,
+        args: DQTimeseriesRequestArgs,
+        max_retries: u32,
+    ) -> Result<Vec<String>, Box<dyn Error>> {
+        self._request_expressions(args, max_retries)
+    }
 }
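The public request_expressions is now a thin wrapper over the private retrying implementation, which keeps the recursion on _request_expressions out of the public API. The throttling underneath is the classic owned-permit pattern: each task takes a permit from a tokio::sync::Semaphore before it is spawned and holds it until it finishes. A self-contained sketch:

// The owned-permit throttle used above: acquire_owned blocks task creation
// once all permits are out, and each permit is released when its task ends.
// Illustrative; requires tokio ("rt", "sync", "macros") and the futures crate.
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(4)); // at most 4 tasks in flight
    let mut tasks = Vec::new();

    for i in 0..16 {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        tasks.push(tokio::spawn(async move {
            let _permit = permit; // held until the task body finishes
            println!("working on batch {i}");
        }));
    }
    futures::future::join_all(tasks).await;
}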

View File

@@ -43,6 +43,7 @@ impl DQRequester {
         }
     }
 
+    /// Internal implementation of a function to make a request to the DataQuery API.
     fn _request(
         &mut self,
         method: reqwest::Method,
@@ -79,11 +80,12 @@ impl DQRequester {
         Ok(())
     }
 
+    /// Fetches the catalogue of tickers from the DataQuery API.
+    /// Returned DQCatalogueResponse contains a list of DQCatalogueSingleResponse objects.
     pub fn get_catalogue(
         &mut self,
         catalogue_group: &str,
         page_size: u32,
-        // ) -> Result<Vec<DQCatalogueSingleResponse>, Box<dyn Error>> {
     ) -> Result<DQCatalogueResponse, Box<dyn Error>> {
         let mut responses: Vec<DQCatalogueSingleResponse> = Vec::new();
@@ -260,40 +262,3 @@ fn parse_response_texts_to_jpmaqs_indicators(
     jpmaqs_indicators_map.into_iter().map(|(_, v)| v).collect()
 }
 
-#[allow(dead_code)]
-fn main() {
-    let client_id = std::env::var("DQ_CLIENT_ID").unwrap();
-    let client_secret = std::env::var("DQ_CLIENT_SECRET").unwrap();
-    let mut oauth_client = OAuthClient::new(client_id, client_secret);
-    oauth_client.fetch_token().unwrap();
-
-    let mut requester = DQRequester::new(oauth_client);
-    requester.check_connection().unwrap();
-
-    // let response = requester
-    //     .get_catalogue(JPMAQS_CATALOGUE_GROUP, 1000)
-    //     .unwrap();
-    // let json_data = response
-
-    // try to pull into
-    // let expressions_a = vec![
-    //     "DB(JPMAQS,USD_EQXR_NSA,value)",
-    //     "DB(JPMAQS,USD_EQXR_NSA,grading)",
-    //     "DB(JPMAQS,USD_EQXR_NSA,eop_lag)",
-    //     "DB(JPMAQS,USD_EQXR_NSA,mop_lag)",
-    //     "DB(JPMAQS,GBP_EQXR_NSA,value)",
-    //     "DB(JPMAQS,GBP_EQXR_NSA,grading)",
-    //     "DB(JPMAQS,GBP_EQXR_NSA,eop_lag)",
-    //     "DB(JPMAQS,GBP_EQXR_NSA,mop_lag)",
-    // ];
-
-    // let response = requester
-    //     .get_timeseries_with_defaults(expressions_a.iter().map(|s| *s).collect())
-    //     .unwrap();
-    // let json_data = response.text().unwrap();
-    // println!("{}", json_data);
-}

View File

@@ -474,54 +474,3 @@ fn save_qdf_to_csv(qdf: &mut DataFrame, filename: &str) -> Result<(), Box<dyn Error>> {
     Ok(())
 }
 
-#[allow(dead_code)]
-fn main() {
-    let json_data = r#"
-    {
-        "instruments": [
-            {
-                "attributes": [
-                    {
-                        "expression": "metric, TICKER, metric1()",
-                        "time-series": [
-                            ["2022-01-01", 10.0],
-                            ["2022-01-02", null]
-                        ]
-                    },
-                    {
-                        "expression": "metric, TICKER2, metric2()",
-                        "time-series": [
-                            ["2022-01-01", 20.0],
-                            ["2022-01-03", 25.0]
-                        ]
-                    }
-                ]
-            },
-            {
-                "attributes": [
-                    {
-                        "expression": "metric, TICKER3, metric3()",
-                        "time-series": [
-                            ["2022-02-01", 30.0],
-                            ["2022-02-02", 35.0]
-                        ]
-                    }
-                ]
-            }
-        ]
-    }
-    "#;
-
-    let response: DQTimeSeriesResponse = serde_json::from_str(json_data).unwrap();
-    println!("{:?}", response);
-
-    let all_timeseries = response.get_all_timeseries();
-    for ts in all_timeseries {
-        println!("{:?}", ts);
-        match ts.to_dataframe() {
-            Ok(df) => println!("{:?}", df),
-            Err(e) => println!("Failed to create DataFrame: {:?}", e),
-        }
-    }
-}
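The deleted main was an ad-hoc check that a sample payload deserializes into DQTimeSeriesResponse; the same coverage fits naturally in a #[cfg(test)] module, where it runs under cargo test instead of lingering as dead code. A sketch along those lines (assuming, as the removed code suggests, that get_all_timeseries returns a Vec):

// The removed smoke test, reshaped as a unit test. DQTimeSeriesResponse and
// get_all_timeseries come from this module; the Vec return type is assumed
// from how the deleted code iterated over the result.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn sample_payload_deserializes() {
        let json_data = r#"
        {
            "instruments": [
                {
                    "attributes": [
                        {
                            "expression": "metric, TICKER, metric1()",
                            "time-series": [["2022-01-01", 10.0], ["2022-01-02", null]]
                        }
                    ]
                }
            ]
        }
        "#;

        let response: DQTimeSeriesResponse =
            serde_json::from_str(json_data).expect("sample JSON should deserialize");
        assert!(!response.get_all_timeseries().is_empty());
    }
}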

View File

@@ -1,8 +1,5 @@
 use msyrs::download::jpmaqsdownload::{JPMaQSDownload, JPMaQSDownloadGetIndicatorArgs};
-use polars::error::PolarsError;
-use polars::export::chrono::NaiveDate;
 use polars::prelude::*;
-use polars::series::Series;
 
 fn main() {
     println!("Authentication to DataQuery API");
@@ -20,7 +17,7 @@ fn main() {
         start.elapsed()
     );
 
-    let num_ticks = 1000;
+    let num_ticks = 5000;
     let sel_tickers: Vec<String> = tickers
         .iter()
         .take(num_ticks)
@@ -43,6 +40,11 @@ fn main() {
         start.elapsed()
     );
 
+    // sleep for 10 seconds
+    println!("Sleeping for 10 seconds...");
+    std::thread::sleep(std::time::Duration::from_secs(10));
+
+    println!("concatting to mega DataFrame");
     start = std::time::Instant::now();
     // let mut qdf_list = Vec::new();
     let mega_df = indicators
@@ -51,21 +53,12 @@ fn main() {
         .fold(DataFrame::new(vec![]).unwrap(), |acc, df| {
             acc.vstack(&df).unwrap()
         });
 
-    // for indicator in indicators {
-    //     // df_deets.push((indicator.ticker.clone(), indicator.as_qdf().unwrap().size()));
-    //     // println!(
-    //     //     "Ticker: {}, DataFrame",
-    //     //     indicator.ticker,
-    //     //     // indicator.as_qdf().unwrap()
-    //     // );
-    //     qdf_list.push(indicator.as_qdf().unwrap());
-    // }
-    // // vstack the DataFrames
-    // let qdf = DataFrame::new(qdf_list).unwrap();
-
-    // print mega_df size
-    println!("Mega DataFrame size: {:?}", mega_df.size());
-    //
+    let es = mega_df.estimated_size();
+    let es_mb = es as f64 / 1_048_576.0;
+    println!("Estimated size of DataFrame: {:.2} MB", es_mb);
+
+    println!("Sleeping for 10 seconds...");
 
     println!(
         "Converted indicators to DataFrames in {:?}",
         start.elapsed()