def test_window_aggregation(self):
    """Check WindowAggregate on a strictly-past window of a 1-minute series.

    The window ("-3min", "-1min") is closed on both ends, so for a timestamp
    ``t`` the aggregate covers the samples in ``[t - 3min, t - 1min]``.
    """
    index = pd.date_range("2009-01-01", freq="min", periods=10, tz="UTC")
    df = pd.DataFrame(
        {
            "a": np.arange(10),
            "b": np.arange(10) * 2,
        },
        index=index,
    )

    wa = WindowAggregate(
        window_interval=("-3min", "-1min"),
        agg_method="mean",
    )

    out = wa.fit_transform(df)

    col_a = "('-3min', '-1min')_a"
    col_b = "('-3min', '-1min')_b"

    # The original features must be passed through untouched.
    assert (out.loc[index, "a"] == df["a"]).all()
    assert (out.loc[index, "b"] == df["b"]).all()

    # --- manual expected values ---
    # Exact timestamps are used instead of partial-string lookups so that
    # each access is guaranteed to return a scalar.

    # t = 00:09 -> window [00:06, 00:08] -> a = [6, 7, 8], b = [12, 14, 16]
    assert out.loc[index[9], col_a] == 7.0
    assert out.loc[index[9], col_b] == 14.0

    # t = 00:02 -> window [23:59, 00:01] -> a = [0, 1], b = [0, 2]
    # (partially filled window, allowed because min_periods=1)
    assert out.loc[index[2], col_a] == 0.5
    assert out.loc[index[2], col_b] == 1.0

    # t = 00:01 -> window [23:58, 00:00] -> a = [0], b = [0]
    assert out.loc[index[1], col_a] == 0.0
    assert out.loc[index[1], col_b] == 0.0

    # t = 00:00 -> window [23:57, 23:59] holds no sample -> NaN
    assert np.isnan(out.loc[index[0], col_a])
    assert np.isnan(out.loc[index[0], col_b])

    assert wa.get_feature_names_out() == [
        "a",
        "b",
        col_a,
        col_b,
    ]
FUNCTION_MAP = {
    "mean": np.mean,
    "average": np.average,
    "sum": np.sum,
    "dot": np.dot,
    "integrate": time_integrate,
}

MODEL_MAP = {"STL": SkSTLForecast, "Prophet": SkProphet}


