mirror of
https://github.com/Magnus167/msyrs.git
synced 2025-08-20 13:00:01 +00:00
working
This commit is contained in:
parent
fe1c348dc7
commit
c3a49abdf7
@ -211,130 +211,6 @@ impl DQRequester {
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
fn _fetch_expression_batch(
|
||||
client: Arc<Mutex<Self>>,
|
||||
expr_batch: Vec<String>,
|
||||
okay_responses: Arc<Mutex<Vec<DQTimeSeriesResponse>>>,
|
||||
failed_batches: Arc<Mutex<Vec<Vec<String>>>>,
|
||||
args: DQTimeseriesRequestArgs,
|
||||
) {
|
||||
let response = client.lock().unwrap()._fetch_single_timeseries_batch(args);
|
||||
|
||||
match response {
|
||||
Ok(r) => {
|
||||
// Attempt to parse the response text
|
||||
match serde_json::from_str::<DQTimeSeriesResponse>(&r.text().unwrap()) {
|
||||
Ok(dq_response) => {
|
||||
okay_responses.lock().unwrap().push(dq_response);
|
||||
log::info!("Got batch: {:?}", expr_batch);
|
||||
}
|
||||
Err(e) => {
|
||||
// If parsing fails, treat this as a batch failure
|
||||
failed_batches.lock().unwrap().push(expr_batch.clone());
|
||||
log::error!("Failed to parse timeseries: {:?} : {:?}", expr_batch, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
// Handle _fetch_single_timeseries_batch error
|
||||
failed_batches.lock().unwrap().push(expr_batch.clone());
|
||||
log::error!("Failed to get batch: {:?} : {:?}", expr_batch, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn _fetch_timeseries_recursive(
|
||||
&mut self,
|
||||
args: DQTimeseriesRequestArgs,
|
||||
max_retries: u32,
|
||||
) -> Result<Vec<DQTimeSeriesResponse>, Box<dyn Error>> {
|
||||
// sort to ensure that the order of expressions is consistent
|
||||
let expression_batches: Vec<Vec<String>> = args
|
||||
.expressions
|
||||
.chunks(20)
|
||||
.map(|chunk| {
|
||||
let mut vec = chunk.to_vec();
|
||||
vec.sort();
|
||||
vec
|
||||
})
|
||||
.collect();
|
||||
|
||||
let okay_responses = Arc::new(Mutex::new(Vec::new()));
|
||||
let failed_batches = Arc::new(Mutex::new(Vec::new()));
|
||||
let client = Arc::new(Mutex::new(self.clone()));
|
||||
|
||||
let (sender, receiver) = channel::unbounded();
|
||||
let total_batches = expression_batches.len();
|
||||
let mut curr_batch = 0;
|
||||
|
||||
// Spawn 20 worker threads
|
||||
let mut workers = vec![];
|
||||
for _ in 0..MAX_THREAD_WORKERS {
|
||||
let receiver = receiver.clone();
|
||||
let okay_responses = Arc::clone(&okay_responses);
|
||||
let failed_batches = Arc::clone(&failed_batches);
|
||||
let client = Arc::clone(&client);
|
||||
|
||||
let worker = thread::spawn(move || {
|
||||
while let Ok((args, expr_batch)) = receiver.recv() {
|
||||
DQRequester::_fetch_expression_batch(
|
||||
client.clone(),
|
||||
expr_batch,
|
||||
okay_responses.clone(),
|
||||
failed_batches.clone(),
|
||||
args,
|
||||
);
|
||||
}
|
||||
});
|
||||
workers.push(worker);
|
||||
}
|
||||
|
||||
// Send jobs to workers
|
||||
for expr_batch in expression_batches {
|
||||
curr_batch += 1;
|
||||
let mut args = args.clone();
|
||||
args.update_expressions(expr_batch.clone());
|
||||
|
||||
log::info!("Processed {} out of {} batches", curr_batch, total_batches);
|
||||
thread::sleep(Duration::from_millis(API_DELAY_MILLIS));
|
||||
|
||||
sender.send((args, expr_batch)).unwrap();
|
||||
}
|
||||
|
||||
drop(sender); // Close the channel so workers can finish
|
||||
|
||||
// Wait for all workers to finish
|
||||
for worker in workers {
|
||||
worker.join().unwrap();
|
||||
}
|
||||
|
||||
let mut okay_responses = Arc::try_unwrap(okay_responses)
|
||||
.unwrap()
|
||||
.into_inner()
|
||||
.unwrap();
|
||||
let failed_batches = Arc::try_unwrap(failed_batches)
|
||||
.unwrap()
|
||||
.into_inner()
|
||||
.unwrap();
|
||||
|
||||
if !failed_batches.is_empty() && max_retries == 0 {
|
||||
return Err("Max retries reached".into());
|
||||
}
|
||||
|
||||
if !failed_batches.is_empty() && max_retries > 0 {
|
||||
log::info!("Retrying failed batches");
|
||||
let mut new_args = args.clone();
|
||||
new_args.update_expressions(failed_batches.concat());
|
||||
log::info!("Retrying with {} failed expressions", failed_batches.len());
|
||||
let mut retry_responses =
|
||||
self._fetch_timeseries_recursive(new_args, max_retries - 1)?;
|
||||
okay_responses.append(&mut retry_responses);
|
||||
}
|
||||
|
||||
log::info!("Returning {} responses", okay_responses.len());
|
||||
Ok(okay_responses)
|
||||
}
|
||||
|
||||
pub fn get_timeseries(
|
||||
&mut self,
|
||||
args: DQTimeseriesRequestArgs,
|
||||
@ -344,10 +220,134 @@ impl DQRequester {
|
||||
"Invoking recursive function for {:?} expressions",
|
||||
args.expressions.len()
|
||||
);
|
||||
self._fetch_timeseries_recursive(args, max_retries)
|
||||
_fetch_timeseries_recursive(self, args, max_retries)
|
||||
}
|
||||
}
|
||||
|
||||
fn _fetch_expression_batch(
|
||||
client: Arc<Mutex<DQRequester>>,
|
||||
expr_batch: Vec<String>,
|
||||
okay_responses: Arc<Mutex<Vec<DQTimeSeriesResponse>>>,
|
||||
failed_batches: Arc<Mutex<Vec<Vec<String>>>>,
|
||||
args: DQTimeseriesRequestArgs,
|
||||
) {
|
||||
let response = client.lock().unwrap()._fetch_single_timeseries_batch(args);
|
||||
|
||||
match response {
|
||||
Ok(r) => {
|
||||
// Attempt to parse the response text
|
||||
match serde_json::from_str::<DQTimeSeriesResponse>(&r.text().unwrap()) {
|
||||
Ok(dq_response) => {
|
||||
okay_responses.lock().unwrap().push(dq_response);
|
||||
log::info!("Got batch: {:?}", expr_batch);
|
||||
}
|
||||
Err(e) => {
|
||||
// If parsing fails, treat this as a batch failure
|
||||
failed_batches.lock().unwrap().push(expr_batch.clone());
|
||||
log::error!("Failed to parse timeseries: {:?} : {:?}", expr_batch, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
// Handle _fetch_single_timeseries_batch error
|
||||
failed_batches.lock().unwrap().push(expr_batch.clone());
|
||||
log::error!("Failed to get batch: {:?} : {:?}", expr_batch, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn _fetch_timeseries_recursive(
|
||||
dq_requester: &mut DQRequester,
|
||||
args: DQTimeseriesRequestArgs,
|
||||
max_retries: u32,
|
||||
) -> Result<Vec<DQTimeSeriesResponse>, Box<dyn Error>> {
|
||||
// sort to ensure that the order of expressions is consistent
|
||||
let expression_batches: Vec<Vec<String>> = args
|
||||
.expressions
|
||||
.chunks(20)
|
||||
.map(|chunk| {
|
||||
let mut vec = chunk.to_vec();
|
||||
vec.sort();
|
||||
vec
|
||||
})
|
||||
.collect();
|
||||
|
||||
let okay_responses = Arc::new(Mutex::new(Vec::new()));
|
||||
let failed_batches = Arc::new(Mutex::new(Vec::new()));
|
||||
let client = Arc::new(Mutex::new(dq_requester.clone()));
|
||||
|
||||
let (sender, receiver) = channel::unbounded();
|
||||
let total_batches = expression_batches.len();
|
||||
let mut curr_batch = 0;
|
||||
|
||||
// Spawn 20 worker threads
|
||||
let mut workers = vec![];
|
||||
for _ in 0..MAX_THREAD_WORKERS {
|
||||
let receiver = receiver.clone();
|
||||
let okay_responses = Arc::clone(&okay_responses);
|
||||
let failed_batches = Arc::clone(&failed_batches);
|
||||
let client = Arc::clone(&client);
|
||||
|
||||
let worker = thread::spawn(move || {
|
||||
while let Ok((args, expr_batch)) = receiver.recv() {
|
||||
_fetch_expression_batch(
|
||||
client.clone(),
|
||||
expr_batch,
|
||||
okay_responses.clone(),
|
||||
failed_batches.clone(),
|
||||
args,
|
||||
);
|
||||
}
|
||||
});
|
||||
workers.push(worker);
|
||||
}
|
||||
|
||||
// Send jobs to workers
|
||||
for expr_batch in expression_batches {
|
||||
curr_batch += 1;
|
||||
let mut args = args.clone();
|
||||
args.update_expressions(expr_batch.clone());
|
||||
|
||||
log::info!("Processed {} out of {} batches", curr_batch, total_batches);
|
||||
thread::sleep(Duration::from_millis(API_DELAY_MILLIS));
|
||||
|
||||
sender.send((args, expr_batch)).unwrap();
|
||||
}
|
||||
|
||||
drop(sender); // Close the channel so workers can finish
|
||||
|
||||
// Wait for all workers to finish
|
||||
for worker in workers {
|
||||
worker.join().unwrap();
|
||||
}
|
||||
|
||||
let mut okay_responses = Arc::try_unwrap(okay_responses)
|
||||
.unwrap()
|
||||
.into_inner()
|
||||
.unwrap();
|
||||
let failed_batches = Arc::try_unwrap(failed_batches)
|
||||
.unwrap()
|
||||
.into_inner()
|
||||
.unwrap();
|
||||
|
||||
if !failed_batches.is_empty() && max_retries == 0 {
|
||||
return Err("Max retries reached".into());
|
||||
}
|
||||
|
||||
if !failed_batches.is_empty() && max_retries > 0 {
|
||||
log::info!("Retrying failed batches");
|
||||
let mut new_args = args.clone();
|
||||
new_args.update_expressions(failed_batches.concat());
|
||||
log::info!("Retrying with {} failed expressions", failed_batches.len());
|
||||
let mut retry_responses =
|
||||
_fetch_timeseries_recursive(dq_requester, new_args, max_retries - 1)?;
|
||||
okay_responses.append(&mut retry_responses);
|
||||
}
|
||||
|
||||
log::info!("Returning {} responses", okay_responses.len());
|
||||
Ok(okay_responses)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn main() {
|
||||
let client_id = std::env::var("DQ_CLIENT_ID").unwrap();
|
||||
|
11
src/main.rs
11
src/main.rs
@ -16,13 +16,13 @@ fn main() {
|
||||
start.elapsed()
|
||||
);
|
||||
|
||||
let num_ticks = 1000;
|
||||
let num_ticks = 50;
|
||||
let sel_tickers: Vec<String> = tickers
|
||||
.iter()
|
||||
.take(num_ticks)
|
||||
.map(|s| s.to_string())
|
||||
.collect();
|
||||
let mut df_deets = Vec::new();
|
||||
// let mut df_deets = Vec::new();
|
||||
|
||||
println!("Retrieving indicators for {} tickers", sel_tickers.len());
|
||||
start = std::time::Instant::now();
|
||||
@ -41,7 +41,12 @@ fn main() {
|
||||
|
||||
start = std::time::Instant::now();
|
||||
for indicator in indicators {
|
||||
df_deets.push((indicator.ticker.clone(), indicator.as_qdf().unwrap().size()));
|
||||
// df_deets.push((indicator.ticker.clone(), indicator.as_qdf().unwrap().size()));
|
||||
println!(
|
||||
"Ticker: {}, DataFrame: {}",
|
||||
indicator.ticker,
|
||||
indicator.as_qdf().unwrap()
|
||||
);
|
||||
}
|
||||
println!(
|
||||
"Converted indicators to DataFrames in {:?}",
|
||||
|
Loading…
x
Reference in New Issue
Block a user