working...

This commit is contained in:
Palash Tyagi 2024-11-09 21:57:28 +00:00
parent 8ae35d2981
commit b7bccf342d
5 changed files with 71 additions and 27 deletions

1
Cargo.lock generated
View File

@ -1233,6 +1233,7 @@ dependencies = [
"log", "log",
"polars", "polars",
"rand", "rand",
"rayon",
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",

View File

@ -8,12 +8,9 @@ reqwest = { version = "0.12.9", features = ["blocking", "json"] }
serde_json = "1.0" serde_json = "1.0"
serde_urlencoded = "0.7" serde_urlencoded = "0.7"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
# polars = { version = "0.44.2", features = ["temporal", "lazy"] }
polars = { version = "0.44.2", features = ["lazy"] } polars = { version = "0.44.2", features = ["lazy"] }
# anyhow = "1.0.92"
rand = "0.8" rand = "0.8"
threadpool = "1.8.1" threadpool = "1.8.1"
log = "0.4.22" log = "0.4.22"
# dotenv = "0.15.0"
crossbeam = "0.8" crossbeam = "0.8"
rayon = "1.5"

View File

@ -3,6 +3,7 @@ use crate::download::requester::DQRequester;
use crate::download::requester::DQTimeseriesRequestArgs; use crate::download::requester::DQTimeseriesRequestArgs;
use crate::download::timeseries::DQTimeSeriesResponse; use crate::download::timeseries::DQTimeSeriesResponse;
use crate::download::timeseries::JPMaQSIndicator; use crate::download::timeseries::JPMaQSIndicator;
use rayon::prelude::*;
use std::error::Error; use std::error::Error;
const DEFAULT_JPMAQS_METRICS: [&str; 4] = ["value", "grading", "eop_lag", "mop_lag"]; const DEFAULT_JPMAQS_METRICS: [&str; 4] = ["value", "grading", "eop_lag", "mop_lag"];
@ -111,13 +112,31 @@ impl JPMaQSDownload {
assert!(all_jpmaq_expressions(expressions.clone())); assert!(all_jpmaq_expressions(expressions.clone()));
let dqts_vec = self.get_expressions(expressions)?; let dqts_vec = self.get_expressions(expressions)?;
println!("Retrieved {} time series", dqts_vec.len()); // println!("Retrieved {} time series", -- sum[dqts_vec.iter().map(|dqts| dqts.len())]);
println!(
"Retrieved {} time series",
dqts_vec
.iter()
.map(|dqts| dqts.list_expressions().len())
.sum::<usize>()
);
let start = std::time::Instant::now(); let start = std::time::Instant::now();
let indicators = dqts_vec
.iter() // let indicators = dqts_vec
.flat_map(|dqts| dqts.get_timeseries_by_ticker()) // .iter()
.map(|tsv| JPMaQSIndicator::new(tsv)) // .flat_map(|dqts| dqts.get_timeseries_by_ticker())
.collect::<Result<Vec<JPMaQSIndicator>, Box<dyn Error>>>()?; // .map(|tsv| JPMaQSIndicator::new(tsv))
// .collect::<Result<Vec<JPMaQSIndicator>, Box<dyn Error>>>()?;
let indicators: Vec<_> = dqts_vec
.par_iter()
.flat_map(|dqts| {
dqts.get_timeseries_by_ticker()
.into_par_iter()
.filter_map(|tsv| JPMaQSIndicator::new(tsv).ok())
})
.collect();
println!( println!(
"Converted time series to indicators in {:?}", "Converted time series to indicators in {:?}",
start.elapsed() start.elapsed()

View File

@ -2,19 +2,23 @@ use crate::download::oauth_client::OAuthClient;
use crate::download::timeseries::DQCatalogueResponse; use crate::download::timeseries::DQCatalogueResponse;
use crate::download::timeseries::DQCatalogueSingleResponse; use crate::download::timeseries::DQCatalogueSingleResponse;
use crate::download::timeseries::DQTimeSeriesResponse; use crate::download::timeseries::DQTimeSeriesResponse;
use crossbeam::channel;
use reqwest; use reqwest;
use reqwest::blocking::Client; use reqwest::blocking::Client;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
// use std::collections::HashMap;
use std::error::Error; use std::error::Error;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use crossbeam::channel;
const API_BASE_URL: &str = "https://api-developer.jpmorgan.com/research/dataquery-authe/api/v2"; const API_BASE_URL: &str = "https://api-developer.jpmorgan.com/research/dataquery-authe/api/v2";
const HEARTBEAT_ENDPOINT: &str = "/services/heartbeat"; const HEARTBEAT_ENDPOINT: &str = "/services/heartbeat";
const TIMESERIES_ENDPOINT: &str = "/expressions/time-series"; const TIMESERIES_ENDPOINT: &str = "/expressions/time-series";
const CATALOGUE_ENDPOINT: &str = "/group/instruments"; const CATALOGUE_ENDPOINT: &str = "/group/instruments";
const API_DELAY_MILLIS: u64 = 200;
const MAX_THREAD_WORKERS: usize = 20;
// const JPMAQS_CATALOGUE_GROUP: &str = "JPMAQS"; // const JPMAQS_CATALOGUE_GROUP: &str = "JPMAQS";
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -162,7 +166,7 @@ impl DQRequester {
)); ));
while let Some(endpoint) = next_page { while let Some(endpoint) = next_page {
std::thread::sleep(std::time::Duration::from_millis(200)); std::thread::sleep(std::time::Duration::from_millis(API_DELAY_MILLIS));
let response = self._request(reqwest::Method::GET, &endpoint)?; let response = self._request(reqwest::Method::GET, &endpoint)?;
if !response.status().is_success() { if !response.status().is_success() {
@ -244,41 +248,64 @@ impl DQRequester {
args: DQTimeseriesRequestArgs, args: DQTimeseriesRequestArgs,
max_retries: u32, max_retries: u32,
) -> Result<Vec<DQTimeSeriesResponse>, Box<dyn Error>> { ) -> Result<Vec<DQTimeSeriesResponse>, Box<dyn Error>> {
// sort each batch so that the order of expressions within a batch is consistent
let expression_batches: Vec<Vec<String>> = args let expression_batches: Vec<Vec<String>> = args
.expressions .expressions
.chunks(20) .chunks(20)
.map(|chunk| chunk.to_vec()) .map(|chunk| {
let mut vec = chunk.to_vec();
vec.sort();
vec
})
.collect(); .collect();
let okay_responses = Arc::new(Mutex::new(Vec::new())); let okay_responses = Arc::new(Mutex::new(Vec::new()));
let failed_batches = Arc::new(Mutex::new(Vec::new())); let failed_batches = Arc::new(Mutex::new(Vec::new()));
let client = Arc::new(Mutex::new(self.clone())); let client = Arc::new(Mutex::new(self.clone()));
let mut handles = vec![]; let (sender, receiver) = channel::unbounded();
let total_batches = expression_batches.len(); let total_batches = expression_batches.len();
let mut curr_batch = 0; let mut curr_batch = 0;
// Spawn MAX_THREAD_WORKERS worker threads
let mut workers = vec![];
for _ in 0..MAX_THREAD_WORKERS {
let receiver = receiver.clone();
let okay_responses = Arc::clone(&okay_responses);
let failed_batches = Arc::clone(&failed_batches);
let client = Arc::clone(&client);
let worker = thread::spawn(move || {
while let Ok((args, expr_batch)) = receiver.recv() {
DQRequester::_fetch_expression_batch(
client.clone(),
expr_batch,
okay_responses.clone(),
failed_batches.clone(),
args,
);
}
});
workers.push(worker);
}
// Send jobs to workers
for expr_batch in expression_batches { for expr_batch in expression_batches {
curr_batch += 1; curr_batch += 1;
let mut args = args.clone(); let mut args = args.clone();
args.update_expressions(expr_batch.clone()); args.update_expressions(expr_batch.clone());
let okay_responses = Arc::clone(&okay_responses);
let failed_batches = Arc::clone(&failed_batches);
let client = Arc::clone(&client);
log::info!("Processed {} out of {} batches", curr_batch, total_batches); log::info!("Processed {} out of {} batches", curr_batch, total_batches);
thread::sleep(Duration::from_millis(200)); thread::sleep(Duration::from_millis(API_DELAY_MILLIS));
let handle = thread::spawn(move || { sender.send((args, expr_batch)).unwrap();
DQRequester::_fetch_expression_batch(client, expr_batch, okay_responses, failed_batches, args);
});
handles.push(handle);
} }
for handle in handles { drop(sender); // Close the channel so workers can finish
handle.join().unwrap();
// Wait for all workers to finish
for worker in workers {
worker.join().unwrap();
} }
let mut okay_responses = Arc::try_unwrap(okay_responses) let mut okay_responses = Arc::try_unwrap(okay_responses)

View File

@ -16,7 +16,7 @@ fn main() {
start.elapsed() start.elapsed()
); );
let num_ticks = 250; let num_ticks = 1000;
let sel_tickers: Vec<String> = tickers let sel_tickers: Vec<String> = tickers
.iter() .iter()
.take(num_ticks) .take(num_ticks)