class WindowAggregate(BaseProcessing):
    """A transformer that adds window-aggregated features to a pandas DataFrame.

    For every input column, a new column is created that holds an aggregate
    (mean, sum, max, ...) of that column over a time window located in the
    past of (or ending at) each timestamp.

    Parameters
    ----------
    window_interval : tuple[str, str], default=("-5min", "0min")
        The time window, given as ``(start_offset, stop_offset)`` relative to
        each timestamp ``t``, e.g. ``("-5min", "0min")`` or
        ``("-1h", "-30min")``. Both offsets must be parseable by
        ``pandas.Timedelta``. The window covers ``[t + start_offset,
        t + stop_offset]``, both ends included. Future windows
        (``stop_offset > 0``) are not supported.

    feature_marker : str | None, default=None
        The prefix of the new aggregated feature names. If None, the string
        representation of ``window_interval`` followed by an underscore is
        used, e.g. ``"('-5min', '0min')_"``.

    agg_method : str, default="mean"
        Name of the pandas rolling aggregation method to apply, such as
        ``"mean"``, ``"sum"``, ``"min"``, ``"max"``, ``"std"``, ``"median"``
        or ``"count"``.

    Attributes
    ----------
    offset_start_, offset_stop_ : pd.Timedelta
        Parsed bounds of ``window_interval``.
    feature_marker_ : str
        Prefix actually used for the new columns. It is resolved at fit time
        so that the ``feature_marker`` constructor parameter is never
        overwritten and a refit with another ``window_interval`` gets a fresh
        default prefix.
    new_columns_ : list
        Names of the aggregated columns, in the order of the fitted columns.
    shift_ : pd.Timedelta
        ``-stop_offset``; the amount by which the data is shifted forward.
    rolling_window_ : pd.Timedelta
        ``stop_offset - start_offset``; the length of the rolling window.

    Examples
    --------
    >>> import pandas as pd
    >>> from tide.processing import WindowAggregate
    >>> dates = pd.date_range(start="2024-01-01", periods=6, freq="5min", tz="UTC")
    >>> df = pd.DataFrame(
    ...     {
    ...         "power__W__building": [100, 150, 200, 180, 220, 250],
    ...         "temp__°C__room": [20, 20.5, 21, 21.5, 22, 22.5],
    ...     },
    ...     index=dates,
    ... )
    >>> # 10-minute mean ending at the current timestamp
    >>> aggregator = WindowAggregate(window_interval=("-10min", "0min"))
    >>> result = aggregator.fit_transform(df)
    >>> result["('-10min', '0min')_temp__°C__room"].tolist()
    [20.0, 20.25, 20.5, 21.0, 21.5, 22.0]
    >>> # Custom prefix and aggregation method
    >>> aggregator_max = WindowAggregate(
    ...     window_interval=("-15min", "0min"),
    ...     feature_marker="max_15min_",
    ...     agg_method="max",
    ... )
    >>> result_max = aggregator_max.fit_transform(df)
    >>> result_max["max_15min_power__W__building"].tolist()
    [100.0, 150.0, 200.0, 200.0, 220.0, 250.0]

    Notes
    -----
    - The original features are preserved; aggregated versions are appended.
    - The rolling window is inclusive on both ends (``closed="both"``).
    - ``min_periods=1``: a single observation in the window is enough; a
      window without any observation yields NaN.
    - The window is obtained by shifting the data by ``-stop_offset`` and then
      applying a rolling window of length ``stop_offset - start_offset``.
    - The aggregates are evaluated at the timestamps of the input, so the
      output has exactly the same index as the input.

    Returns
    -------
    pd.DataFrame
        The input DataFrame with the additional window-aggregated features.

    Raises
    ------
    ValueError
        At fit time, if ``stop_offset > 0`` (future window) or if
        ``start_offset >= stop_offset``.
        At transform time, if ``agg_method`` is not a callable method of the
        pandas rolling object.
    """

    def __init__(
        self,
        window_interval: tuple[str, str] = ("-5min", "0min"),
        feature_marker: str | None = None,
        agg_method: str = "mean",
    ):
        super().__init__()
        self.window_interval = window_interval
        self.feature_marker = feature_marker
        self.agg_method = agg_method

    def _fit_implementation(self, X: pd.DataFrame, y=None):
        """Validate the window and derive the fitted attributes from ``X``."""
        start, stop = self.window_interval
        self.offset_start_ = pd.Timedelta(start)
        self.offset_stop_ = pd.Timedelta(stop)

        if self.offset_stop_ > pd.Timedelta(0):
            raise ValueError("Future windows are not supported with rolling")
        if self.offset_start_ >= self.offset_stop_:
            raise ValueError("window_interval must satisfy start < stop")

        # Resolve the prefix into a fitted attribute instead of overwriting
        # the constructor parameter: otherwise a refit after changing
        # window_interval would keep the prefix of the previous window.
        self.feature_marker_ = (
            f"{self.window_interval}_"
            if self.feature_marker is None
            else self.feature_marker
        )

        self.required_columns = list(X.columns)
        self.new_columns_ = [
            self.feature_marker_ + col for col in self.required_columns
        ]

        # list(...) keeps this a plain concatenation whatever sequence type
        # feature_names_in_ happens to be.
        self.feature_names_out_ = list(self.feature_names_in_) + self.new_columns_

        self.shift_ = -self.offset_stop_  # e.g. 5min
        self.rolling_window_ = self.offset_stop_ - self.offset_start_  # e.g. 10min

        return self

    def _transform_implementation(self, X: pd.DataFrame):
        """Return ``X`` with one aggregated column appended per fitted column."""
        check_is_fitted(
            self,
            attributes=[
                "required_columns",
                "feature_names_out_",
                "shift_",
                "rolling_window_",
                "new_columns_",
            ],
        )

        # Moving the data forward by -stop_offset turns the window
        # [t + start, t + stop] into an ordinary trailing window ending at t.
        X_shifted = X[self.required_columns].shift(freq=self.shift_)

        # When stop_offset != 0 the shifted index no longer matches the input
        # index. Evaluate the windows on the union of both so that every
        # input timestamp gets its own aggregate; the rows added here are all
        # NaN and therefore never counted as observations.
        realign = not X_shifted.index.equals(X.index)
        if realign:
            X_shifted = X_shifted.reindex(X_shifted.index.union(X.index))

        rolling = X_shifted.rolling(
            window=self.rolling_window_,
            closed="both",
            min_periods=1,
        )

        aggregate = getattr(rolling, self.agg_method, None)
        if not callable(aggregate):
            raise ValueError(
                f"Aggregation '{self.agg_method}' not supported by rolling"
            )

        X_agg = aggregate()
        if realign:
            # Keep only the input timestamps so that no row is added to X.
            X_agg = X_agg.reindex(X.index)
        X_agg.columns = self.new_columns_

        return pd.concat([X, X_agg], axis=1)