Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/api_reference/processing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,7 @@ The processing module provides transformers for data processing and manipulation
.. autoclass:: tide.processing.TrimSequence
:members:
:show-inheritance:

.. autoclass:: tide.processing.WindowAggregate
:members:
:show-inheritance:
32 changes: 32 additions & 0 deletions tests/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
AddFourierPairs,
DropQuantile,
TrimSequence,
WindowAggregate,
)

RESOURCES_PATH = Path(__file__).parent / "resources"
Expand Down Expand Up @@ -1340,3 +1341,34 @@ def test_sequence_trim(self):
)

check_feature_names_out(trimer, res)

def test_window_aggregation(self):
df = pd.DataFrame(
{
"a": np.arange(10),
"b": np.arange(10) * 2,
},
index=pd.date_range("2009-01-01", freq="min", periods=10, tz="UTC"),
)

wa = WindowAggregate(
window_interval=("-3min", "-1min"),
agg_method="mean",
)

out = wa.fit_transform(df)

assert True

# # --- manual expected values ---
# # t = 00:09 → window [00:07, 00:08] → a = [7, 8], b = [14, 16]
assert out.loc["2009-01-01 00:09", "('-3min', '-1min')_a"] == 7.0
assert out.loc["2009-01-01 00:09", "('-3min', '-1min')_b"] == 14.0
#

assert wa.get_feature_names_out() == [
"a",
"b",
"('-3min', '-1min')_a",
"('-3min', '-1min')_b",
]
185 changes: 183 additions & 2 deletions tide/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from scipy.signal import detrend

