From bcd563eb664fb5c856611132e804353ac47a1ffb Mon Sep 17 00:00:00 2001 From: Oleksandr Shchur Date: Tue, 25 Nov 2025 15:30:32 +0100 Subject: [PATCH] Optimize `convert_df_input_to_list_of_dicts_input` and `validate_df_inputs` (#395) *Issue #, if available:* Addresses #391 *Description of changes:* - Speed up `convert_df_input_to_list_of_dicts_input` and `validate_df_inputs` via a few tricks: - Replace `df.iloc[start_idx:end_idx][col]` with `df[col].iloc[start_idx:end_idx]` to avoid copying data on each slice - Vectorize computation of future timestamps using numpy - Work with `dict[str, np.ndarray]` instead of `pd.DataFrame` when working with covariates to avoid repeated `.to_numpy()` calls. **Before** ``` Benchmarking 20000 series, 200 steps, 0 covariates... Average runtime: 27.33s Benchmarking 20000 series, 200 steps, 5 covariates... Average runtime: 44.69s ``` **After** ``` Benchmarking 20000 series, 200 steps, 0 covariates... Average runtime: 4.60s Benchmarking 20000 series, 200 steps, 5 covariates... Average runtime: 8.92s ```
```python import time import numpy as np import pandas as pd from chronos.df_utils import convert_df_input_to_list_of_dicts_input def benchmark_convert_df_input( num_items: int, num_steps: int, num_covariates: int = 0, num_trials: int = 10, freq: str = "D" ) -> None: """ Benchmark convert_df_input_to_list_of_dicts_input function. Args: num_items: Number of time series num_steps: Number of observations per series num_covariates: Number of covariates to include num_trials: Number of benchmark trials freq: Frequency string for timestamps """ prediction_length = 24 # Generate context DataFrame item_ids = np.repeat(np.arange(num_items), num_steps) timestamps = np.tile(pd.date_range("2020-01-01", periods=num_steps, freq=freq), num_items) df_data = {"item_id": item_ids, "timestamp": timestamps, "target": np.random.randn(num_items * num_steps)} df_data.update({f"cov_{i}": np.random.randn(num_items * num_steps) for i in range(num_covariates)}) df = pd.DataFrame(df_data) # Generate future_df with covariates future_df = None if num_covariates > 0: future_item_ids = np.repeat(np.arange(num_items), prediction_length) offset = pd.tseries.frequencies.to_offset(freq) future_start = pd.Timestamp("2020-01-01") + num_steps * offset future_timestamps = np.tile(pd.date_range(start=future_start, periods=prediction_length, freq=freq), num_items) future_data = {"item_id": future_item_ids, "timestamp": future_timestamps} future_data.update({f"cov_{i}": np.random.randn(num_items * prediction_length) for i in range(num_covariates)}) future_df = pd.DataFrame(future_data) times = [] print(f"Benchmarking {num_items} series, {num_steps} steps, {num_covariates} covariates...") for _ in range(num_trials): start = time.perf_counter() convert_df_input_to_list_of_dicts_input( df=df, future_df=future_df, id_column="item_id", timestamp_column="timestamp", target_columns=["target"], prediction_length=prediction_length, ) end = time.perf_counter() times.append(end - start) print(f"Average runtime: {sum(times) / len(times):.2f}s") if __name__ == "__main__": # Test without covariates benchmark_convert_df_input(20_000, 200, num_covariates=0, num_trials=1) # Test with covariates benchmark_convert_df_input(20_000, 200, num_covariates=5, num_trials=1) ```
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. --- src/chronos/df_utils.py | 90 +++++++++++++++++++++-------------------- 1 file changed, 46 insertions(+), 44 deletions(-) diff --git a/src/chronos/df_utils.py b/src/chronos/df_utils.py index e17584f..e63ec4f 100644 --- a/src/chronos/df_utils.py +++ b/src/chronos/df_utils.py @@ -4,6 +4,7 @@ # Authors: Abdul Fatir Ansari +import warnings from typing import TYPE_CHECKING import numpy as np @@ -62,7 +63,7 @@ def validate_df_inputs( prediction_length: int, id_column: str = "item_id", timestamp_column: str = "timestamp", -) -> tuple["pd.DataFrame", "pd.DataFrame | None", "pd.Timedelta", list[int], list[int] | None, np.ndarray]: +) -> tuple["pd.DataFrame", "pd.DataFrame | None", str, list[int], np.ndarray]: """ Validates and prepares dataframe inputs @@ -95,7 +96,6 @@ def validate_df_inputs( - Validated and sorted future dataframe (if provided) - Inferred frequency of the time series - List of series lengths from input dataframe - - List of series lengths from future dataframe (if provided) - Original order of time series IDs Raises @@ -147,7 +147,7 @@ def validate_df_inputs( # Get series lengths series_lengths = df[id_column].value_counts(sort=False).to_list() - def validate_freq(timestamps: pd.Series, series_id: str): + def validate_freq(timestamps: pd.DatetimeIndex, series_id: str): freq = pd.infer_freq(timestamps) if not freq: raise ValueError(f"Could not infer frequency for series {series_id}") @@ -156,16 +156,15 @@ def validate_df_inputs( # Validate each series all_freqs = [] start_idx = 0 + timestamp_index = pd.DatetimeIndex(df[timestamp_column]) for length in series_lengths: if length < 3: - series_id = df.iloc[start_idx][id_column] + series_id = df[id_column].iloc[start_idx] raise ValueError( f"Every time series must have at least 3 data points, found {length=} for series {series_id}" ) - - series_data = df.iloc[start_idx : start_idx + length] - timestamps = series_data[timestamp_column] - series_id = series_data.iloc[0][id_column] + timestamps = timestamp_index[start_idx : start_idx + length] + series_id = df[id_column].iloc[start_idx] all_freqs.append(validate_freq(timestamps, series_id)) start_idx += length @@ -190,10 +189,10 @@ def validate_df_inputs( # Validate future series lengths match prediction_length future_start_idx = 0 + future_timestamps_index = pd.DatetimeIndex(future_df[timestamp_column]) for future_length in future_series_lengths: - future_series_data = future_df.iloc[future_start_idx : future_start_idx + future_length] - future_timestamps = future_series_data[timestamp_column] - future_series_id = future_series_data.iloc[0][id_column] + future_timestamps = future_timestamps_index[future_start_idx : future_start_idx + future_length] + future_series_id = future_df[id_column].iloc[future_start_idx] if future_length != prediction_length: raise ValueError( f"Future covariates all time series must have length {prediction_length}, got {future_length} for series {future_series_id}" @@ -206,7 +205,7 @@ def validate_df_inputs( assert len(series_lengths) == len(future_series_lengths) - return df, future_df, inferred_freq, series_lengths, future_series_lengths, original_order + return df, future_df, inferred_freq, series_lengths, original_order def convert_df_input_to_list_of_dicts_input( @@ -252,7 +251,7 @@ def convert_df_input_to_list_of_dicts_input( import pandas as pd - df, future_df, freq, series_lengths, future_series_lengths, original_order = validate_df_inputs( + df, future_df, freq, series_lengths, original_order = validate_df_inputs( df, future_df=future_df, id_column=id_column, @@ -264,50 +263,53 @@ def convert_df_input_to_list_of_dicts_input( # Convert to list of dicts format inputs: list[dict[str, np.ndarray | dict[str, np.ndarray]]] = [] prediction_timestamps: dict[str, pd.DatetimeIndex] = {} - start_idx: int = 0 - future_start_idx: int = 0 - for i, length in enumerate(series_lengths): - series_data = df.iloc[start_idx : start_idx + length] - # Extract target(s) - target_data = series_data[target_columns].to_numpy().T # Shape: (n_targets, history_length) - task: dict[str, np.ndarray | dict[str, np.ndarray]] = {"target": target_data} + indptr = np.concatenate([[0], np.cumsum(series_lengths)]).astype("int64") + target_array = df[target_columns].to_numpy().T # Shape: (n_targets, len(df)) + last_ts = pd.DatetimeIndex(df[timestamp_column].iloc[indptr[1:] - 1]) # Shape: (n_series,) + offset = pd.tseries.frequencies.to_offset(freq) + with warnings.catch_warnings(): + # Silence PerformanceWarning for non-vectorized offsets https://github.com/pandas-dev/pandas/blob/95624ca2e99b0/pandas/core/arrays/datetimes.py#L822 + warnings.simplefilter("ignore", category=pd.errors.PerformanceWarning) + # Generate all prediction timestamps at once by stacking offsets into shape (n_series * prediction_length) + prediction_timestamps_array = pd.DatetimeIndex( + np.dstack([last_ts + step * offset for step in range(1, prediction_length + 1)]).ravel() + ) - # Generate future timestamps - series_id = series_data.iloc[0][id_column] - last_timestamp = series_data[timestamp_column].iloc[-1] - future_ts = pd.date_range(start=last_timestamp, periods=prediction_length + 1, freq=freq)[1:] - prediction_timestamps[series_id] = future_ts + past_covariates_dict = { + col: df[col].to_numpy() for col in df.columns if col not in [id_column, timestamp_column] + target_columns + } + if future_df is not None: + future_covariates_dict = { + col: future_df[col].to_numpy() for col in future_df.columns if col not in [id_column, timestamp_column] + } + + for i in range(len(series_lengths)): + start_idx, end_idx = indptr[i], indptr[i + 1] + future_start_idx, future_end_idx = i * prediction_length, (i + 1) * prediction_length + + series_id = df[id_column].iloc[start_idx] + prediction_timestamps[series_id] = prediction_timestamps_array[future_start_idx:future_end_idx] + task: dict[str, np.ndarray | dict[str, np.ndarray]] = {"target": target_array[:, start_idx:end_idx]} # Handle covariates if present - covariate_cols = [ - col for col in series_data.columns if col not in [id_column, timestamp_column] + target_columns - ] - - if covariate_cols: - past_covariates = {col: series_data[col].to_numpy() for col in covariate_cols} - task["past_covariates"] = past_covariates + if len(past_covariates_dict) > 0: + task["past_covariates"] = {col: values[start_idx:end_idx] for col, values in past_covariates_dict.items()} # Handle future covariates if future_df is not None: - assert future_series_lengths is not None - future_length = future_series_lengths[i] - future_data = future_df.iloc[future_start_idx : future_start_idx + future_length] - assert future_data[timestamp_column].iloc[0] == future_ts[0], ( + first_future_timestamp = future_df[timestamp_column].iloc[future_start_idx] + assert first_future_timestamp == prediction_timestamps[series_id][0], ( f"the first timestamp in future_df must be the first forecast timestamp, found mismatch " - f"({future_data[timestamp_column].iloc[0]} != {future_ts[0]}) in series {series_id}" + f"({first_future_timestamp} != {prediction_timestamps[series_id][0]}) in series {series_id}" ) - if len(future_data) > 0: - future_covariates = { - col: future_data[col].to_numpy() for col in covariate_cols if col in future_data.columns + if len(future_covariates_dict) > 0: + task["future_covariates"] = { + col: values[future_start_idx:future_end_idx] for col, values in future_covariates_dict.items() } - if future_covariates: - task["future_covariates"] = future_covariates - future_start_idx += future_length inputs.append(task) - start_idx += length assert len(inputs) == len(series_lengths)