From c3a49abdf7b0da56c666e19655b5cc0c958aff5d Mon Sep 17 00:00:00 2001
From: Palash Tyagi <23239946+Magnus167@users.noreply.github.com>
Date: Sun, 10 Nov 2024 00:00:04 +0000
Subject: [PATCH] working

---
 src/download/requester.rs | 250 +++++++++++++++++++-------------------
 src/main.rs               |  11 +-
 2 files changed, 133 insertions(+), 128 deletions(-)

diff --git a/src/download/requester.rs b/src/download/requester.rs
index a41a8bd..d1dc2d2 100644
--- a/src/download/requester.rs
+++ b/src/download/requester.rs
@@ -211,130 +211,6 @@ impl DQRequester {
         Ok(response)
     }
 
-    fn _fetch_expression_batch(
-        client: Arc<Mutex<DQRequester>>,
-        expr_batch: Vec<String>,
-        okay_responses: Arc<Mutex<Vec<DQTimeseriesResponse>>>,
-        failed_batches: Arc<Mutex<Vec<Vec<String>>>>,
-        args: DQTimeseriesRequestArgs,
-    ) {
-        let response = client.lock().unwrap()._fetch_single_timeseries_batch(args);
-
-        match response {
-            Ok(r) => {
-                // Attempt to parse the response text
-                match serde_json::from_str::<DQTimeseriesResponse>(&r.text().unwrap()) {
-                    Ok(dq_response) => {
-                        okay_responses.lock().unwrap().push(dq_response);
-                        log::info!("Got batch: {:?}", expr_batch);
-                    }
-                    Err(e) => {
-                        // If parsing fails, treat this as a batch failure
-                        failed_batches.lock().unwrap().push(expr_batch.clone());
-                        log::error!("Failed to parse timeseries: {:?} : {:?}", expr_batch, e);
-                    }
-                }
-            }
-            Err(e) => {
-                // Handle _fetch_single_timeseries_batch error
-                failed_batches.lock().unwrap().push(expr_batch.clone());
-                log::error!("Failed to get batch: {:?} : {:?}", expr_batch, e);
-            }
-        }
-    }
-
-    fn _fetch_timeseries_recursive(
-        &mut self,
-        args: DQTimeseriesRequestArgs,
-        max_retries: u32,
-    ) -> Result<Vec<DQTimeseriesResponse>, Box<dyn std::error::Error>> {
-        // sort to ensure that the order of expressions is consistent
-        let expression_batches: Vec<Vec<String>> = args
-            .expressions
-            .chunks(20)
-            .map(|chunk| {
-                let mut vec = chunk.to_vec();
-                vec.sort();
-                vec
-            })
-            .collect();
-
-        let okay_responses = Arc::new(Mutex::new(Vec::new()));
-        let failed_batches = Arc::new(Mutex::new(Vec::new()));
-        let client = Arc::new(Mutex::new(self.clone()));
-
-        let (sender, receiver) = channel::unbounded();
-        let total_batches = expression_batches.len();
-        let mut curr_batch = 0;
-
-        // Spawn MAX_THREAD_WORKERS worker threads
-        let mut workers = vec![];
-        for _ in 0..MAX_THREAD_WORKERS {
-            let receiver = receiver.clone();
-            let okay_responses = Arc::clone(&okay_responses);
-            let failed_batches = Arc::clone(&failed_batches);
-            let client = Arc::clone(&client);
-
-            let worker = thread::spawn(move || {
-                while let Ok((args, expr_batch)) = receiver.recv() {
-                    DQRequester::_fetch_expression_batch(
-                        client.clone(),
-                        expr_batch,
-                        okay_responses.clone(),
-                        failed_batches.clone(),
-                        args,
-                    );
-                }
-            });
-            workers.push(worker);
-        }
-
-        // Send jobs to workers
-        for expr_batch in expression_batches {
-            curr_batch += 1;
-            let mut args = args.clone();
-            args.update_expressions(expr_batch.clone());
-
-            log::info!("Queued {} out of {} batches", curr_batch, total_batches);
-            thread::sleep(Duration::from_millis(API_DELAY_MILLIS));
-
-            sender.send((args, expr_batch)).unwrap();
-        }
-
-        drop(sender); // Close the channel so workers can finish
-
-        // Wait for all workers to finish
-        for worker in workers {
-            worker.join().unwrap();
-        }
-
-        let mut okay_responses = Arc::try_unwrap(okay_responses)
-            .unwrap()
-            .into_inner()
-            .unwrap();
-        let failed_batches = Arc::try_unwrap(failed_batches)
-            .unwrap()
-            .into_inner()
-            .unwrap();
-
-        if !failed_batches.is_empty() && max_retries == 0 {
-            return Err("Max retries reached".into());
-        }
-
-        if !failed_batches.is_empty() && max_retries > 0 {
-            log::info!("Retrying failed batches");
-            let mut new_args = args.clone();
-            new_args.update_expressions(failed_batches.concat());
-            log::info!("Retrying with {} failed batches", failed_batches.len());
-            let mut retry_responses =
-                self._fetch_timeseries_recursive(new_args, max_retries - 1)?;
-            okay_responses.append(&mut retry_responses);
-        }
-
-        log::info!("Returning {} responses", okay_responses.len());
-        Ok(okay_responses)
-    }
-
     pub fn get_timeseries(
         &mut self,
         args: DQTimeseriesRequestArgs,
@@ -344,10 +220,134 @@ impl DQRequester {
             "Invoking recursive function for {:?} expressions",
             args.expressions.len()
         );
-        self._fetch_timeseries_recursive(args, max_retries)
+        _fetch_timeseries_recursive(self, args, max_retries)
     }
 }
 