from tide.base import BaseProcessing, BaseFiller, BaseOikoMeteo
from tide.math import time_gradient
from tide.math import time_gradient, time_integrate
from tide.utils import (
get_data_blocks,
get_outer_timestamps,
Expand All @@ -25,7 +25,13 @@
from tide.meteo import sun_position, beam_component, sky_diffuse, ground_diffuse
from tide.utils import get_tags_max_level

FUNCTION_MAP = {"mean": np.mean, "average": np.average, "sum": np.sum, "dot": np.dot}
FUNCTION_MAP = {
"mean": np.mean,
"average": np.average,
"sum": np.sum,
"dot": np.dot,
"integrate": time_integrate,
}

MODEL_MAP = {"STL": SkSTLForecast, "Prophet": SkProphet}

Expand Down Expand Up @@ -1624,6 +1630,181 @@ def _transform_implementation(self, X: pd.Series | pd.DataFrame):
return X_transformed


class WindowAggregate(BaseProcessing):
"""A transformer that adds window-aggregated features to a pandas DataFrame.

This transformer creates new features by applying aggregation functions over
rolling time windows of existing features.

Parameters
----------
window_interval : tuple[str, str], default=("-5min", "0min")
The time window interval for aggregation, specified as a tuple of
(start_offset, stop_offset) from each timestamp. Can be specified as:
- String tuples (e.g., ("-5min", "0min"), ("-1h", "-30min"))
- Both offsets must be parseable as pandas Timedelta objects
The window includes all data points between [t + start_offset, t + stop_offset].
Note: Future windows (stop_offset > 0) are not supported.

features_to_aggregate : str | list[str] | None, default=None
The features to create aggregated versions of. If None, all features in
the input DataFrame will be aggregated. Can be specified as:
- A single feature name (string)
- A list of feature names
- None (to aggregate all features)

feature_marker : str | None, default=None
The prefix to use for the new aggregated feature names. If None, the
string representation of window_interval followed by an underscore is used.
For example, with window_interval=("-5min", "0min"), features will be
prefixed with "('-5min', '0min')_".

agg_method : str, default="mean"
The aggregation method to apply over the rolling window. Must be a valid
pandas rolling aggregation method such as:
- "mean": Average value over the window
- "sum": Sum of values over the window
- "min": Minimum value over the window
- "max": Maximum value over the window
- "std": Standard deviation over the window
- "median": Median value over the window
- "count": Count of non-null values over the window

Examples
--------
>>> import pandas as pd
>>> from tide.processing import WindowAggregate
>>> # Create sample data
>>> dates = pd.date_range(start="2024-01-01", periods=6, freq="5min", tz="UTC")
>>> df = pd.DataFrame(
... {
... "power__W__building": [100, 150, 200, 180, 220, 250],
... "temp__°C__room": [20, 20.5, 21, 21.5, 22, 22.5],
... },
... index=dates,
... )
>>> # Add 10-minute rolling mean features (last 10 minutes up to current time)
>>> aggregator = WindowAggregate(window_interval=("-10min", "0min"))
>>> result = aggregator.fit_transform(df)
>>> print(result)
power__W__building temp__°C__room ('-10min', '0min')_power__W__building ('-10min', '0min')_temp__°C__room
2024-01-01 00:00:00 100.0 20.00 100.0 20.00
2024-01-01 00:05:00 150.0 20.50 125.0 20.25
2024-01-01 00:10:00 200.0 21.00 150.0 20.50
2024-01-01 00:15:00 180.0 21.50 176.7 21.00
2024-01-01 00:20:00 220.0 22.00 200.0 21.50
2024-01-01 00:25:00 250.0 22.50 216.7 22.00
>>> # Add custom aggregated features with specific marker and method
>>> aggregator_max = WindowAggregate(
... window_interval=("-15min", "0min"),
... feature_marker="max_15min_",
... agg_method="max",
... )
>>> result_max = aggregator_max.fit_transform(df)
>>> print(result_max)
power__W__building temp__°C__room max_15min_power__W__building max_15min_temp__°C__room
2024-01-01 00:00:00 100.0 20.00 100.0 20.00
2024-01-01 00:05:00 150.0 20.50 150.0 20.50
2024-01-01 00:10:00 200.0 21.00 200.0 21.00
2024-01-01 00:15:00 180.0 21.50 200.0 21.50
2024-01-01 00:20:00 220.0 22.00 220.0 22.00
2024-01-01 00:25:00 250.0 22.50 250.0 22.50

Notes
-----
- The transformer preserves the original features and adds new aggregated versions
- The rolling window is inclusive on both ends ("closed='both'")
- The minimum number of periods for aggregation is set to 1 (min_periods=1)
- Future windows (where stop_offset > 0) are not supported and will raise ValueError
- The window_interval must satisfy start_offset < stop_offset
- The aggregation method must be a valid pandas rolling method
- For each timestamp t, the aggregation is computed over [t + start_offset, t + stop_offset]
- The window is achieved by first shifting the data by -stop_offset, then applying
a rolling window of size (stop_offset - start_offset)

Returns
-------
pd.DataFrame
The input DataFrame with additional window-aggregated features. The original
features are preserved, and new aggregated features are added with the
specified prefix.

Raises
------
ValueError
If stop_offset > 0 (future windows not supported)
If start_offset >= stop_offset (invalid window interval)
If agg_method is not a valid pandas rolling aggregation method
"""

def __init__(
self,
window_interval: tuple[str, str] = ("-5min", "0min"),
feature_marker: str | None = None,
agg_method: str = "mean",
):
super().__init__()
self.window_interval = window_interval
self.feature_marker = feature_marker
self.agg_method = agg_method

def _fit_implementation(self, X: pd.DataFrame, y=None):
start, stop = self.window_interval
self.offset_start_ = pd.Timedelta(start)
self.offset_stop_ = pd.Timedelta(stop)

if self.offset_stop_ > pd.Timedelta(0):
raise ValueError("Future windows are not supported with rolling")
if self.offset_start_ >= self.offset_stop_:
raise ValueError("window_interval must satisfy start < stop")

self.feature_marker = (
f"{self.window_interval}_"
if self.feature_marker is None
else self.feature_marker
)

self.required_columns = list(X.columns)
self.new_columns_ = [self.feature_marker + col for col in self.required_columns]

self.feature_names_out_ = self.feature_names_in_ + self.new_columns_

self.shift_ = -self.offset_stop_ # e.g. 5min
self.rolling_window_ = self.offset_stop_ - self.offset_start_ # e.g. 10min

return self

def _transform_implementation(self, X: pd.DataFrame):
check_is_fitted(
self,
attributes=[
"required_columns",
"feature_names_out_",
"shift_",
"rolling_window_",
"new_columns_",
],
)

X_shifted = X.shift(freq=self.shift_)

rolling = X_shifted.rolling(
window=self.rolling_window_,
closed="both",
min_periods=1,
)

if not hasattr(rolling, self.agg_method):
raise ValueError(
f"Aggregation '{self.agg_method}' not supported by rolling"
)

X_agg = getattr(rolling, self.agg_method)()
X_agg.columns = self.new_columns_

return pd.concat([X, X_agg], axis=1)


class GaussianFilter1D(BaseProcessing):
"""A transformer that applies a 1D Gaussian filter to smooth time series data.

Expand Down