From 9ccd08f7b10c8c0c63640c71ebe2caa304d338e4 Mon Sep 17 00:00:00 2001 From: akshatvishu <33392262+akshatvishu@users.noreply.github.com> Date: Thu, 7 Mar 2024 23:26:59 +0530 Subject: [PATCH] 380: Initial implementation of moving_product operation (#383) * WIP: Initial implementation of moving_product operation * Remove the wrong use of override keyword at Reset Method in the MovingProductAccumulator class * Implement On-Demand Product Calculation in MovingProductAccumulator Key Changes: - Start and end indices (`start_idx` and `end_idx`) introduced to track the window. - `Add` and `Remove` methods updated to adjust the window indices without affecting the product. - `Result` method now calculates the product on-demand, considering zeros and ignoring NaN values for accuracy. - Added a TODO comment to explore future optimizations for the Result method to enhance calculation efficiency. * add test and doc for moving_product * Optimize moving_product logic to directly return zero for windows with zero values * fix(docs): Update docs and adjust example values for clarity in event_set_ops.py fix(moving_product): Update the moving_product logic to return NaN when the input only contains NaN. * feat(window_op): Add cumprod operator * style: format cumprod related files with Black * docs: remove un-wanted function description from moving_product at event_set_ops.py * fix(test): Pass the correct dtype at docstring example for moving_product in event_set_ops.py * fix(test_moving_product): Correct calculation errors in test_with_sampling and test_with_variable_winlen_same_sampling. --- docs/src/reference/index.md | 26 +- .../temporian/operators/window/cumprod.md | 1 + .../operators/window/moving_product.md | 1 + temporian/core/event_set_ops.py | 134 ++++++++++ temporian/core/operators/window/BUILD | 16 ++ temporian/core/operators/window/__init__.py | 2 + .../core/operators/window/moving_product.py | 85 +++++++ temporian/core/operators/window/test/BUILD | 26 ++ .../operators/window/test/test_cumprod.py | 62 +++++ .../window/test/test_moving_product.py | 231 ++++++++++++++++++ .../core/test/registered_operators_test.py | 1 + .../implementation/numpy/operators/BUILD | 1 + .../numpy/operators/__init__.py | 1 + .../numpy/operators/window/BUILD | 12 + .../numpy/operators/window/moving_product.py | 35 +++ .../numpy/test/registered_operators_test.py | 1 + .../numpy_cc/operators/window.cc | 60 +++++ 17 files changed, 682 insertions(+), 13 deletions(-) create mode 100644 docs/src/reference/temporian/operators/window/cumprod.md create mode 100644 docs/src/reference/temporian/operators/window/moving_product.md create mode 100644 temporian/core/operators/window/moving_product.py create mode 100644 temporian/core/operators/window/test/test_cumprod.py create mode 100644 temporian/core/operators/window/test/test_moving_product.py create mode 100644 temporian/implementation/numpy/operators/window/moving_product.py diff --git a/docs/src/reference/index.md b/docs/src/reference/index.md index 596be48af..196204964 100644 --- a/docs/src/reference/index.md +++ b/docs/src/reference/index.md @@ -45,13 +45,13 @@ Check the index on the left for a more detailed description of any symbol. | [`tp.combine()`][temporian.combine] | Combines events from [`EventSets`][temporian.EventSet] with different samplings. | | [`tp.glue()`][temporian.glue] | Concatenates features from [`EventSets`][temporian.EventSet] with the same sampling. | | [`EventSet.abs()`][temporian.EventSet.abs] | Computes the absolute value of the features. | -| [`EventSet.add_index()`][temporian.EventSet.add_index] | Adds indexes to an [`EventSet`][temporian.EventSet]. -| [`EventSet.arccos()`][temporian.EventSet.arccos] | Computes the inverse cosine of the features. -| [`EventSet.arcsin()`][temporian.EventSet.arcsin] | Computes the inverse sine of the features. -| [`EventSet.arctan()`][temporian.EventSet.arctan] | Computes the inverse tangent of the features. | +| [`EventSet.add_index()`][temporian.EventSet.add_index] | Adds indexes to an [`EventSet`][temporian.EventSet]. | +| [`EventSet.arccos()`][temporian.EventSet.arccos] | Computes the inverse cosine of the features. | +| [`EventSet.arcsin()`][temporian.EventSet.arcsin] | Computes the inverse sine of the features. | +| [`EventSet.arctan()`][temporian.EventSet.arctan] | Computes the inverse tangent of the features. | | [`EventSet.begin()`][temporian.EventSet.begin] | Generates a single timestamp at the beginning of the input. | -| [`EventSet.cast()`][temporian.EventSet.cast] | Casts the dtype of features. -| [`EventSet.cos()`][temporian.EventSet.cos] | Computes the cosine of the features. | +| [`EventSet.cast()`][temporian.EventSet.cast] | Casts the dtype of features. | +| [`EventSet.cos()`][temporian.EventSet.cos] | Computes the cosine of the features. | | [`EventSet.drop_index()`][temporian.EventSet.drop_index] | Removes indexes from an [`EventSet`][temporian.EventSet]. | | [`EventSet.end()`][temporian.EventSet.end] | Generates a single timestamp at the end of the input. | | [`EventSet.enumerate()`][temporian.EventSet.enumerate] | Creates an ordinal feature enumerating the events according to their timestamp. | @@ -72,10 +72,10 @@ Check the index on the left for a more detailed description of any symbol. | [`EventSet.resample()`][temporian.EventSet.resample] | Resamples an [`EventSet`][temporian.EventSet] at each timestamp of another [`EventSet`][temporian.EventSet]. | | [`EventSet.select()`][temporian.EventSet.select] | Selects a subset of features from an [`EventSet`][temporian.EventSet]. | | [`EventSet.select_index_values()`][temporian.EventSet.select_index_values] | Selects a subset of index values from an [`EventSet`][temporian.EventSet]. | -| [`EventSet.set_index()`][temporian.EventSet.set_index] | Replaces the indexes in an [`EventSet`][temporian.EventSet]. -| [`EventSet.sin()`][temporian.EventSet.sin] | Computes the sine of the features. | -| [`EventSet.since_last()`][temporian.EventSet.since_last] | Computes the amount of time since the last distinct timestamp. -| [`EventSet.tan()`][temporian.EventSet.tan] | Computes the tangent of the features. | +| [`EventSet.set_index()`][temporian.EventSet.set_index] | Replaces the indexes in an [`EventSet`][temporian.EventSet]. | +| [`EventSet.sin()`][temporian.EventSet.sin] | Computes the sine of the features. | +| [`EventSet.since_last()`][temporian.EventSet.since_last] | Computes the amount of time since the last distinct timestamp. | +| [`EventSet.tan()`][temporian.EventSet.tan] | Computes the tangent of the features. | | [`EventSet.tick()`][temporian.EventSet.tick] | Generates timestamps at regular intervals in the range of a guide. | | [`EventSet.tick_calendar()`][temporian.EventSet.tick] | Generates timestamps at the specified calendar date-time events. | | [`EventSet.timestamps()`][temporian.EventSet.timestamps] | Creates a feature from the events timestamps (`float64`). | @@ -91,9 +91,9 @@ Check the index on the left for a more detailed description of any symbol. ### Window operators -| Symbols | Description | -| -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------- | -| [`EventSet.simple_moving_average()`][temporian.EventSet.simple_moving_average] [`EventSet.moving_standard_deviation()`][temporian.EventSet.moving_standard_deviation] [`EventSet.cumsum()`][temporian.EventSet.cumsum] [`EventSet.moving_sum()`][temporian.EventSet.moving_sum] [`EventSet.moving_count()`][temporian.EventSet.moving_count] [`EventSet.moving_min()`][temporian.EventSet.moving_min] [`EventSet.moving_max()`][temporian.EventSet.moving_max] | Compute an operation on the values in a sliding window over an EventSet's timestamps. | +| Symbols | Description | +| ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------- | +| [`EventSet.simple_moving_average()`][temporian.EventSet.simple_moving_average] [`EventSet.moving_standard_deviation()`][temporian.EventSet.moving_standard_deviation] [`EventSet.cumsum()`][temporian.EventSet.cumsum] [`EventSet.moving_sum()`][temporian.EventSet.moving_sum] [`EventSet.moving_count()`][temporian.EventSet.moving_count] [`EventSet.moving_min()`][temporian.EventSet.moving_min] [`EventSet.moving_max()`][temporian.EventSet.moving_max] [`EventSet.cumprod()`][temporian.EventSet.cumprod] [`EventSet.moving_product()`][temporian.EventSet.moving_product] | Compute an operation on the values in a sliding window over an EventSet's timestamps. | ### Python operators diff --git a/docs/src/reference/temporian/operators/window/cumprod.md b/docs/src/reference/temporian/operators/window/cumprod.md new file mode 100644 index 000000000..197e91957 --- /dev/null +++ b/docs/src/reference/temporian/operators/window/cumprod.md @@ -0,0 +1 @@ +::: temporian.EventSet.cumprod diff --git a/docs/src/reference/temporian/operators/window/moving_product.md b/docs/src/reference/temporian/operators/window/moving_product.md new file mode 100644 index 000000000..1974eb360 --- /dev/null +++ b/docs/src/reference/temporian/operators/window/moving_product.md @@ -0,0 +1 @@ +::: temporian.EventSet.moving_product diff --git a/temporian/core/event_set_ops.py b/temporian/core/event_set_ops.py index 62fdda797..ed980e4c8 100644 --- a/temporian/core/event_set_ops.py +++ b/temporian/core/event_set_ops.py @@ -2133,6 +2133,78 @@ def cast( return cast(self, target=target, check_overflow=check_overflow) + def cumprod( + self: EventSetOrNode, + sampling: Optional[EventSetOrNode] = None, + ) -> EventSetOrNode: + """Computes the cumulative product of values over each feature in an + [`EventSet`][temporian.EventSet]. + + This operation only supports floating-point features. + + Missing (NaN) values are not accounted for. The output will be NaN until + the input contains at least one numeric value. + + Warning: The `cumprod` function leverages an infinite window length for + its calculations, which may lead to considerable computational overhead + with increasing dataset sizes. + + Example: + ```python + >>> a = tp.event_set( + ... timestamps=[0, 1, 2, 3], + ... features={"value": [1.0, 2.0, 10.0, 12.0]}, + ... ) + + >>> b = a.cumprod() + >>> b + indexes: ... + (4 events): + timestamps: [0. 1. 2. 3.] + 'value': [ 1. 2. 20. 240.] + ... + + ``` + + Examples with sampling: + ```python + >>> a = tp.event_set( + ... timestamps=[0, 1, 2, 5, 6, 7], + ... features={"value": [1, 2, 10, 12, np.nan, 2]}, + ... ) + + >>> # Cumulative product at 5 and 10 + >>> b = tp.event_set(timestamps=[5, 10]) + >>> c = a.cumprod(sampling=b) + >>> c + indexes: ... + (2 events): + timestamps: [ 5. 10.] + 'value': [240. 480.] + ... + + >>> # Product all values in the EventSet + >>> c = a.cumprod(sampling=a.end()) + >>> c + indexes: ... + (1 events): + timestamps: [7.] + 'value': [480.] + ... + + ``` + + Args: + sampling: Timestamps to sample the sliding window's value at. If not + provided, timestamps in the input are used. + + Returns: + Cumulative product of each feature. + """ + from temporian.core.operators.window.moving_product import cumprod + + return cumprod(self, sampling=sampling) + def cumsum( self: EventSetOrNode, sampling: Optional[EventSetOrNode] = None, @@ -3171,6 +3243,68 @@ def moving_standard_deviation( self, window_length=window_length, sampling=sampling ) + def moving_product( + self: EventSetOrNode, + window_length: WindowLength, + sampling: Optional[EventSetOrNode] = None, + ) -> EventSetOrNode: + """Computes the product of values in a sliding window over an + [`EventSet`][temporian.EventSet]. + + This operation only supports floating-point features. + + For each t in sampling, and for each feature independently, returns at + time t the product of non-zero and non-NaN values for the feature in the window + (t - window_length, t]. + + `sampling` can't be specified if a variable `window_length` is + specified (i.e., if `window_length` is an EventSet). + + If `sampling` is specified or `window_length` is an EventSet, the moving + window is sampled at each timestamp in them, else it is sampled on the + input's. + + Zeros result in the accumulator's result being 0 for the window. NaN values are ignored in the + calculation of the product. If the window does not contain any NaN, zero or any non-zero values (e.g., + all values are missing), the output for that window is an empty array. + + Example: + ```python + >>> a = tp.event_set( + ... timestamps=[0, 1, 2], + ... features={"value": [np.nan, 1, 5]}, + ... ) + + >>> b = a.moving_product(tp.duration.seconds(1)) + >>> b + indexes: ... + (3 events): + timestamps: [0. 1. 2.] + 'value': [nan 1. 5.] + ... + + ``` + + See [`EventSet.moving_count()`][temporian.EventSet.moving_count] for + examples of moving window operations with external sampling and indices. + + Args: + window_length: Sliding window's length. + sampling: Timestamps to sample the sliding window's value at. If not + provided, timestamps in the input are used. + + Returns: + EventSet containing the moving product of each feature in the input, + considering non-zero and non-NaN values only. + """ + from temporian.core.operators.window.moving_product import ( + moving_product, + ) + + return moving_product( + self, window_length=window_length, sampling=sampling + ) + def moving_sum( self: EventSetOrNode, window_length: WindowLength, diff --git a/temporian/core/operators/window/BUILD b/temporian/core/operators/window/BUILD index cd42102cd..680fcda90 100644 --- a/temporian/core/operators/window/BUILD +++ b/temporian/core/operators/window/BUILD @@ -17,6 +17,7 @@ py_library( ":moving_standard_deviation", ":moving_sum", ":simple_moving_average", + ":moving_product", ], ) @@ -126,3 +127,18 @@ py_library( "//temporian/core/data:schema", ], ) + +py_library( + name = "moving_product", + srcs = ["moving_product.py"], + srcs_version = "PY3", + deps = [ + ":base", + "//temporian/core:compilation", + "//temporian/core:operator_lib", + "//temporian/core:typing", + "//temporian/core/data:dtype", + "//temporian/core/data:node", + "//temporian/core/data:schema", + ], +) diff --git a/temporian/core/operators/window/__init__.py b/temporian/core/operators/window/__init__.py index 93e2e9aaa..f4202294e 100644 --- a/temporian/core/operators/window/__init__.py +++ b/temporian/core/operators/window/__init__.py @@ -25,3 +25,5 @@ from temporian.core.operators.window.moving_count import moving_count from temporian.core.operators.window.moving_min import moving_min from temporian.core.operators.window.moving_max import moving_max +from temporian.core.operators.window.moving_product import cumprod +from temporian.core.operators.window.moving_product import moving_product diff --git a/temporian/core/operators/window/moving_product.py b/temporian/core/operators/window/moving_product.py new file mode 100644 index 000000000..2d87bbd6d --- /dev/null +++ b/temporian/core/operators/window/moving_product.py @@ -0,0 +1,85 @@ +# Copyright 2021 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Moving Product operator class and public API function definition..""" + +from typing import Optional + +import numpy as np + +from temporian.core import operator_lib +from temporian.core.compilation import compile +from temporian.core.data.dtype import DType +from temporian.core.data.node import EventSetNode +from temporian.core.data.schema import FeatureSchema +from temporian.core.operators.window.base import BaseWindowOperator +from temporian.utils.typecheck import typecheck +from temporian.core.typing import EventSetOrNode, WindowLength + + +class MovingProductOperator(BaseWindowOperator): + """ + Window operator to compute the moving product. + """ + + @classmethod + def operator_def_key(cls) -> str: + return "MOVING_PRODUCT" + + def get_feature_dtype(self, feature: FeatureSchema) -> DType: + if not feature.dtype.is_float: + raise ValueError( + "moving_product requires the input EventSet to contain" + " floating point features only, but received feature" + f" {feature.name!r} with type {feature.dtype}. Note: You can" + " cast features e.g. `.cast(tp.float32)`" + ) + return ( + DType.FLOAT32 if feature.dtype == DType.FLOAT32 else DType.FLOAT64 + ) + + +operator_lib.register_operator(MovingProductOperator) + + +@typecheck +@compile +def moving_product( + input: EventSetOrNode, + window_length: WindowLength, + sampling: Optional[EventSetOrNode] = None, +) -> EventSetOrNode: + assert isinstance(input, EventSetNode) + if sampling is not None: + assert isinstance(sampling, EventSetNode) + + return MovingProductOperator( + input=input, + window_length=window_length, + sampling=sampling, + ).outputs["output"] + + +@compile +def cumprod( + input: EventSetOrNode, + sampling: Optional[EventSetOrNode] = None, +) -> EventSetOrNode: + assert isinstance(input, EventSetNode) + if sampling is not None: + assert isinstance(sampling, EventSetNode) + + return MovingProductOperator( + input=input, window_length=np.inf, sampling=sampling + ).outputs["output"] diff --git a/temporian/core/operators/window/test/BUILD b/temporian/core/operators/window/test/BUILD index 1b68e9e90..ed625c49f 100644 --- a/temporian/core/operators/window/test/BUILD +++ b/temporian/core/operators/window/test/BUILD @@ -18,6 +18,19 @@ py_test( ], ) +py_test( + name = "test_cumprod", + srcs = ["test_cumprod.py"], + srcs_version = "PY3", + deps = [ + # already_there/absl/testing:absltest + # already_there/absl/testing:parameterized + "//temporian/implementation/numpy/data:io", + "//temporian/core/data:duration", + "//temporian/test:utils", + ], +) + py_test( name = "test_cumsum", srcs = ["test_cumsum.py"], @@ -83,6 +96,19 @@ py_test( ], ) +py_test( + name = "test_moving_product", + srcs = ["test_moving_product.py"], + srcs_version = "PY3", + deps = [ + # already_there/absl/testing:absltest + # already_there/absl/testing:parameterized + "//temporian/implementation/numpy/data:io", + "//temporian/core/data:duration", + "//temporian/test:utils", + ], +) + py_test( name = "test_moving_standard_deviation", srcs = ["test_moving_standard_deviation.py"], diff --git a/temporian/core/operators/window/test/test_cumprod.py b/temporian/core/operators/window/test/test_cumprod.py new file mode 100644 index 000000000..e73b54d99 --- /dev/null +++ b/temporian/core/operators/window/test/test_cumprod.py @@ -0,0 +1,62 @@ +# Copyright 2021 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from absl.testing import absltest +from absl.testing.parameterized import TestCase + +from temporian.implementation.numpy.data.io import event_set +from temporian.test.utils import assertOperatorResult + + +class CumprodTest(TestCase): + def test_basic(self): + evset = event_set( + timestamps=[1.0, 2.0, 3.0, 1.1, 2.1, 3.1, 1.2, 2.2, 3.2], + features={ + "x": ["X1", "X1", "X1", "X2", "X2", "X2", "X2", "X2", "X2"], + "y": ["Y1", "Y1", "Y1", "Y1", "Y1", "Y1", "Y2", "Y2", "Y2"], + "a": [10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0], + "b": [1.0, -1.0, 2.0, -3.0, -8.0, 0.0, 5.0, 3.0, -1.0], + }, + indexes=["x", "y"], + ) + + expected = event_set( + timestamps=[1.0, 2.0, 3.0, 1.1, 2.1, 3.1, 1.2, 2.2, 3.2], + features={ + "x": ["X1", "X1", "X1", "X2", "X2", "X2", "X2", "X2", "X2"], + "y": ["Y1", "Y1", "Y1", "Y1", "Y1", "Y1", "Y2", "Y2", "Y2"], + "a": [ + 10.0, + 110.0, + 1320.0, + 13.0, + 182.0, + 2730.0, + 16.0, + 272.0, + 4896.0, + ], + "b": [1.0, -1.0, -2.0, -3.0, 24.0, 0, 5.0, 15.0, -15.0], + }, + indexes=["x", "y"], + same_sampling_as=evset, + ) + + result = evset.cumprod() + assertOperatorResult(self, result, expected) + + +if __name__ == "__main__": + absltest.main() diff --git a/temporian/core/operators/window/test/test_moving_product.py b/temporian/core/operators/window/test/test_moving_product.py new file mode 100644 index 000000000..a9f6b8def --- /dev/null +++ b/temporian/core/operators/window/test/test_moving_product.py @@ -0,0 +1,231 @@ +# Copyright 2021 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from math import nan + +import numpy as np +from absl.testing import absltest +from absl.testing.parameterized import TestCase, parameters + +from temporian.implementation.numpy.data.io import event_set +from temporian.test.utils import f32, f64, assertOperatorResult + + +class MovingProductTest(TestCase): + def test_without_sampling(self): + timestamps = f64([1, 2, 3, 5, 6]) + evset = event_set( + timestamps=timestamps, features={"a": f32([2, nan, 3, 0, 5])} + ) + + expected = event_set( + timestamps=timestamps, + features={"a": f32([2.0, 2.0, 3.0, 0.0, 0.0])}, + same_sampling_as=evset, + ) + + result = evset.moving_product(window_length=2.0) + assertOperatorResult(self, result, expected) + + def test_with_zeros_and_nans(self): + timestamps = f64([1, 2, 3, 4]) + evset = event_set( + timestamps=timestamps, features={"a": f32([2.0, 0.0, nan, 3.0])} + ) + + expected = event_set( + timestamps=timestamps, + features={"a": f32([2.0, 0.0, 0.0, 3.0])}, + same_sampling_as=evset, + ) + + result = evset.moving_product(window_length=2.0) + assertOperatorResult(self, result, expected) + + def test_empty_event_set(self): + timestamps = f64([]) + evset = event_set(timestamps=timestamps, features={"a": f32([])}) + + expected = event_set( + timestamps=timestamps, + features={"a": f32([])}, + same_sampling_as=evset, + ) + + result = evset.moving_product(window_length=2.0) + assertOperatorResult(self, result, expected) + + def test_without_sampling_many_features(self): + timestamps = [1, 2, 3, 5, 20] + evset = event_set( + timestamps=timestamps, + features={ + "a": [10.0, 11.0, 12.0, 13.0, 14.0], + "b": [20.0, 21.0, 22.0, 23.0, 24.0], + }, + ) + + expected = event_set( + timestamps=timestamps, + features={ + "a": [10.0, 110.0, 132.0, 13.0, 14.0], + "b": [20.0, 420.0, 462.0, 23.0, 24.0], + }, + same_sampling_as=evset, + ) + + result = evset.moving_product(window_length=2.0) + assertOperatorResult(self, result, expected) + + def test_without_sampling_with_index(self): + timestamps = [1, 2, 3, 1.1, 2.1, 3.1, 1.2, 2.2, 3.2] + evset = event_set( + timestamps=timestamps, + features={ + "x": ["X1", "X1", "X1", "X2", "X2", "X2", "X2", "X2", "X2"], + "y": ["Y1", "Y1", "Y1", "Y1", "Y1", "Y1", "Y2", "Y2", "Y2"], + "a": [10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0], + }, + indexes=["x", "y"], + ) + + expected = event_set( + timestamps=timestamps, + features={ + "x": ["X1", "X1", "X1", "X2", "X2", "X2", "X2", "X2", "X2"], + "y": ["Y1", "Y1", "Y1", "Y1", "Y1", "Y1", "Y2", "Y2", "Y2"], + "a": [ + 10.0, + 110.0, + 1320.0, + 13.0, + 182.0, + 2730.0, + 16.0, + 272.0, + 4896.0, + ], + }, + indexes=["x", "y"], + same_sampling_as=evset, + ) + + result = evset.moving_product(window_length=5.0) + assertOperatorResult(self, result, expected) + + @parameters( + { # normal + "timestamps": f64([1, 2, 3, 5, 6]), + "feature": [10.0, 11.0, 12.0, 13.0, 14.0], + "window_length": 3.1, + "sampling_timestamps": [-1.0, 1.0, 1.1, 3.0, 3.5, 6.0, 10.0], + "output_feature": [nan, 10.0, 10.0, 1320.0, 1320.0, 2184.0, nan], + }, + { # w nan + "timestamps": f64([1, 2, 3, 5, 6]), + "feature": [nan, 11.0, nan, 13.0, 14.0], + "window_length": 1.1, + "sampling_timestamps": [1, 2, 2.5, 3, 3.5, 4, 5, 6], + "output_feature": [nan, 11.0, 11.0, 11.0, nan, nan, 13.0, 182.0], + }, + ) + def test_with_sampling( + self, + timestamps, + feature, + window_length, + sampling_timestamps, + output_feature, + ): + evset = event_set( + timestamps=timestamps, + features={"a": feature}, + ) + sampling = event_set(timestamps=sampling_timestamps) + + expected = event_set( + timestamps=sampling_timestamps, + features={"a": output_feature}, + same_sampling_as=sampling, + ) + + result = evset.moving_product( + window_length=window_length, sampling=sampling + ) + assertOperatorResult(self, result, expected) + + def test_with_variable_winlen_same_sampling(self): + timestamps = f64([0, 1, 2, 3, 5, 20]) + evset = event_set( + timestamps=timestamps, + features={"a": f32([nan, 10, 11, 12, 13, 14])}, + ) + + window = event_set( + timestamps=timestamps, + features={"a": f64([1, 1, 1.5, 0.5, 3.5, 20])}, + same_sampling_as=evset, + ) + + expected = event_set( + timestamps=timestamps, + features={"a": f32([nan, 10, 110, 12, nan, nan])}, + same_sampling_as=evset, + ) + + result = evset.moving_product(window_length=window) + assertOperatorResult(self, result, expected) + + def test_with_variable_winlen_diff_sampling(self): + window_timestamps = f64([-1, 1, 4, 19, 20, 20]) + window_length = f64([10, 0.5, 2.5, 19, 16, np.inf]) + + evset = event_set( + timestamps=f64([0, 1, 2, 3, 5, 20]), + features={"a": f32([nan, 10, 11, 12, 13, 14])}, + ) + + window = event_set( + timestamps=window_timestamps, + features={"a": window_length}, + ) + + expected = event_set( + timestamps=window_timestamps, + features={"a": f32([nan, 10.0, 132.0, nan, 182.0, nan])}, + same_sampling_as=window, + ) + + result = evset.moving_product(window_length=window) + assertOperatorResult(self, result, expected) + + def test_error_input_int(self): + evset = event_set([1, 2], {"f": [1, 2]}) + with self.assertRaisesRegex( + ValueError, + "moving_product requires the input EventSet to contain", + ): + _ = evset.moving_product(1) + + def test_error_input_bytes(self): + evset = event_set([1, 2], {"f": ["A", "B"]}) + with self.assertRaisesRegex( + ValueError, + "moving_product requires the input EventSet to contain", + ): + _ = evset.moving_product(1) + + +if __name__ == "__main__": + absltest.main() diff --git a/temporian/core/test/registered_operators_test.py b/temporian/core/test/registered_operators_test.py index fb562d179..e023817a0 100644 --- a/temporian/core/test/registered_operators_test.py +++ b/temporian/core/test/registered_operators_test.py @@ -79,6 +79,7 @@ def test_base(self): "MOVING_COUNT", "MOVING_MAX", "MOVING_MIN", + "MOVING_PRODUCT", "MOVING_STANDARD_DEVIATION", "MOVING_SUM", "MULTIPLICATION", diff --git a/temporian/implementation/numpy/operators/BUILD b/temporian/implementation/numpy/operators/BUILD index 6cf8206e1..7a44f587f 100644 --- a/temporian/implementation/numpy/operators/BUILD +++ b/temporian/implementation/numpy/operators/BUILD @@ -57,6 +57,7 @@ py_library( "//temporian/implementation/numpy/operators/window:moving_count", "//temporian/implementation/numpy/operators/window:moving_max", "//temporian/implementation/numpy/operators/window:moving_min", + "//temporian/implementation/numpy/operators/window:moving_product", "//temporian/implementation/numpy/operators/window:moving_standard_deviation", "//temporian/implementation/numpy/operators/window:moving_sum", "//temporian/implementation/numpy/operators/window:simple_moving_average", diff --git a/temporian/implementation/numpy/operators/__init__.py b/temporian/implementation/numpy/operators/__init__.py index 2aba1db13..f7ccf980a 100644 --- a/temporian/implementation/numpy/operators/__init__.py +++ b/temporian/implementation/numpy/operators/__init__.py @@ -40,6 +40,7 @@ from temporian.implementation.numpy.operators.scalar import relational_scalar from temporian.implementation.numpy.operators.window import simple_moving_average +from temporian.implementation.numpy.operators.window import moving_product from temporian.implementation.numpy.operators.window import moving_standard_deviation from temporian.implementation.numpy.operators.window import moving_sum from temporian.implementation.numpy.operators.window import moving_count diff --git a/temporian/implementation/numpy/operators/window/BUILD b/temporian/implementation/numpy/operators/window/BUILD index 6f95e15f4..15e956b4b 100644 --- a/temporian/implementation/numpy/operators/window/BUILD +++ b/temporian/implementation/numpy/operators/window/BUILD @@ -50,6 +50,18 @@ py_library( ], ) +py_library( + name = "moving_product", + srcs = ["moving_product.py"], + srcs_version = "PY3", + deps = [ + ":base", + "//temporian/core/operators/window:moving_product", + "//temporian/implementation/numpy:implementation_lib", + "//temporian/implementation/numpy_cc/operators:operators_cc", + ], +) + py_library( name = "moving_sum", srcs = ["moving_sum.py"], diff --git a/temporian/implementation/numpy/operators/window/moving_product.py b/temporian/implementation/numpy/operators/window/moving_product.py new file mode 100644 index 000000000..ae0ca7831 --- /dev/null +++ b/temporian/implementation/numpy/operators/window/moving_product.py @@ -0,0 +1,35 @@ +# Copyright 2021 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from temporian.core.operators.window.moving_product import ( + MovingProductOperator, +) +from temporian.implementation.numpy import implementation_lib +from temporian.implementation.numpy.operators.window.base import ( + BaseWindowNumpyImplementation, +) +from temporian.implementation.numpy_cc.operators import operators_cc + + +class MovingProductNumpyImplementation(BaseWindowNumpyImplementation): + """Numpy implementation of the moving product operator.""" + + def _implementation(self): + return operators_cc.moving_product + + +implementation_lib.register_operator_implementation( + MovingProductOperator, MovingProductNumpyImplementation +) diff --git a/temporian/implementation/numpy/test/registered_operators_test.py b/temporian/implementation/numpy/test/registered_operators_test.py index 6a026d22b..c0bbf809d 100644 --- a/temporian/implementation/numpy/test/registered_operators_test.py +++ b/temporian/implementation/numpy/test/registered_operators_test.py @@ -77,6 +77,7 @@ def test_base(self): "MOVING_COUNT", "MOVING_MAX", "MOVING_MIN", + "MOVING_PRODUCT", "MOVING_STANDARD_DEVIATION", "MOVING_SUM", "MULTIPLICATION", diff --git a/temporian/implementation/numpy_cc/operators/window.cc b/temporian/implementation/numpy_cc/operators/window.cc index 71cfeeefe..99043595f 100644 --- a/temporian/implementation/numpy_cc/operators/window.cc +++ b/temporian/implementation/numpy_cc/operators/window.cc @@ -516,6 +516,59 @@ struct MovingMaxAccumulator : MovingExtremumAccumulator { bool Compare(INPUT a, INPUT b) { return a > b; } }; +// TODO: Revisit the MovingProductAccumulator for potential optimization to +// improve calculation efficiency while maintaining accuracy. +template +struct MovingProductAccumulator : public Accumulator { + int start_idx = 0; + int end_idx = -1; // Initialize to -1 to indicate an empty window initially + + MovingProductAccumulator(const ArrayRef& values) + : Accumulator(values) {} + void Add(Idx idx) override { + // Simply move the end to the given index + end_idx = idx; + } + + void Remove(Idx idx) override { + // Adjust the start index to exclude the removed value, signaling a window shift. + start_idx = idx + 1; + } + + OUTPUT Result() override { + if (start_idx > end_idx) { + // No valid indices to process, indicating an empty window or EventSet + return std::numeric_limits::quiet_NaN(); + } + + double product = 1.0; + bool hasEncounteredValidValue = false; // This will be true if any non-NaN and non-zero value is encountered + + for (int idx = start_idx; idx <= end_idx; ++idx) { + const INPUT value = Accumulator::values[idx]; + if (value == 0) { + return 0; // If a zero is found, return 0 immediately. + } else if (!std::isnan(value)) { + product *= value; + hasEncounteredValidValue = true; + } + } + + if (!hasEncounteredValidValue) { + return std::numeric_limits::quiet_NaN(); + } + + return product; + } + + void Reset() { + start_idx = 0; + end_idx = -1; + } +}; + + + // Instantiate the "accumulate" function with and without sampling, // and with and without variable window length. // @@ -614,6 +667,9 @@ REGISTER_CC_FUNC(moving_max, int32_t, int32_t, MovingMaxAccumulator); REGISTER_CC_FUNC(moving_max, int64_t, int64_t, MovingMaxAccumulator); REGISTER_CC_FUNC_NO_INPUT(moving_count, int32_t, MovingCountAccumulator); + +REGISTER_CC_FUNC(moving_product, float, float, MovingProductAccumulator); +REGISTER_CC_FUNC(moving_product, double, double, MovingProductAccumulator); } // namespace // Register c++ functions to pybind with and without sampling, @@ -693,4 +749,8 @@ void init_window(py::module &m) { ADD_PY_DEF(moving_max, int64_t, int64_t) ADD_PY_DEF_NO_INPUT(moving_count, int32_t) + + ADD_PY_DEF(moving_product, float, float) + ADD_PY_DEF(moving_product, double, double) + }