use crate::utils::bdates::{get_bdates_list_with_freq, BDateFreq}; use crate::utils::dateutils::get_min_max_real_dates; use crate::utils::misc::get_cid; use crate::utils::qdf::core::check_quantamental_dataframe; use chrono::NaiveDate; use polars::prelude::*; use std::collections::{BTreeMap, HashMap}; use std::error::Error; use crate::utils::qdf::get_unique_metrics; // struct Blacklist which is a wrapper around hashmap and btreemap #[derive(Debug, Clone)] pub struct Blacklist { pub blacklist: BTreeMap, } // impl hashmap into impl Blacklist { pub fn into_hashmap(self) -> HashMap { self.blacklist.into_iter().collect() } } /// Apply a blacklist to a Quantamental DataFrame with Lazy API. /// /// * `blacklist` is a map from any “ticker‑like” key to a tuple of /// `(start_date, end_date)` in **inclusive** `"YYYY‑MM‑DD"` format. /// * `metrics` – if `None`, every metric from `get_unique_metrics(df)` /// is used. /// * `group_by_cid = Some(false)` is not implemented yet (parity with /// the eager version). pub fn apply_blacklist_lazy( df: &mut DataFrame, blacklist: &BTreeMap, metrics: Option>, group_by_cid: Option, ) -> Result> { check_quantamental_dataframe(df)?; Ok(df.clone()) } /// Create a blacklist from a Quantamental DataFrame. /// The blacklist is a mapping of tickers to date ranges where the specified metrics are null or NaN. /// # Arguments: /// * `df` - The Quantamental DataFrame. /// * `group_by_cid` - If true, group the blacklist by `cid`. Defaults to true. /// * `blacklist_name` - The name of the blacklist. Defaults to "BLACKLIST". /// * `metrics` - The metrics to check for null or NaN values. If None, all metrics are used. pub fn create_blacklist_from_qdf( df: &DataFrame, group_by_cid: Option, blacklist_name: Option, metrics: Option>, ) -> Result, Box> { check_quantamental_dataframe(df)?; let metrics = metrics.unwrap_or_else(|| get_unique_metrics(df).unwrap()); let blacklist_name = blacklist_name.unwrap_or_else(|| "BLACKLIST".into()); let group_by_cid = group_by_cid.unwrap_or(true); let (min_date, max_date) = get_min_max_real_dates(df, "real_date".into())?; let min_date_str = min_date.format("%Y-%m-%d").to_string(); let max_date_str = max_date.format("%Y-%m-%d").to_string(); // let all_bdates = get_bdates_series_default_opt(min_date_str, max_date_str, None)?; let all_bdates = get_bdates_list_with_freq( min_date_str.clone().as_str(), max_date_str.clone().as_str(), BDateFreq::Daily, )?; // if none of the metrics are null or NaN, return an empty blacklist if !metrics.iter().any(|metric| { df.column(metric) .map(|col| col.is_null().any()) .unwrap_or(false) }) { return Ok(BTreeMap::new()); } // let null_mask = get_nan_mask(df, metrics)?; // let df = df.filter(&null_mask)?.clone(); let df = df .clone() .lazy() .with_columns([ (cols(metrics.clone()).is_null().or(cols(metrics).is_nan())).alias("null_mask") ]) .filter(col("null_mask")) // if is now empty, return an empty blacklist .sort( ["cid", "xcat"], SortMultipleOptions::default().with_maintain_order(true), ) .group_by([col("cid"), col("xcat")]) // .agg([col("real_date").sort(SortOptions::default())]) .agg([col("real_date") .dt() .strftime("%Y-%m-%d") .sort(SortOptions::default())]) .select([ concat_str([col("cid"), col("xcat")], "_", true).alias("ticker"), col("real_date").alias("real_dates"), ]) .collect()?; // assert!(0 == 1, "{:?}", df); let ticker_vec = df .column("ticker")? .str()? .into_iter() .filter_map(|opt| opt.map(|s| s.to_string())) .collect::>(); let rdt = get_vec_of_vec_of_dates_from_df(df)?; let mut blk: HashMap> = HashMap::new(); for (tkr, dates) in ticker_vec.iter().zip(rdt.iter()) { if group_by_cid { let _cid = get_cid(tkr.clone())?; if blk.contains_key(&_cid) { blk.get_mut(&_cid).unwrap().extend(dates.iter().cloned()); } else { blk.insert(_cid, dates.clone()); } } else { blk.insert(tkr.to_string(), dates.clone()); } } for (_key, vals) in blk.iter_mut() { // order is important - dedup depends on the vec being sorted vals.sort(); vals.dedup(); } let all_bdates_strs = all_bdates .iter() .map(|date| date.format("%Y-%m-%d").to_string()) .collect::>(); let mut blacklist: HashMap = HashMap::new(); for (tkr, dates) in blk.iter() { let date_ranges = convert_dates_list_to_date_ranges(dates.clone(), all_bdates_strs.clone()); for (rng_idx, (start_date, end_date)) in date_ranges.iter() { let range_key = format!("{}_{}_{}", tkr, blacklist_name.clone(), rng_idx); blacklist.insert(range_key, (start_date.clone(), end_date.clone())); } } // Ok(blacklist) let mut btree_map: BTreeMap = BTreeMap::new(); for (key, (start_date, end_date)) in blacklist.iter() { btree_map.insert(key.clone(), (start_date.clone(), end_date.clone())); } Ok(btree_map) } /// Get a mask of NaN values for the specified metrics in the DataFrame. #[allow(dead_code)] fn get_nan_mask( df: &DataFrame, metrics: Vec, ) -> Result, Box> { let null_masks: Vec> = metrics .iter() .map(|metric| { let null_mask = df.column(metric.as_str())?.is_null(); let nan_mask = df.column(metric.as_str())?.is_nan()?; Ok(null_mask | nan_mask) }) .collect::>>()?; let null_mask = null_masks .into_iter() .reduce(|acc, mask| acc | mask) .unwrap_or_else(|| BooleanChunked::full_null("null_mask".into(), df.height())); Ok(null_mask) } fn convert_dates_list_to_date_ranges( blacklist: Vec, all_bdates_strs: Vec, ) -> HashMap { // Step 1: Map every date in all_bdates_strs to its index let mut all_map: HashMap = HashMap::new(); for (i, d) in all_bdates_strs.iter().enumerate() { all_map.insert(d.clone(), i); } // Step 2: Convert each blacklisted date into its index, if it exists let mut blacklisted_indices: Vec = Vec::new(); for dt in blacklist { if let Some(&idx) = all_map.get(&dt) { blacklisted_indices.push(idx); } } // Step 3: Sort the blacklisted indices blacklisted_indices.sort_unstable(); // Step 4: Traverse and group consecutive indices into ranges let mut result: HashMap = HashMap::new(); let mut string_result: HashMap = HashMap::new(); if blacklisted_indices.is_empty() { return string_result; } let mut range_idx: i64 = 0; let mut start_idx = blacklisted_indices[0]; let mut end_idx = start_idx; for &cur_idx in blacklisted_indices.iter().skip(1) { if cur_idx == end_idx + 1 { // We are still in a contiguous run end_idx = cur_idx; } else { // We hit a break in contiguity, so store the last range result.insert( range_idx, ( all_bdates_strs[start_idx].clone(), all_bdates_strs[end_idx].clone(), ), ); range_idx += 1; // Start a new range start_idx = cur_idx; end_idx = cur_idx; } } // Don't forget to store the final range after the loop result.insert( range_idx, ( all_bdates_strs[start_idx].clone(), all_bdates_strs[end_idx].clone(), ), ); let max_digits = result.keys().max().unwrap_or(&-1).to_string().len(); for (key, (start_date, end_date)) in result.iter() { let new_key = format!("{:0width$}", key, width = max_digits); string_result.insert(new_key, (start_date.clone(), end_date.clone())); } string_result } fn get_vec_of_vec_of_dates_from_df(df: DataFrame) -> Result>, Box> { let rdt = df .column("real_dates")? // .clone() .as_series() .unwrap() .list()? .into_iter() .filter_map(|opt| opt) .collect::>() .iter() .map(|s| { s.str() .unwrap() .into_iter() .filter_map(|opt| opt.map(|s| s.to_string())) .collect::>() }) .collect::>>(); Ok(rdt) } #[allow(dead_code)] fn get_vec_of_vec_of_naivedates_from_df( df: DataFrame, ) -> Result>, Box> { let rdt = df .column("real_dates")? // .clone() .as_series() .unwrap() .list()? .into_iter() .filter_map(|opt| opt) .collect::>() .iter() .map(|s| { s.date() .unwrap() .into_iter() .filter_map(|opt| opt.and_then(|date| NaiveDate::from_num_days_from_ce_opt(date))) .collect::>() }) .collect::>>(); Ok(rdt) } // fn get_vec_of_vec_of_dates_from_df(df: DataFrame) -> Result>, Box> { // let real_dates_column = df.column("real_dates")?.clone(); // let series = real_dates_column.as_series().unwrap().clone(); // let rdt = series.list()?.clone(); // let rdt = rdt // .into_iter() // .filter_map(|opt| opt) // .collect::>(); // let rdt = rdt // .iter() // .map(|s| { // s.str() // .unwrap() // .into_iter() // .filter_map(|opt| opt.map(|s| s.to_string())) // .collect::>() // }) // .collect::>>(); // Ok(rdt) // } #[cfg(test)] mod tests { use super::*; #[test] fn test_convert_dates_list_to_date_ranges() { let all_dates = vec![ "2023-01-01".to_string(), "2023-01-02".to_string(), "2023-01-03".to_string(), "2023-01-04".to_string(), "2023-01-05".to_string(), "2023-01-06".to_string(), ]; let blacklist = vec![ "2023-01-02".to_string(), "2023-01-03".to_string(), "2023-01-05".to_string(), ]; let result = convert_dates_list_to_date_ranges(blacklist, all_dates); // Expect two ranges: // range 0 => ("2023-01-02", "2023-01-03") // range 1 => ("2023-01-05", "2023-01-05") assert_eq!( result["0"], ("2023-01-02".to_string(), "2023-01-03".to_string()) ); assert_eq!( result["1"], ("2023-01-05".to_string(), "2023-01-05".to_string()) ); } }