mirror of
https://github.com/amazon-science/chronos-forecasting
synced 2026-05-24 01:58:27 +00:00
Optimize convert_df_input_to_list_of_dicts_input and validate_df_inputs (#395)
*Issue #, if available:* Addresses #391 *Description of changes:* - Speed up `convert_df_input_to_list_of_dicts_input` and `validate_df_inputs` via a few tricks: - Replace `df.iloc[start_idx:end_idx][col]` with `df[col].iloc[start_idx:end_idx]` to avoid copying data on each slice - Vectorize computation of future timestamps using numpy - Work with `dict[str, np.ndarray]` instead of `pd.DataFrame` when working with covariates to avoid repeated `.to_numpy()` calls. **Before** ``` Benchmarking 20000 series, 200 steps, 0 covariates... Average runtime: 27.33s Benchmarking 20000 series, 200 steps, 5 covariates... Average runtime: 44.69s ``` **After** ``` Benchmarking 20000 series, 200 steps, 0 covariates... Average runtime: 4.60s Benchmarking 20000 series, 200 steps, 5 covariates... Average runtime: 8.92s ``` <details> ```python import time import numpy as np import pandas as pd from chronos.df_utils import convert_df_input_to_list_of_dicts_input def benchmark_convert_df_input( num_items: int, num_steps: int, num_covariates: int = 0, num_trials: int = 10, freq: str = "D" ) -> None: """ Benchmark convert_df_input_to_list_of_dicts_input function. Args: num_items: Number of time series num_steps: Number of observations per series num_covariates: Number of covariates to include num_trials: Number of benchmark trials freq: Frequency string for timestamps """ prediction_length = 24 # Generate context DataFrame item_ids = np.repeat(np.arange(num_items), num_steps) timestamps = np.tile(pd.date_range("2020-01-01", periods=num_steps, freq=freq), num_items) df_data = {"item_id": item_ids, "timestamp": timestamps, "target": np.random.randn(num_items * num_steps)} df_data.update({f"cov_{i}": np.random.randn(num_items * num_steps) for i in range(num_covariates)}) df = pd.DataFrame(df_data) # Generate future_df with covariates future_df = None if num_covariates > 0: future_item_ids = np.repeat(np.arange(num_items), prediction_length) offset = pd.tseries.frequencies.to_offset(freq) future_start = pd.Timestamp("2020-01-01") + num_steps * offset future_timestamps = np.tile(pd.date_range(start=future_start, periods=prediction_length, freq=freq), num_items) future_data = {"item_id": future_item_ids, "timestamp": future_timestamps} future_data.update({f"cov_{i}": np.random.randn(num_items * prediction_length) for i in range(num_covariates)}) future_df = pd.DataFrame(future_data) times = [] print(f"Benchmarking {num_items} series, {num_steps} steps, {num_covariates} covariates...") for _ in range(num_trials): start = time.perf_counter() convert_df_input_to_list_of_dicts_input( df=df, future_df=future_df, id_column="item_id", timestamp_column="timestamp", target_columns=["target"], prediction_length=prediction_length, ) end = time.perf_counter() times.append(end - start) print(f"Average runtime: {sum(times) / len(times):.2f}s") if __name__ == "__main__": # Test without covariates benchmark_convert_df_input(20_000, 200, num_covariates=0, num_trials=1) # Test with covariates benchmark_convert_df_input(20_000, 200, num_covariates=5, num_trials=1) ``` </details> By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
This commit is contained in:
parent
d6f8ea4301
commit
bcd563eb66
1 changed files with 46 additions and 44 deletions
|
|
@ -4,6 +4,7 @@
|
|||
# Authors: Abdul Fatir Ansari <ansarnd@amazon.com>
|
||||
|
||||
|
||||
import warnings
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import numpy as np
|
||||
|
|
@ -62,7 +63,7 @@ def validate_df_inputs(
|
|||
prediction_length: int,
|
||||
id_column: str = "item_id",
|
||||
timestamp_column: str = "timestamp",
|
||||
) -> tuple["pd.DataFrame", "pd.DataFrame | None", "pd.Timedelta", list[int], list[int] | None, np.ndarray]:
|
||||
) -> tuple["pd.DataFrame", "pd.DataFrame | None", str, list[int], np.ndarray]:
|
||||
"""
|
||||
Validates and prepares dataframe inputs
|
||||
|
||||
|
|
@ -95,7 +96,6 @@ def validate_df_inputs(
|
|||
- Validated and sorted future dataframe (if provided)
|
||||
- Inferred frequency of the time series
|
||||
- List of series lengths from input dataframe
|
||||
- List of series lengths from future dataframe (if provided)
|
||||
- Original order of time series IDs
|
||||
|
||||
Raises
|
||||
|
|
@ -147,7 +147,7 @@ def validate_df_inputs(
|
|||
# Get series lengths
|
||||
series_lengths = df[id_column].value_counts(sort=False).to_list()
|
||||
|
||||
def validate_freq(timestamps: pd.Series, series_id: str):
|
||||
def validate_freq(timestamps: pd.DatetimeIndex, series_id: str):
|
||||
freq = pd.infer_freq(timestamps)
|
||||
if not freq:
|
||||
raise ValueError(f"Could not infer frequency for series {series_id}")
|
||||
|
|
@ -156,16 +156,15 @@ def validate_df_inputs(
|
|||
# Validate each series
|
||||
all_freqs = []
|
||||
start_idx = 0
|
||||
timestamp_index = pd.DatetimeIndex(df[timestamp_column])
|
||||
for length in series_lengths:
|
||||
if length < 3:
|
||||
series_id = df.iloc[start_idx][id_column]
|
||||
series_id = df[id_column].iloc[start_idx]
|
||||
raise ValueError(
|
||||
f"Every time series must have at least 3 data points, found {length=} for series {series_id}"
|
||||
)
|
||||
|
||||
series_data = df.iloc[start_idx : start_idx + length]
|
||||
timestamps = series_data[timestamp_column]
|
||||
series_id = series_data.iloc[0][id_column]
|
||||
timestamps = timestamp_index[start_idx : start_idx + length]
|
||||
series_id = df[id_column].iloc[start_idx]
|
||||
all_freqs.append(validate_freq(timestamps, series_id))
|
||||
start_idx += length
|
||||
|
||||
|
|
@ -190,10 +189,10 @@ def validate_df_inputs(
|
|||
|
||||
# Validate future series lengths match prediction_length
|
||||
future_start_idx = 0
|
||||
future_timestamps_index = pd.DatetimeIndex(future_df[timestamp_column])
|
||||
for future_length in future_series_lengths:
|
||||
future_series_data = future_df.iloc[future_start_idx : future_start_idx + future_length]
|
||||
future_timestamps = future_series_data[timestamp_column]
|
||||
future_series_id = future_series_data.iloc[0][id_column]
|
||||
future_timestamps = future_timestamps_index[future_start_idx : future_start_idx + future_length]
|
||||
future_series_id = future_df[id_column].iloc[future_start_idx]
|
||||
if future_length != prediction_length:
|
||||
raise ValueError(
|
||||
f"Future covariates all time series must have length {prediction_length}, got {future_length} for series {future_series_id}"
|
||||
|
|
@ -206,7 +205,7 @@ def validate_df_inputs(
|
|||
|
||||
assert len(series_lengths) == len(future_series_lengths)
|
||||
|
||||
return df, future_df, inferred_freq, series_lengths, future_series_lengths, original_order
|
||||
return df, future_df, inferred_freq, series_lengths, original_order
|
||||
|
||||
|
||||
def convert_df_input_to_list_of_dicts_input(
|
||||
|
|
@ -252,7 +251,7 @@ def convert_df_input_to_list_of_dicts_input(
|
|||
|
||||
import pandas as pd
|
||||
|
||||
df, future_df, freq, series_lengths, future_series_lengths, original_order = validate_df_inputs(
|
||||
df, future_df, freq, series_lengths, original_order = validate_df_inputs(
|
||||
df,
|
||||
future_df=future_df,
|
||||
id_column=id_column,
|
||||
|
|
@ -264,50 +263,53 @@ def convert_df_input_to_list_of_dicts_input(
|
|||
# Convert to list of dicts format
|
||||
inputs: list[dict[str, np.ndarray | dict[str, np.ndarray]]] = []
|
||||
prediction_timestamps: dict[str, pd.DatetimeIndex] = {}
|
||||
start_idx: int = 0
|
||||
future_start_idx: int = 0
|
||||
|
||||
for i, length in enumerate(series_lengths):
|
||||
series_data = df.iloc[start_idx : start_idx + length]
|
||||
# Extract target(s)
|
||||
target_data = series_data[target_columns].to_numpy().T # Shape: (n_targets, history_length)
|
||||
task: dict[str, np.ndarray | dict[str, np.ndarray]] = {"target": target_data}
|
||||
indptr = np.concatenate([[0], np.cumsum(series_lengths)]).astype("int64")
|
||||
target_array = df[target_columns].to_numpy().T # Shape: (n_targets, len(df))
|
||||
last_ts = pd.DatetimeIndex(df[timestamp_column].iloc[indptr[1:] - 1]) # Shape: (n_series,)
|
||||
offset = pd.tseries.frequencies.to_offset(freq)
|
||||
with warnings.catch_warnings():
|
||||
# Silence PerformanceWarning for non-vectorized offsets https://github.com/pandas-dev/pandas/blob/95624ca2e99b0/pandas/core/arrays/datetimes.py#L822
|
||||
warnings.simplefilter("ignore", category=pd.errors.PerformanceWarning)
|
||||
# Generate all prediction timestamps at once by stacking offsets into shape (n_series * prediction_length)
|
||||
prediction_timestamps_array = pd.DatetimeIndex(
|
||||
np.dstack([last_ts + step * offset for step in range(1, prediction_length + 1)]).ravel()
|
||||
)
|
||||
|
||||
# Generate future timestamps
|
||||
series_id = series_data.iloc[0][id_column]
|
||||
last_timestamp = series_data[timestamp_column].iloc[-1]
|
||||
future_ts = pd.date_range(start=last_timestamp, periods=prediction_length + 1, freq=freq)[1:]
|
||||
prediction_timestamps[series_id] = future_ts
|
||||
past_covariates_dict = {
|
||||
col: df[col].to_numpy() for col in df.columns if col not in [id_column, timestamp_column] + target_columns
|
||||
}
|
||||
if future_df is not None:
|
||||
future_covariates_dict = {
|
||||
col: future_df[col].to_numpy() for col in future_df.columns if col not in [id_column, timestamp_column]
|
||||
}
|
||||
|
||||
for i in range(len(series_lengths)):
|
||||
start_idx, end_idx = indptr[i], indptr[i + 1]
|
||||
future_start_idx, future_end_idx = i * prediction_length, (i + 1) * prediction_length
|
||||
|
||||
series_id = df[id_column].iloc[start_idx]
|
||||
prediction_timestamps[series_id] = prediction_timestamps_array[future_start_idx:future_end_idx]
|
||||
task: dict[str, np.ndarray | dict[str, np.ndarray]] = {"target": target_array[:, start_idx:end_idx]}
|
||||
|
||||
# Handle covariates if present
|
||||
covariate_cols = [
|
||||
col for col in series_data.columns if col not in [id_column, timestamp_column] + target_columns
|
||||
]
|
||||
|
||||
if covariate_cols:
|
||||
past_covariates = {col: series_data[col].to_numpy() for col in covariate_cols}
|
||||
task["past_covariates"] = past_covariates
|
||||
if len(past_covariates_dict) > 0:
|
||||
task["past_covariates"] = {col: values[start_idx:end_idx] for col, values in past_covariates_dict.items()}
|
||||
|
||||
# Handle future covariates
|
||||
if future_df is not None:
|
||||
assert future_series_lengths is not None
|
||||
future_length = future_series_lengths[i]
|
||||
future_data = future_df.iloc[future_start_idx : future_start_idx + future_length]
|
||||
assert future_data[timestamp_column].iloc[0] == future_ts[0], (
|
||||
first_future_timestamp = future_df[timestamp_column].iloc[future_start_idx]
|
||||
assert first_future_timestamp == prediction_timestamps[series_id][0], (
|
||||
f"the first timestamp in future_df must be the first forecast timestamp, found mismatch "
|
||||
f"({future_data[timestamp_column].iloc[0]} != {future_ts[0]}) in series {series_id}"
|
||||
f"({first_future_timestamp} != {prediction_timestamps[series_id][0]}) in series {series_id}"
|
||||
)
|
||||
|
||||
if len(future_data) > 0:
|
||||
future_covariates = {
|
||||
col: future_data[col].to_numpy() for col in covariate_cols if col in future_data.columns
|
||||
if len(future_covariates_dict) > 0:
|
||||
task["future_covariates"] = {
|
||||
col: values[future_start_idx:future_end_idx] for col, values in future_covariates_dict.items()
|
||||
}
|
||||
if future_covariates:
|
||||
task["future_covariates"] = future_covariates
|
||||
future_start_idx += future_length
|
||||
|
||||
inputs.append(task)
|
||||
start_idx += length
|
||||
|
||||
assert len(inputs) == len(series_lengths)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue