/
resample.py
112 lines (96 loc) Β· 4.06 KB
/
resample.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# Copyright 2021 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict
import numpy as np
from temporian.core.operators.resample import Resample
from temporian.implementation.numpy import implementation_lib
from temporian.implementation.numpy.data.dtype_normalization import (
tp_dtype_to_np_dtype,
)
from temporian.implementation.numpy.data.event_set import IndexData, EventSet
from temporian.implementation.numpy_cc.operators import operators_cc
from temporian.implementation.numpy.operators.base import OperatorImplementation
class ResampleNumpyImplementation(OperatorImplementation):
    """Numpy implementation of the resample operator.

    For every index key of `sampling`, the output holds the timestamps of
    `sampling`. Each output feature value is copied from the matching index
    key of `input` at the position selected by
    `operators_cc.build_sampling_idxs`. Positions with no selected source
    value, and index keys absent from `input`, hold the feature dtype's
    missing value.
    """

    def __init__(self, operator: Resample) -> None:
        super().__init__(operator)
        assert isinstance(operator, Resample)

    def __call__(
        self, input: EventSet, sampling: EventSet
    ) -> Dict[str, EventSet]:
        """Resamples `input` features onto the timestamps of `sampling`.

        Args:
            input: EventSet whose feature values are copied to the output.
            sampling: EventSet providing the output index keys and timestamps.

        Returns:
            A dict mapping "output" to the resampled EventSet.

        Raises:
            AssertionError: (when assertions are enabled) if an input feature's
                scalar type differs from the output schema's numpy type.
            Any error raised by `IndexData.check_schema` if an index's
            features do not match the output schema.
        """
        assert isinstance(self.operator, Resample)

        output_schema = self.output_schema("output")

        # (missing value, numpy dtype) of each output feature, in schema order.
        missing_and_np_dtypes = [
            (f.dtype.missing_value(), tp_dtype_to_np_dtype(f.dtype))
            for f in output_schema.features
        ]

        dst_evset = EventSet(data={}, schema=output_schema)

        # The output has exactly the index keys and timestamps of `sampling`.
        for index_key, sampling_data in sampling.data.items():
            dst_timestamps = sampling_data.timestamps

            if index_key in input.data:
                src_data = input.data[index_key]
                dst_features = self._resample_features(
                    src_data.features,
                    src_data.timestamps,
                    dst_timestamps,
                    missing_and_np_dtypes,
                )
            else:
                # No matching events to sample from.
                dst_features = self._missing_features(
                    len(dst_timestamps), missing_and_np_dtypes
                )

            # Build the features completely before creating the IndexData,
            # instead of registering it first and then appending to the list
            # it aliases.
            dst_data = IndexData(dst_features, dst_timestamps, schema=None)
            dst_data.check_schema(output_schema)
            dst_evset.set_index_value(index_key, dst_data, normalize=False)

        return {"output": dst_evset}

    @staticmethod
    def _missing_features(num_events, missing_and_np_dtypes):
        """Returns one array per output feature, filled with missing values.

        Args:
            num_events: Length of each returned array.
            missing_and_np_dtypes: (missing value, numpy dtype) per feature.
        """
        return [
            np.full(shape=num_events, fill_value=missing, dtype=np_dtype)
            for missing, np_dtype in missing_and_np_dtypes
        ]

    @staticmethod
    def _resample_features(
        src_features, src_timestamps, dst_timestamps, missing_and_np_dtypes
    ):
        """Copies source feature values onto the destination timestamps.

        Args:
            src_features: Feature arrays of the source index, in schema order.
            src_timestamps: Timestamps of the source index.
            dst_timestamps: Timestamps of the destination (sampling) index.
            missing_and_np_dtypes: (missing value, numpy dtype) per feature.

        Returns:
            One resampled array of length `len(dst_timestamps)` per feature.
        """
        # NOTE(review): presumably `sampling_idxs[i]` is the index of the
        # latest source event at or before `dst_timestamps[i]`, and
        # `first_valid_idx` is the first destination position that has one;
        # confirm against operators_cc.build_sampling_idxs.
        sampling_idxs, first_valid_idx = operators_cc.build_sampling_idxs(
            src_timestamps, dst_timestamps
        )
        num_events = len(dst_timestamps)

        dst_features = []
        for src_feature, (missing, np_dtype) in zip(
            src_features, missing_and_np_dtypes
        ):
            # TODO: Check if running the following block in c++ is faster.
            assert src_feature.dtype.type == np_dtype
            # Allocate with the source array's dtype, not `np_dtype`: the
            # assert only pins the scalar type, and for fixed-width types
            # (e.g. numpy bytes/str) the item size comes from the source.
            dst_feature = np.full(
                shape=num_events, fill_value=missing, dtype=src_feature.dtype
            )
            # Positions before `first_valid_idx` keep the missing value.
            dst_feature[first_valid_idx:] = src_feature[
                sampling_idxs[first_valid_idx:]
            ]
            dst_features.append(dst_feature)
        return dst_features
# Register this numpy kernel as the implementation of the Resample operator.
implementation_lib.register_operator_implementation(
    Resample,
    ResampleNumpyImplementation,
)