380: Initial implementation of moving_product operation (google#383)
* WIP: Initial implementation of moving_product operation

* Remove the incorrect use of the `override` keyword on the `Reset` method of the `MovingProductAccumulator` class

* Implement On-Demand Product Calculation in MovingProductAccumulator

Key Changes:
- Start and end indices (`start_idx` and `end_idx`) introduced to track the window.
- `Add` and `Remove` methods updated to adjust the window indices without affecting the product.
- `Result` method now calculates the product on-demand, considering zeros and ignoring NaN values for accuracy.
- Added a TODO comment to explore future optimizations for the Result method to enhance calculation efficiency.

* add test and doc for moving_product

* Optimize moving_product logic to directly return zero for windows with zero values

* fix(docs): Update docs and adjust example values for clarity in event_set_ops.py
fix(moving_product): Update the moving_product logic to return NaN when the input only contains NaN.

* feat(window_op): Add cumprod operator

* style: format cumprod related files with Black

* docs: remove unwanted function description from moving_product at event_set_ops.py

* fix(test): Pass the correct dtype at docstring example for moving_product in event_set_ops.py

* fix(test_moving_product): Correct calculation errors in test_with_sampling and test_with_variable_winlen_same_sampling.
akshatvishu committed Mar 7, 2024
1 parent 6e8cd65 commit 9ccd08f
Showing 17 changed files with 682 additions and 13 deletions.
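
The "Key Changes" in the commit message describe an accumulator that tracks only the window bounds and computes the product when asked. A minimal Python sketch of that idea follows. The real accumulator is C++ and is not shown on this page, so the class name `MovingProductSketch`, its method signatures, and the half-open window convention are illustrative assumptions, not the project's code.

```python
import math


class MovingProductSketch:
    """Hypothetical Python analogue of the accumulator described above."""

    def __init__(self, values):
        self.values = values
        # Half-open window [start_idx, end_idx) over `values` (assumed layout).
        self.start_idx = 0
        self.end_idx = 0

    def add(self):
        # Extend the window by one value on the right; no product is updated.
        self.end_idx += 1

    def remove(self):
        # Drop the oldest value on the left; no division is needed.
        self.start_idx += 1

    def result(self):
        product = 1.0
        seen_numeric = False
        for v in self.values[self.start_idx:self.end_idx]:
            if math.isnan(v):
                continue  # NaN values are ignored.
            if v == 0.0:
                return 0.0  # A single zero decides the result for the window.
            product *= v
            seen_numeric = True
        # Assumption: a window with no numeric value yields NaN.
        return product if seen_numeric else math.nan
```

Recomputing on demand costs one pass over the window per result, but it avoids dividing a running product when a value leaves the window, which would fail as soon as that value is zero. That trade-off is presumably what the TODO about optimizing `Result` refers to.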
26 changes: 13 additions & 13 deletions docs/src/reference/index.md
@@ -45,13 +45,13 @@ Check the index on the left for a more detailed description of any symbol.
| [`tp.combine()`][temporian.combine] | Combines events from [`EventSets`][temporian.EventSet] with different samplings. |
| [`tp.glue()`][temporian.glue] | Concatenates features from [`EventSets`][temporian.EventSet] with the same sampling. |
| [`EventSet.abs()`][temporian.EventSet.abs] | Computes the absolute value of the features. |
-| [`EventSet.add_index()`][temporian.EventSet.add_index] | Adds indexes to an [`EventSet`][temporian.EventSet].
-| [`EventSet.arccos()`][temporian.EventSet.arccos] | Computes the inverse cosine of the features.
-| [`EventSet.arcsin()`][temporian.EventSet.arcsin] | Computes the inverse sine of the features.
-| [`EventSet.arctan()`][temporian.EventSet.arctan] | Computes the inverse tangent of the features. |
+| [`EventSet.add_index()`][temporian.EventSet.add_index] | Adds indexes to an [`EventSet`][temporian.EventSet]. |
+| [`EventSet.arccos()`][temporian.EventSet.arccos] | Computes the inverse cosine of the features. |
+| [`EventSet.arcsin()`][temporian.EventSet.arcsin] | Computes the inverse sine of the features. |
+| [`EventSet.arctan()`][temporian.EventSet.arctan] | Computes the inverse tangent of the features. |
| [`EventSet.begin()`][temporian.EventSet.begin] | Generates a single timestamp at the beginning of the input. |
-| [`EventSet.cast()`][temporian.EventSet.cast] | Casts the dtype of features.
-| [`EventSet.cos()`][temporian.EventSet.cos] | Computes the cosine of the features. |
+| [`EventSet.cast()`][temporian.EventSet.cast] | Casts the dtype of features. |
+| [`EventSet.cos()`][temporian.EventSet.cos] | Computes the cosine of the features. |
| [`EventSet.drop_index()`][temporian.EventSet.drop_index] | Removes indexes from an [`EventSet`][temporian.EventSet]. |
| [`EventSet.end()`][temporian.EventSet.end] | Generates a single timestamp at the end of the input. |
| [`EventSet.enumerate()`][temporian.EventSet.enumerate] | Creates an ordinal feature enumerating the events according to their timestamp. |
@@ -72,10 +72,10 @@ Check the index on the left for a more detailed description of any symbol.
| [`EventSet.resample()`][temporian.EventSet.resample] | Resamples an [`EventSet`][temporian.EventSet] at each timestamp of another [`EventSet`][temporian.EventSet]. |
| [`EventSet.select()`][temporian.EventSet.select] | Selects a subset of features from an [`EventSet`][temporian.EventSet]. |
| [`EventSet.select_index_values()`][temporian.EventSet.select_index_values] | Selects a subset of index values from an [`EventSet`][temporian.EventSet]. |
-| [`EventSet.set_index()`][temporian.EventSet.set_index] | Replaces the indexes in an [`EventSet`][temporian.EventSet].
-| [`EventSet.sin()`][temporian.EventSet.sin] | Computes the sine of the features. |
-| [`EventSet.since_last()`][temporian.EventSet.since_last] | Computes the amount of time since the last distinct timestamp.
-| [`EventSet.tan()`][temporian.EventSet.tan] | Computes the tangent of the features. |
+| [`EventSet.set_index()`][temporian.EventSet.set_index] | Replaces the indexes in an [`EventSet`][temporian.EventSet]. |
+| [`EventSet.sin()`][temporian.EventSet.sin] | Computes the sine of the features. |
+| [`EventSet.since_last()`][temporian.EventSet.since_last] | Computes the amount of time since the last distinct timestamp. |
+| [`EventSet.tan()`][temporian.EventSet.tan] | Computes the tangent of the features. |
| [`EventSet.tick()`][temporian.EventSet.tick] | Generates timestamps at regular intervals in the range of a guide. |
| [`EventSet.tick_calendar()`][temporian.EventSet.tick] | Generates timestamps at the specified calendar date-time events. |
| [`EventSet.timestamps()`][temporian.EventSet.timestamps] | Creates a feature from the events timestamps (`float64`). |
@@ -91,9 +91,9 @@ Check the index on the left for a more detailed description of any symbol.

### Window operators

| Symbols | Description |
| -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------- |
| [`EventSet.simple_moving_average()`][temporian.EventSet.simple_moving_average] [`EventSet.moving_standard_deviation()`][temporian.EventSet.moving_standard_deviation] [`EventSet.cumsum()`][temporian.EventSet.cumsum] [`EventSet.moving_sum()`][temporian.EventSet.moving_sum] [`EventSet.moving_count()`][temporian.EventSet.moving_count] [`EventSet.moving_min()`][temporian.EventSet.moving_min] [`EventSet.moving_max()`][temporian.EventSet.moving_max] | Compute an operation on the values in a sliding window over an EventSet's timestamps. |
| Symbols | Description |
| ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------- |
| [`EventSet.simple_moving_average()`][temporian.EventSet.simple_moving_average] [`EventSet.moving_standard_deviation()`][temporian.EventSet.moving_standard_deviation] [`EventSet.cumsum()`][temporian.EventSet.cumsum] [`EventSet.moving_sum()`][temporian.EventSet.moving_sum] [`EventSet.moving_count()`][temporian.EventSet.moving_count] [`EventSet.moving_min()`][temporian.EventSet.moving_min] [`EventSet.moving_max()`][temporian.EventSet.moving_max] [`EventSet.cumprod()`][temporian.EventSet.cumprod] [`EventSet.moving_product()`][temporian.EventSet.moving_product] | Compute an operation on the values in a sliding window over an EventSet's timestamps. |

### Python operators

1 change: 1 addition & 0 deletions docs/src/reference/temporian/operators/window/cumprod.md
@@ -0,0 +1 @@
::: temporian.EventSet.cumprod
@@ -0,0 +1 @@
::: temporian.EventSet.moving_product
134 changes: 134 additions & 0 deletions temporian/core/event_set_ops.py
@@ -2133,6 +2133,78 @@ def cast(

        return cast(self, target=target, check_overflow=check_overflow)

    def cumprod(
        self: EventSetOrNode,
        sampling: Optional[EventSetOrNode] = None,
    ) -> EventSetOrNode:
        """Computes the cumulative product of values over each feature in an
        [`EventSet`][temporian.EventSet].

        This operation only supports floating-point features.

        Missing (NaN) values are ignored. The output will be NaN until
        the input contains at least one numeric value.

        Warning: The `cumprod` function leverages an infinite window length for
        its calculations, which may lead to considerable computational overhead
        with increasing dataset sizes.

        Example:
            ```python
            >>> a = tp.event_set(
            ...     timestamps=[0, 1, 2, 3],
            ...     features={"value": [1.0, 2.0, 10.0, 12.0]},
            ... )
            >>> b = a.cumprod()
            >>> b
            indexes: ...
                    (4 events):
                        timestamps: [0. 1. 2. 3.]
                        'value': [ 1. 2. 20. 240.]
            ...

            ```

        Examples with sampling:
            ```python
            >>> a = tp.event_set(
            ...     timestamps=[0, 1, 2, 5, 6, 7],
            ...     features={"value": [1, 2, 10, 12, np.nan, 2]},
            ... )

            >>> # Cumulative product at 5 and 10
            >>> b = tp.event_set(timestamps=[5, 10])
            >>> c = a.cumprod(sampling=b)
            >>> c
            indexes: ...
                    (2 events):
                        timestamps: [ 5. 10.]
                        'value': [240. 480.]
            ...

            >>> # Product of all values in the EventSet
            >>> c = a.cumprod(sampling=a.end())
            >>> c
            indexes: ...
                    (1 events):
                        timestamps: [7.]
                        'value': [480.]
            ...

            ```

        Args:
            sampling: Timestamps to sample the sliding window's value at. If not
                provided, timestamps in the input are used.

        Returns:
            Cumulative product of each feature.
        """
        from temporian.core.operators.window.moving_product import cumprod

        return cumprod(self, sampling=sampling)

    def cumsum(
        self: EventSetOrNode,
        sampling: Optional[EventSetOrNode] = None,
@@ -3171,6 +3243,68 @@ def moving_standard_deviation(
            self, window_length=window_length, sampling=sampling
        )

    def moving_product(
        self: EventSetOrNode,
        window_length: WindowLength,
        sampling: Optional[EventSetOrNode] = None,
    ) -> EventSetOrNode:
        """Computes the product of values in a sliding window over an
        [`EventSet`][temporian.EventSet].

        This operation only supports floating-point features.

        For each t in sampling, and for each feature independently, returns at
        time t the product of the non-NaN values for the feature in the window
        (t - window_length, t].

        `sampling` can't be specified if a variable `window_length` is
        specified (i.e., if `window_length` is an EventSet).

        If `sampling` is specified or `window_length` is an EventSet, the moving
        window is sampled at each timestamp in them, else it is sampled on the
        input's.

        If the window contains a zero, the result for that window is 0. NaN
        values are ignored in the calculation of the product. If the window
        contains no numeric values (e.g., all values are NaN), the output for
        that window is NaN.

        Example:
            ```python
            >>> a = tp.event_set(
            ...     timestamps=[0, 1, 2],
            ...     features={"value": [np.nan, 1, 5]},
            ... )

            >>> b = a.moving_product(tp.duration.seconds(1))
            >>> b
            indexes: ...
                    (3 events):
                        timestamps: [0. 1. 2.]
                        'value': [nan 1. 5.]
            ...

            ```

        See [`EventSet.moving_count()`][temporian.EventSet.moving_count] for
        examples of moving window operations with external sampling and indices.

        Args:
            window_length: Sliding window's length.
            sampling: Timestamps to sample the sliding window's value at. If not
                provided, timestamps in the input are used.

        Returns:
            EventSet containing the moving product of each feature in the input,
            ignoring NaN values.
        """
        from temporian.core.operators.window.moving_product import (
            moving_product,
        )

        return moving_product(
            self, window_length=window_length, sampling=sampling
        )

    def moving_sum(
        self: EventSetOrNode,
        window_length: WindowLength,
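
The window rule in the `moving_product` docstring, an exclusive left bound and an inclusive right bound, can be checked against a brute-force reference. This is a plain-Python sketch rather than the operator's implementation: the name `moving_product_reference` is hypothetical, and returning NaN for a window without numeric values is an assumption based on the commit message and the docstring example.

```python
import math


def moving_product_reference(timestamps, values, window_length, sampling=None):
    """Brute-force product over the window (t - window_length, t]."""
    if sampling is None:
        sampling = timestamps  # Sample on the input's own timestamps.
    output = []
    for t in sampling:
        product, seen_numeric = 1.0, False
        for ts, v in zip(timestamps, values):
            # Left bound is exclusive, right bound is inclusive; NaN is skipped.
            if t - window_length < ts <= t and not math.isnan(v):
                product *= v  # A zero in the window drives the product to 0.
                seen_numeric = True
        # Assumption: no numeric value in the window gives NaN.
        output.append(product if seen_numeric else math.nan)
    return output


# Mirrors the docstring example: 1-second window on timestamps 0, 1, 2.
print(moving_product_reference([0, 1, 2], [math.nan, 1.0, 5.0], 1))
# → [nan, 1.0, 5.0]
```

With a window of 1, the event at `t=1` is the only one inside `(0, 1]`, so the NaN at `t=0` never reaches the second output.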
16 changes: 16 additions & 0 deletions temporian/core/operators/window/BUILD
@@ -17,6 +17,7 @@ py_library(
        ":moving_standard_deviation",
        ":moving_sum",
        ":simple_moving_average",
        ":moving_product",
    ],
)

@@ -126,3 +127,18 @@ py_library(
        "//temporian/core/data:schema",
    ],
)

py_library(
    name = "moving_product",
    srcs = ["moving_product.py"],
    srcs_version = "PY3",
    deps = [
        ":base",
        "//temporian/core:compilation",
        "//temporian/core:operator_lib",
        "//temporian/core:typing",
        "//temporian/core/data:dtype",
        "//temporian/core/data:node",
        "//temporian/core/data:schema",
    ],
)
2 changes: 2 additions & 0 deletions temporian/core/operators/window/__init__.py
@@ -25,3 +25,5 @@
from temporian.core.operators.window.moving_count import moving_count
from temporian.core.operators.window.moving_min import moving_min
from temporian.core.operators.window.moving_max import moving_max
from temporian.core.operators.window.moving_product import cumprod
from temporian.core.operators.window.moving_product import moving_product
85 changes: 85 additions & 0 deletions temporian/core/operators/window/moving_product.py
@@ -0,0 +1,85 @@
# Copyright 2021 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Moving Product operator class and public API function definition."""

from typing import Optional

import numpy as np

from temporian.core import operator_lib
from temporian.core.compilation import compile
from temporian.core.data.dtype import DType
from temporian.core.data.node import EventSetNode
from temporian.core.data.schema import FeatureSchema
from temporian.core.operators.window.base import BaseWindowOperator
from temporian.utils.typecheck import typecheck
from temporian.core.typing import EventSetOrNode, WindowLength


class MovingProductOperator(BaseWindowOperator):
    """
    Window operator to compute the moving product.
    """

    @classmethod
    def operator_def_key(cls) -> str:
        return "MOVING_PRODUCT"

    def get_feature_dtype(self, feature: FeatureSchema) -> DType:
        if not feature.dtype.is_float:
            raise ValueError(
                "moving_product requires the input EventSet to contain"
                " floating point features only, but received feature"
                f" {feature.name!r} with type {feature.dtype}. Note: You can"
                " cast features e.g. `.cast(tp.float32)`"
            )
        return (
            DType.FLOAT32 if feature.dtype == DType.FLOAT32 else DType.FLOAT64
        )


operator_lib.register_operator(MovingProductOperator)


@typecheck
@compile
def moving_product(
    input: EventSetOrNode,
    window_length: WindowLength,
    sampling: Optional[EventSetOrNode] = None,
) -> EventSetOrNode:
    assert isinstance(input, EventSetNode)
    if sampling is not None:
        assert isinstance(sampling, EventSetNode)

    return MovingProductOperator(
        input=input,
        window_length=window_length,
        sampling=sampling,
    ).outputs["output"]


@compile
def cumprod(
    input: EventSetOrNode,
    sampling: Optional[EventSetOrNode] = None,
) -> EventSetOrNode:
    assert isinstance(input, EventSetNode)
    if sampling is not None:
        assert isinstance(sampling, EventSetNode)

    return MovingProductOperator(
        input=input, window_length=np.inf, sampling=sampling
    ).outputs["output"]
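
Because `cumprod` is just the moving product with `window_length=np.inf`, its semantics reduce to "product of every non-NaN value at or before t". The sketch below illustrates that equivalence in plain Python. `cumprod_reference` is a hypothetical helper, `math.inf` stands in for `np.inf`, and the NaN result for a window without numeric values is an assumption drawn from the `cumprod` docstring.

```python
import math


def cumprod_reference(timestamps, values, sampling=None):
    """Product of all non-NaN values with timestamp <= t, for each sampled t."""
    if sampling is None:
        sampling = timestamps  # Sample on the input's own timestamps.
    window_length = math.inf  # Stand-in for `np.inf` in the operator call.
    output = []
    for t in sampling:
        # With an infinite window the left bound is -inf, so only `ts <= t`
        # actually filters anything.
        in_window = [
            v
            for ts, v in zip(timestamps, values)
            if t - window_length < ts <= t and not math.isnan(v)
        ]
        # Assumption: NaN until at least one numeric value has been seen.
        output.append(math.prod(in_window) if in_window else math.nan)
    return output


# Mirrors the docstring example with sampling at 5 and 10.
ts = [0, 1, 2, 5, 6, 7]
vals = [1.0, 2.0, 10.0, 12.0, math.nan, 2.0]
print(cumprod_reference(ts, vals, sampling=[5, 10]))
# → [240.0, 480.0]
```

This reference rescans the whole input for every sampled timestamp, which is the kind of cost the docstring's warning about infinite windows alludes to.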