+fn _fetch_expression_batch(
+    client: Arc<Mutex<DQRequester>>,
+    expr_batch: Vec<String>,
+    okay_responses: Arc<Mutex<Vec<DQTimeseriesResponse>>>,
+    failed_batches: Arc<Mutex<Vec<Vec<String>>>>,
+    args: DQTimeseriesRequestArgs,
+) {
+    let response = client.lock().unwrap()._fetch_single_timeseries_batch(args);
+
+    match response {
+        Ok(r) => {
+            // Attempt to parse the response text
+            match serde_json::from_str::<DQTimeseriesResponse>(&r.text().unwrap()) {
+                Ok(dq_response) => {
+                    okay_responses.lock().unwrap().push(dq_response);
+                    log::info!("Got batch: {:?}", expr_batch);
+                }
+                Err(e) => {
+                    // If parsing fails, treat this as a batch failure
+                    failed_batches.lock().unwrap().push(expr_batch.clone());
+                    log::error!("Failed to parse timeseries: {:?} : {:?}", expr_batch, e);
+                }
+            }
+        }
+        Err(e) => {
+            // Handle _fetch_single_timeseries_batch error
+            failed_batches.lock().unwrap().push(expr_batch.clone());
+            log::error!("Failed to get batch: {:?} : {:?}", expr_batch, e);
+        }
+    }
+}
+
+fn _fetch_timeseries_recursive(
+    dq_requester: &mut DQRequester,
+    args: DQTimeseriesRequestArgs,
+    max_retries: u32,
+) -> Result<Vec<DQTimeseriesResponse>, Box<dyn std::error::Error>> {
+    // sort to ensure that the order of expressions is consistent
+    let expression_batches: Vec<Vec<String>> = args
+        .expressions
+        .chunks(20)
+        .map(|chunk| {
+            let mut vec = chunk.to_vec();
+            vec.sort();
+            vec
+        })
+        .collect();
+
+    let okay_responses = Arc::new(Mutex::new(Vec::new()));
+    let failed_batches = Arc::new(Mutex::new(Vec::new()));
+    let client = Arc::new(Mutex::new(dq_requester.clone()));
+
+    let (sender, receiver) = channel::unbounded();
+    let total_batches = expression_batches.len();
+    let mut curr_batch = 0;
+
+    // Spawn MAX_THREAD_WORKERS worker threads
+    let mut workers = vec![];
+    for _ in 0..MAX_THREAD_WORKERS {
+        let receiver = receiver.clone();
+        let okay_responses = Arc::clone(&okay_responses);
+        let failed_batches = Arc::clone(&failed_batches);
+        let client = Arc::clone(&client);
+
+        let worker = thread::spawn(move || {
+            while let Ok((args, expr_batch)) = receiver.recv() {
+                _fetch_expression_batch(
+                    client.clone(),
+                    expr_batch,
+                    okay_responses.clone(),
+                    failed_batches.clone(),
+                    args,
+                );
+            }
+        });
+        workers.push(worker);
+    }
+
+    // Send jobs to workers
+    for expr_batch in expression_batches {
+        curr_batch += 1;
+        let mut args = args.clone();
+        args.update_expressions(expr_batch.clone());
+
+        log::info!("Queued {} out of {} batches", curr_batch, total_batches);
+        thread::sleep(Duration::from_millis(API_DELAY_MILLIS));
+
+        sender.send((args, expr_batch)).unwrap();
+    }
+
+    drop(sender); // Close the channel so workers can finish
+
+    // Wait for all workers to finish
+    for worker in workers {
+        worker.join().unwrap();
+    }
+
+    let mut okay_responses = Arc::try_unwrap(okay_responses)
+        .unwrap()
+        .into_inner()
+        .unwrap();
+    let failed_batches = Arc::try_unwrap(failed_batches)
+        .unwrap()
+        .into_inner()
+        .unwrap();
+
+    if !failed_batches.is_empty() && max_retries == 0 {
+        return Err("Max retries reached".into());
+    }
+
+    if !failed_batches.is_empty() && max_retries > 0 {
+        log::info!("Retrying failed batches");
+        let mut new_args = args.clone();
+        new_args.update_expressions(failed_batches.concat());
+        log::info!("Retrying with {} failed batches", failed_batches.len());
+        let mut retry_responses =
+            _fetch_timeseries_recursive(dq_requester, new_args, max_retries - 1)?;
+        okay_responses.append(&mut retry_responses);
+    }
+
+    log::info!("Returning {} responses", okay_responses.len());
+    Ok(okay_responses)
+}
+
 #[allow(dead_code)]
 fn main() {
     let client_id = std::env::var("DQ_CLIENT_ID").unwrap();
diff --git a/src/main.rs b/src/main.rs
index f86a4cd..eb3f278 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -16,13 +16,13 @@ fn main() {
         start.elapsed()
     );
 
-    let num_ticks = 1000;
+    let num_ticks = 50;
     let sel_tickers: Vec<String> = tickers
         .iter()
         .take(num_ticks)
         .map(|s| s.to_string())
         .collect();
-    let mut df_deets = Vec::new();
+    // let mut df_deets = Vec::new();
 
     println!("Retrieving indicators for {} tickers", sel_tickers.len());
     start = std::time::Instant::now();
@@ -41,7 +41,12 @@ fn main() {
     start = std::time::Instant::now();
 
    for indicator in indicators {
-        df_deets.push((indicator.ticker.clone(), indicator.as_qdf().unwrap().size()));
+        // df_deets.push((indicator.ticker.clone(), indicator.as_qdf().unwrap().size()));
+        println!(
+            "Ticker: {}, DataFrame: {}",
+            indicator.ticker,
+            indicator.as_qdf().unwrap()
+        );
     }
     println!(
         "Converted indicators to DataFrames in {:?}",