
Commit

Merge pull request #5443 from nabenabe0928/code-fix/introduce-future-annotations-to-tpe-related-modules

Introduce `__future__.annotations` to TPE-related modules
y0z committed May 15, 2024
2 parents 2b6a36f + 63f8e66 commit f954e35
Showing 4 changed files with 47 additions and 61 deletions.
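
What the change buys: with `from __future__ import annotations` at the top of a module (PEP 563), annotations are stored as strings and never evaluated at definition time, so the PEP 604 `X | Y` and PEP 585 `dict[...]`/`list[...]` spellings can replace `typing.Union`, `typing.Optional`, `typing.Dict`, and `typing.List` even on Python versions that predate them. A minimal sketch of the effect, not taken from the diff (`scaled` is a hypothetical function):

from __future__ import annotations

import numpy as np


def scaled(x: np.ndarray | float, weights: dict[str, float] | None = None) -> np.ndarray:
    # The annotations above are stored as strings and never executed, so
    # this module parses and runs on Python 3.8 too. Only tools that
    # evaluate annotations at runtime (e.g. typing.get_type_hints) would
    # still need the typing.* spellings.
    factor = 1.0 if weights is None else sum(weights.values())
    return np.asarray(x) * factor


print(scaled(2.0, {"a": 1.5}))  # 3.0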
22 changes: 11 additions & 11 deletions optuna/samplers/_tpe/_truncnorm.py
@@ -31,12 +31,12 @@
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
+from __future__ import annotations
+
+from collections.abc import Callable
 import functools
 import math
 import sys
-from typing import Callable
-from typing import Optional
-from typing import Union
 
 import numpy as np
 
@@ -170,7 +170,7 @@ def _ndtri_exp(y: np.ndarray) -> np.ndarray:
     return np.frompyfunc(_ndtri_exp_single, 1, 1)(y).astype(float)
 
 
-def ppf(q: np.ndarray, a: Union[np.ndarray, float], b: Union[np.ndarray, float]) -> np.ndarray:
+def ppf(q: np.ndarray, a: np.ndarray | float, b: np.ndarray | float) -> np.ndarray:
     q, a, b = np.atleast_1d(q, a, b)
     q, a, b = np.broadcast_arrays(q, a, b)
@@ -205,9 +205,9 @@ def ppf_right(q: np.ndarray, a: np.ndarray, b: np.ndarray) -> np.ndarray:
 def rvs(
     a: np.ndarray,
     b: np.ndarray,
-    loc: Union[np.ndarray, float] = 0,
-    scale: Union[np.ndarray, float] = 1,
-    random_state: Optional[np.random.RandomState] = None,
+    loc: np.ndarray | float = 0,
+    scale: np.ndarray | float = 1,
+    random_state: np.random.RandomState | None = None,
 ) -> np.ndarray:
     random_state = random_state or np.random.RandomState()
     size = np.broadcast(a, b, loc, scale).shape
@@ -217,10 +217,10 @@
 
 def logpdf(
     x: np.ndarray,
-    a: Union[np.ndarray, float],
-    b: Union[np.ndarray, float],
-    loc: Union[np.ndarray, float] = 0,
-    scale: Union[np.ndarray, float] = 1,
+    a: np.ndarray | float,
+    b: np.ndarray | float,
+    loc: np.ndarray | float = 0,
+    scale: np.ndarray | float = 1,
 ) -> np.ndarray:
     x = (x - loc) / scale
 
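
Beyond the annotation swap, the `ppf` hunk shows the broadcasting prelude that lets these truncnorm helpers accept scalars and arrays interchangeably. A standalone sketch of that NumPy idiom (values are illustrative):

import numpy as np

# Promote scalars to 1-d arrays, then broadcast everything to a common
# shape, mirroring the first two lines of ppf above.
q, a, b = np.atleast_1d(0.5, -2.0, [1.0, 2.0, 3.0])
q, a, b = np.broadcast_arrays(q, a, b)
print(q.shape, a.shape, b.shape)  # (3,) (3,) (3,)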
20 changes: 9 additions & 11 deletions optuna/samplers/_tpe/parzen_estimator.py
@@ -1,9 +1,7 @@
 from __future__ import annotations
 
-from typing import Callable
-from typing import Dict
+from collections.abc import Callable
 from typing import NamedTuple
-from typing import Optional
 
 import numpy as np
 
@@ -37,10 +35,10 @@ class _ParzenEstimatorParameters(NamedTuple):
 class _ParzenEstimator:
     def __init__(
         self,
-        observations: Dict[str, np.ndarray],
-        search_space: Dict[str, BaseDistribution],
+        observations: dict[str, np.ndarray],
+        search_space: dict[str, BaseDistribution],
         parameters: _ParzenEstimatorParameters,
-        predetermined_weights: Optional[np.ndarray] = None,
+        predetermined_weights: np.ndarray | None = None,
     ) -> None:
         if parameters.consider_prior:
             if parameters.prior_weight is None:
@@ -77,11 +75,11 @@ def __init__(
             ],
         )
 
-    def sample(self, rng: np.random.RandomState, size: int) -> Dict[str, np.ndarray]:
+    def sample(self, rng: np.random.RandomState, size: int) -> dict[str, np.ndarray]:
         sampled = self._mixture_distribution.sample(rng, size)
         return self._untransform(sampled)
 
-    def log_pdf(self, samples_dict: Dict[str, np.ndarray]) -> np.ndarray:
+    def log_pdf(self, samples_dict: dict[str, np.ndarray]) -> np.ndarray:
         transformed_samples = self._transform(samples_dict)
         return self._mixture_distribution.log_pdf(transformed_samples)
 
@@ -112,7 +110,7 @@ def _call_weights_func(weights_func: Callable[[int], np.ndarray], n: int) -> np.ndarray:
     def _is_log(dist: BaseDistribution) -> bool:
         return isinstance(dist, (FloatDistribution, IntDistribution)) and dist.log
 
-    def _transform(self, samples_dict: Dict[str, np.ndarray]) -> np.ndarray:
+    def _transform(self, samples_dict: dict[str, np.ndarray]) -> np.ndarray:
         return np.array(
             [
                 (
@@ -124,7 +122,7 @@ def _transform(self, samples_dict: Dict[str, np.ndarray]) -> np.ndarray:
             ]
         ).T
 
-    def _untransform(self, samples_array: np.ndarray) -> Dict[str, np.ndarray]:
+    def _untransform(self, samples_array: np.ndarray) -> dict[str, np.ndarray]:
         res = {
             param: (
                 np.exp(samples_array[:, i])
@@ -219,7 +217,7 @@ def _calculate_numerical_distributions(
         observations: np.ndarray,
         low: float,
         high: float,
-        step: Optional[float],
+        step: float | None,
         parameters: _ParzenEstimatorParameters,
     ) -> _BatchedDistributions:
         step_or_0 = step or 0
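
The retyped `step: float | None` feeds the `step_or_0 = step or 0` line in the last hunk: `or` maps both `None` and `0` to `0`, so the arithmetic that follows never sees `None`. A quick demonstration of the idiom:

for step in (None, 0.0, 0.5):
    print(repr(step), "->", step or 0)
# None -> 0
# 0.0 -> 0
# 0.5 -> 0.5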
5 changes: 3 additions & 2 deletions optuna/samplers/_tpe/probability_distributions.py
@@ -1,4 +1,5 @@
-from typing import List
+from __future__ import annotations
+
 from typing import NamedTuple
 from typing import Union

@@ -35,7 +36,7 @@ class _BatchedDiscreteTruncNormDistributions(NamedTuple):
 
 class _MixtureOfProductDistribution(NamedTuple):
     weights: np.ndarray
-    distributions: List[_BatchedDistributions]
+    distributions: list[_BatchedDistributions]
 
     def sample(self, rng: np.random.RandomState, batch_size: int) -> np.ndarray:
         active_indices = rng.choice(len(self.weights), p=self.weights, size=batch_size)
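
Note that `from typing import Union` survives here even though `List` was dropped. The likely reason (not stated in the diff): the future import affects only annotations, while an executed expression such as a runtime type alias still needs `Union`, because `A | B` between ordinary classes raises `TypeError` before Python 3.10. A sketch of the distinction, with illustrative classes rather than Optuna's:

from __future__ import annotations

from typing import NamedTuple, Union


class _A(NamedTuple):
    x: float


class _B(NamedTuple):
    x: int


_AOrB = Union[_A, _B]  # executed at import time; must stay Union on 3.8/3.9


def first_x(v: _A | _B) -> float:  # annotation only; fine under the future import
    return float(v.x)


print(first_x(_A(1.5)))  # 1.5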
61 changes: 24 additions & 37 deletions optuna/samplers/_tpe/sampler.py
@@ -1,12 +1,10 @@
 from __future__ import annotations
 
+from collections.abc import Callable
+from collections.abc import Sequence
 import math
 from typing import Any
-from typing import Callable
 from typing import cast
-from typing import Dict
-from typing import Optional
-from typing import Sequence
 from typing import TYPE_CHECKING
 import warnings

@@ -283,16 +281,16 @@ def __init__(
         n_ei_candidates: int = 24,
         gamma: Callable[[int], int] = default_gamma,
         weights: Callable[[int], np.ndarray] = default_weights,
-        seed: Optional[int] = None,
+        seed: int | None = None,
         *,
         multivariate: bool = False,
         group: bool = False,
         warn_independent_sampling: bool = True,
         constant_liar: bool = False,
-        constraints_func: Optional[Callable[[FrozenTrial], Sequence[float]]] = None,
-        categorical_distance_func: Optional[
-            dict[str, Callable[[CategoricalChoiceType, CategoricalChoiceType], float]]
-        ] = None,
+        constraints_func: Callable[[FrozenTrial], Sequence[float]] | None = None,
+        categorical_distance_func: (
+            dict[str, Callable[[CategoricalChoiceType, CategoricalChoiceType], float]] | None
+        ) = None,
     ) -> None:
         self._parzen_estimator_parameters = _ParzenEstimatorParameters(
             consider_prior,
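
The parenthesized union in `categorical_distance_func` is only line-wrapping: enclosing the whole annotation in parentheses keeps a long `... | None` a single expression across several lines. A minimal sketch of the style (names are hypothetical):

from __future__ import annotations

from collections.abc import Callable


def configure(
    distance_func: (
        dict[str, Callable[[str, str], float]] | None
    ) = None,
) -> None:
    # The parentheses make the multi-line union one annotation expression.
    pass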
@@ -313,8 +311,8 @@ def __init__(
 
         self._multivariate = multivariate
         self._group = group
-        self._group_decomposed_search_space: Optional[_GroupDecomposedSearchSpace] = None
-        self._search_space_group: Optional[_SearchSpaceGroup] = None
+        self._group_decomposed_search_space: _GroupDecomposedSearchSpace | None = None
+        self._search_space_group: _SearchSpaceGroup | None = None
         self._search_space = IntersectionSearchSpace(include_pruned=True)
         self._constant_liar = constant_liar
         self._constraints_func = constraints_func
@@ -367,11 +365,11 @@ def reseed_rng(self) -> None:
 
     def infer_relative_search_space(
         self, study: Study, trial: FrozenTrial
-    ) -> Dict[str, BaseDistribution]:
+    ) -> dict[str, BaseDistribution]:
         if not self._multivariate:
             return {}
 
-        search_space: Dict[str, BaseDistribution] = {}
+        search_space: dict[str, BaseDistribution] = {}
 
         if self._group:
             assert self._group_decomposed_search_space is not None
@@ -392,8 +390,8 @@ def infer_relative_search_space(
         return search_space
 
     def sample_relative(
-        self, study: Study, trial: FrozenTrial, search_space: Dict[str, BaseDistribution]
-    ) -> Dict[str, Any]:
+        self, study: Study, trial: FrozenTrial, search_space: dict[str, BaseDistribution]
+    ) -> dict[str, Any]:
         if self._group:
             assert self._search_space_group is not None
             params = {}
@@ -409,8 +407,8 @@ def sample_relative(
         return self._sample_relative(study, trial, search_space)
 
     def _sample_relative(
-        self, study: Study, trial: FrozenTrial, search_space: Dict[str, BaseDistribution]
-    ) -> Dict[str, Any]:
+        self, study: Study, trial: FrozenTrial, search_space: dict[str, BaseDistribution]
+    ) -> dict[str, Any]:
         if search_space == {}:
             return {}
 
@@ -465,8 +463,8 @@ def _get_internal_repr(
         return {k: np.asarray(v) for k, v in values.items()}
 
     def _sample(
-        self, study: Study, trial: FrozenTrial, search_space: Dict[str, BaseDistribution]
-    ) -> Dict[str, Any]:
+        self, study: Study, trial: FrozenTrial, search_space: dict[str, BaseDistribution]
+    ) -> dict[str, Any]:
         if self._constant_liar:
             states = [TrialState.COMPLETE, TrialState.PRUNED, TrialState.RUNNING]
         else:
@@ -542,9 +540,7 @@ def _compute_acquisition_func(
 
     @classmethod
     def _compare(
-        cls,
-        samples: Dict[str, np.ndarray],
-        acquisition_func_vals: np.ndarray,
+        cls, samples: dict[str, np.ndarray], acquisition_func_vals: np.ndarray
     ) -> dict[str, int | float]:
         sample_size = next(iter(samples.values())).size
         if sample_size == 0:
@@ -561,7 +557,7 @@ def _compare(
         return {k: v[best_idx].item() for k, v in samples.items()}
 
     @staticmethod
-    def hyperopt_parameters() -> Dict[str, Any]:
+    def hyperopt_parameters() -> dict[str, Any]:
         """Return the default parameters of hyperopt (v0.1.2).
 
         :class:`~optuna.samplers.TPESampler` can be instantiated with the parameters returned
@@ -611,7 +607,7 @@ def after_trial(
         study: Study,
         trial: FrozenTrial,
         state: TrialState,
-        values: Optional[Sequence[float]],
+        values: Sequence[float] | None,
     ) -> None:
         assert state in [TrialState.COMPLETE, TrialState.FAIL, TrialState.PRUNED]
         if self._constraints_func is not None:
@@ -620,10 +616,7 @@
 
 
 def _split_trials(
-    study: Study,
-    trials: list[FrozenTrial],
-    n_below: int,
-    constraints_enabled: bool,
+    study: Study, trials: list[FrozenTrial], n_below: int, constraints_enabled: bool
 ) -> tuple[list[FrozenTrial], list[FrozenTrial]]:
     complete_trials = []
     pruned_trials = []
@@ -672,9 +665,7 @@ def _split_complete_trials(
 
 
 def _split_complete_trials_single_objective(
-    trials: Sequence[FrozenTrial],
-    study: Study,
-    n_below: int,
+    trials: Sequence[FrozenTrial], study: Study, n_below: int
 ) -> tuple[list[FrozenTrial], list[FrozenTrial]]:
     if study.direction == StudyDirection.MINIMIZE:
         sorted_trials = sorted(trials, key=lambda trial: cast(float, trial.value))
@@ -684,9 +675,7 @@ def _split_complete_trials_single_objective(
 
 
 def _split_complete_trials_multi_objective(
-    trials: Sequence[FrozenTrial],
-    study: Study,
-    n_below: int,
+    trials: Sequence[FrozenTrial], study: Study, n_below: int
 ) -> tuple[list[FrozenTrial], list[FrozenTrial]]:
     if n_below == 0:
         # The type of trials must be `list`, but not `Sequence`.
@@ -746,9 +735,7 @@ def _get_pruned_trial_score(trial: FrozenTrial, study: Study) -> tuple[float, float]:
 
 
 def _split_pruned_trials(
-    trials: Sequence[FrozenTrial],
-    study: Study,
-    n_below: int,
+    trials: Sequence[FrozenTrial], study: Study, n_below: int
 ) -> tuple[list[FrozenTrial], list[FrozenTrial]]:
     n_below = min(n_below, len(trials))
     sorted_trials = sorted(trials, key=lambda trial: _get_pruned_trial_score(trial, study))
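
The `Callable` and `Sequence` moves from `typing` to `collections.abc` follow the same logic: subscriptions like `Callable[[int], int]` appear only in annotations, which the future import leaves unevaluated (subscripting the abc classes at runtime is otherwise a Python 3.9+ feature). A small sketch, with `quartile_gamma` as a made-up stand-in for the sampler's `gamma` argument:

from __future__ import annotations

from collections.abc import Callable


def pick_n_below(gamma: Callable[[int], int], n_trials: int) -> int:
    # TPESampler's gamma maps the trial count to the size of the "good"
    # split; this helper merely applies it.
    return gamma(n_trials)


def quartile_gamma(n: int) -> int:
    return max(1, n // 4)


print(pick_n_below(quartile_gamma, 100))  # 25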
