diff --git a/tests/integration/test_ask_tell_optimization.py b/tests/integration/test_ask_tell_optimization.py index 8f90314518..d84c5da664 100644 --- a/tests/integration/test_ask_tell_optimization.py +++ b/tests/integration/test_ask_tell_optimization.py @@ -16,6 +16,7 @@ import copy import pickle import tempfile +from dataclasses import replace from typing import Callable, Mapping, Tuple, Union import numpy.testing as npt @@ -34,8 +35,8 @@ TREGOBox, ) from trieste.acquisition.utils import copy_to_local_models -from trieste.ask_tell_optimization import AskTellOptimizer -from trieste.bayesian_optimizer import OptimizationResult, Record +from trieste.ask_tell_optimization import AskTellOptimizer, AskTellOptimizerState +from trieste.bayesian_optimizer import OptimizationResult from trieste.data import Dataset from trieste.logging import set_step_number, tensorboard_writer from trieste.models import TrainableProbabilisticModel @@ -150,8 +151,10 @@ def test_ask_tell_optimizer_finds_minima_of_the_scaled_branin_function( @random_seed +@pytest.mark.parametrize("track_data", [True, False]) @pytest.mark.parametrize(*copy.deepcopy(OPTIMIZER_PARAMS)) def test_ask_tell_optimizer_finds_minima_of_simple_quadratic( + track_data: bool, num_steps: int, reload_state: bool, acquisition_rule_fn: AcquisitionRuleFunction | Tuple[AcquisitionRuleFunction, int], @@ -159,7 +162,11 @@ def test_ask_tell_optimizer_finds_minima_of_simple_quadratic( # for speed reasons we sometimes test with a simple quadratic defined on the same search space # branin; currently assume that every rule should be able to solve this in 5 steps _test_ask_tell_optimization_finds_minima( - False, min(num_steps, 5), reload_state, acquisition_rule_fn + False, + min(num_steps, 5), + reload_state, + acquisition_rule_fn, + track_data=track_data, ) @@ -168,6 +175,7 @@ def _test_ask_tell_optimization_finds_minima( num_steps: int, reload_state: bool, acquisition_rule_fn: AcquisitionRuleFunction | Tuple[AcquisitionRuleFunction, int], + track_data: bool = True, ) -> None: # For the case when optimization state is saved and reload on each iteration # we need to use new acquisition function object to imitate real life usage @@ -195,7 +203,7 @@ def _test_ask_tell_optimization_finds_minima( with tensorboard_writer(summary_writer): set_step_number(0) ask_tell = AskTellOptimizer( - search_space, initial_dataset, models, acquisition_rule_fn() + search_space, initial_dataset, models, acquisition_rule_fn(), track_data=track_data ) for i in range(1, num_steps + 1): @@ -206,10 +214,10 @@ def _test_ask_tell_optimization_finds_minima( new_point = ask_tell.ask() if reload_state: - state: Record[ + state: AskTellOptimizerState[ None | State[TensorType, AsynchronousRuleState | BatchTrustRegionBox.State], GaussianProcessRegression, - ] = ask_tell.to_record() + ] = ask_tell.to_state() written_state = pickle.dumps(state) # If query points are rank 3, then use a batched observer. 
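# ---------------------------------------------------------------------------
# A minimal sketch of the reload pattern the next hunk exercises when
# track_data=False: the caller keeps the authoritative datasets, so the
# pickled record's stale datasets are swapped for the caller's copy before the
# optimizer is rebuilt. `ask_tell`, `current_datasets`, `search_space` and
# `rule` are assumed to be pre-built by the caller.
import pickle
from dataclasses import replace

from trieste.ask_tell_optimization import AskTellOptimizer

blob = pickle.dumps(ask_tell.to_state())            # persist between iterations
state = pickle.loads(blob)
record = replace(state.record, datasets=current_datasets)  # caller-tracked data
ask_tell = AskTellOptimizer.from_record(
    record,
    search_space,
    rule,
    track_data=False,
    local_data_ixs=state.local_data_ixs,            # restore index bookkeeping
)
# ---------------------------------------------------------------------------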
@@ -222,11 +230,26 @@ def _test_ask_tell_optimization_finds_minima( if reload_state: state = pickle.loads(written_state) + state_record = state.record + if not track_data: + # reload using the up-to-date dataset + state_record = replace(state_record, datasets=initial_dataset) ask_tell = AskTellOptimizer.from_record( - state, search_space, acquisition_rule_fn() + state_record, + search_space, + acquisition_rule_fn(), + track_data=track_data, + local_data_ixs=state.local_data_ixs, ) - ask_tell.tell(new_data_point) + if track_data: + ask_tell.tell(new_data_point) + else: + if isinstance(new_data_point, Dataset): + new_data_point = {OBJECTIVE: new_data_point} + for tag in initial_dataset.keys(): + initial_dataset[tag] += new_data_point[tag] + ask_tell.tell(initial_dataset) result: OptimizationResult[ None | State[TensorType, AsynchronousRuleState | BatchTrustRegionBox.State], diff --git a/tests/unit/acquisition/test_utils.py b/tests/unit/acquisition/test_utils.py index 6992f18e06..a92cb7fc45 100644 --- a/tests/unit/acquisition/test_utils.py +++ b/tests/unit/acquisition/test_utils.py @@ -13,7 +13,7 @@ # limitations under the License. from __future__ import annotations -from typing import Any, Mapping, Optional +from typing import Any, Mapping, Optional, Sequence from unittest.mock import MagicMock import numpy as np @@ -31,7 +31,7 @@ ) from trieste.data import Dataset from trieste.space import Box, SearchSpaceType -from trieste.types import Tag +from trieste.types import Tag, TensorType from trieste.utils.misc import LocalizedTag @@ -166,6 +166,47 @@ def test_with_local_datasets( assert datasets[ltag] is original_datasets[global_tag] +@pytest.mark.parametrize( + "datasets, indices", + [ + ( + { + "a": Dataset(tf.constant([[1.0, 2.0], [3.0, 4.0]]), tf.constant([[5.0], [6.0]])), + "b": Dataset(tf.constant([[7.0, 8.0], [9.0, 1.0]]), tf.constant([[2.0], [3.0]])), + }, + [tf.constant([0]), tf.constant([0, 1])], + ), + ( + { + "a": Dataset(tf.constant([[1.0, 2.0], [3.0, 4.0]]), tf.constant([[5.0], [6.0]])), + "b": Dataset(tf.constant([[7.0, 8.0], [9.0, 1.0]]), tf.constant([[2.0], [3.0]])), + }, + [tf.constant([], dtype=tf.int32), tf.constant([0])], + ), + ], +) +def test_with_local_datasets_indices( + datasets: Mapping[Tag, Dataset], indices: Sequence[TensorType] +) -> None: + original_datasets = dict(datasets).copy() + global_tags = {t for t in original_datasets if not LocalizedTag.from_tag(t).is_local} + num_global_datasets = len(global_tags) + + num_local_datasets = len(indices) + datasets = with_local_datasets(datasets, num_local_datasets, indices) + assert len(datasets) == num_global_datasets * (1 + num_local_datasets) + + for global_tag in global_tags: + assert datasets[global_tag] is original_datasets[global_tag] + for i in range(num_local_datasets): + ltag = LocalizedTag(global_tag, i) + if ltag in original_datasets: + assert datasets[ltag] is original_datasets[ltag] + else: + assert len(datasets[ltag].query_points) == len(indices[i]) + assert len(datasets[ltag].observations) == len(indices[i]) + + @pytest.mark.parametrize( "points, tolerance, expected_mask", [ diff --git a/tests/unit/models/test_interfaces.py b/tests/unit/models/test_interfaces.py index 66f4739e10..6213f7e723 100644 --- a/tests/unit/models/test_interfaces.py +++ b/tests/unit/models/test_interfaces.py @@ -78,6 +78,9 @@ def _model_stack() -> ( def test_model_stack_predict() -> None: stack, (model01, model2, model3) = _model_stack() + assert all( + isinstance(model, TrainableProbabilisticModel) for model in (stack, model01, 
model2, model3) + ) query_points = tf.random.uniform([5, 7, 3]) mean, var = stack.predict(query_points) diff --git a/tests/unit/test_ask_tell_optimization.py b/tests/unit/test_ask_tell_optimization.py index ce1f19193f..62c4d446fc 100644 --- a/tests/unit/test_ask_tell_optimization.py +++ b/tests/unit/test_ask_tell_optimization.py @@ -13,13 +13,19 @@ # limitations under the License. from __future__ import annotations -from typing import Mapping, Optional, Type, Union +from itertools import zip_longest +from typing import Mapping, Optional, Sequence, Type, Union import numpy.testing as npt import pytest import tensorflow as tf -from tests.util.misc import FixedAcquisitionRule, assert_datasets_allclose, mk_dataset +from tests.util.misc import ( + FixedAcquisitionRule, + FixedLocalAcquisitionRule, + assert_datasets_allclose, + mk_dataset, +) from tests.util.models.gpflow.models import ( GaussianProcess, PseudoTrainableProbModel, @@ -28,7 +34,11 @@ ) from trieste.acquisition.rule import AcquisitionRule, LocalDatasetsAcquisitionRule from trieste.acquisition.utils import copy_to_local_models -from trieste.ask_tell_optimization import AskTellOptimizer, AskTellOptimizerNoTraining +from trieste.ask_tell_optimization import ( + AskTellOptimizer, + AskTellOptimizerNoTraining, + AskTellOptimizerState, +) from trieste.bayesian_optimizer import OptimizationResult, Record from trieste.data import Dataset from trieste.models.interfaces import ProbabilisticModel, TrainableProbabilisticModel @@ -63,7 +73,7 @@ def search_space() -> Box: @pytest.fixture def init_dataset() -> Dataset: - return mk_dataset([[0.0]], [[0.0]]) + return mk_dataset([[0.0], [0.5]], [[0.0], [0.5]]) @pytest.fixture @@ -71,6 +81,11 @@ def acquisition_rule() -> AcquisitionRule[TensorType, Box, ProbabilisticModel]: return FixedAcquisitionRule([[0.0]]) +@pytest.fixture +def local_acquisition_rule() -> LocalDatasetsAcquisitionRule[TensorType, Box, ProbabilisticModel]: + return FixedLocalAcquisitionRule([[0.0]]) + + @pytest.fixture def model() -> TrainableProbabilisticModel: return LinearWithUnitVariance() @@ -84,6 +99,7 @@ def model() -> TrainableProbabilisticModel: ] +@pytest.mark.parametrize("track_data", [True, False]) @pytest.mark.parametrize("optimizer", OPTIMIZERS) def test_ask_tell_optimizer_suggests_new_point( search_space: Box, @@ -91,8 +107,9 @@ def test_ask_tell_optimizer_suggests_new_point( model: TrainableProbabilisticModel, acquisition_rule: AcquisitionRule[TensorType, Box, TrainableProbabilisticModel], optimizer: OptimizerType, + track_data: bool, ) -> None: - ask_tell = optimizer(search_space, init_dataset, model, acquisition_rule) + ask_tell = optimizer(search_space, init_dataset, model, acquisition_rule, track_data=track_data) new_point = ask_tell.ask() @@ -115,7 +132,7 @@ def test_ask_tell_optimizer_with_default_acquisition_suggests_new_point( @pytest.mark.parametrize("optimizer", OPTIMIZERS) @pytest.mark.parametrize("copy", [True, False]) -def test_ask_tell_optimizer_returns_complete_state( +def test_ask_tell_optimizer_returns_complete_record( search_space: Box, init_dataset: Dataset, model: TrainableProbabilisticModel, @@ -134,7 +151,7 @@ def test_ask_tell_optimizer_returns_complete_state( @pytest.mark.parametrize("optimizer", OPTIMIZERS) @pytest.mark.parametrize("copy", [True, False]) -def test_ask_tell_optimizer_loads_from_state( +def test_ask_tell_optimizer_loads_from_record( search_space: Box, init_dataset: Dataset, model: TrainableProbabilisticModel, @@ -153,6 +170,69 @@ def test_ask_tell_optimizer_loads_from_state( 
assert isinstance(new_state.model, type(old_state.model)) +@pytest.mark.parametrize("optimizer", OPTIMIZERS) +def test_ask_tell_optimizer_returns_complete_state( + search_space: Box, + init_dataset: Dataset, + model: TrainableProbabilisticModel, + local_acquisition_rule: LocalDatasetsAcquisitionRule[ + TensorType, Box, TrainableProbabilisticModel + ], + optimizer: OptimizerType, +) -> None: + ask_tell = optimizer( + search_space, init_dataset, model, local_acquisition_rule, track_data=False + ) + + state: AskTellOptimizerState[None, TrainableProbabilisticModel] = ask_tell.to_state() + + assert_datasets_allclose(state.record.dataset, init_dataset) + assert isinstance(state.record.model, type(model)) + assert state.record.acquisition_state is None + assert state.local_data_ixs is not None + npt.assert_array_equal( + state.local_data_ixs, + [ + tf.range(len(init_dataset.query_points)) + for _ in range(local_acquisition_rule.num_local_datasets) + ], + ) + + +@pytest.mark.parametrize("optimizer", OPTIMIZERS) +def test_ask_tell_optimizer_loads_from_state( + search_space: Box, + init_dataset: Dataset, + model: TrainableProbabilisticModel, + local_acquisition_rule: LocalDatasetsAcquisitionRule[ + TensorType, Box, TrainableProbabilisticModel + ], + optimizer: OptimizerType, +) -> None: + old_state: AskTellOptimizerState[None, TrainableProbabilisticModel] = AskTellOptimizerState( + record=Record({OBJECTIVE: init_dataset}, {OBJECTIVE: model}, None), + local_data_ixs=[ + tf.range(len(init_dataset.query_points)) + for _ in range(local_acquisition_rule.num_local_datasets) + ], + ) + + ask_tell = optimizer.from_record( + old_state.record, + search_space, + local_acquisition_rule, + track_data=False, + local_data_ixs=old_state.local_data_ixs, + ) + new_state: AskTellOptimizerState[None, TrainableProbabilisticModel] = ask_tell.to_state() + + assert_datasets_allclose(new_state.record.dataset, old_state.record.dataset) + assert old_state.record.model is new_state.record.model + assert new_state.local_data_ixs is not None + assert old_state.local_data_ixs is not None + npt.assert_array_equal(new_state.local_data_ixs, old_state.local_data_ixs) + + @pytest.mark.parametrize("optimizer", OPTIMIZERS) @pytest.mark.parametrize("copy", [True, False]) def test_ask_tell_optimizer_returns_optimization_result( @@ -188,6 +268,23 @@ def test_ask_tell_optimizer_updates_state_with_new_data( assert_datasets_allclose(state_record.dataset, init_dataset + new_data) +@pytest.mark.parametrize("optimizer", OPTIMIZERS) +def test_ask_tell_optimizer_doesnt_update_state_with_new_data( + search_space: Box, + init_dataset: Dataset, + model: TrainableProbabilisticModel, + acquisition_rule: AcquisitionRule[TensorType, Box, TrainableProbabilisticModel], + optimizer: OptimizerType, +) -> None: + new_data = mk_dataset([[1.0]], [[1.0]]) + ask_tell = optimizer(search_space, init_dataset, model, acquisition_rule, track_data=False) + + ask_tell.tell(new_data) + state_record: Record[None, TrainableProbabilisticModel] = ask_tell.to_record() + + assert_datasets_allclose(state_record.dataset, init_dataset) + + @pytest.mark.parametrize("optimizer", OPTIMIZERS) @pytest.mark.parametrize("copy", [True, False]) def test_ask_tell_optimizer_copies_state( @@ -301,6 +398,38 @@ def test_ask_tell_optimizer_model_setter_errors( two_models.model = model +@pytest.mark.parametrize("optimizer", OPTIMIZERS) +@pytest.mark.parametrize("track_data", [False, True]) +def test_ask_tell_optimizer_local_data_ixs_property( + search_space: Box, + init_dataset: Dataset, + 
model: TrainableProbabilisticModel, + local_acquisition_rule: LocalDatasetsAcquisitionRule[ + TensorType, Box, TrainableProbabilisticModel + ], + optimizer: OptimizerType, + track_data: bool, +) -> None: + local_data_ixs = [ + tf.range(min(i, len(init_dataset.query_points))) + for i in range(local_acquisition_rule.num_local_datasets) + ] + ask_tell = optimizer( + search_space, + init_dataset, + model, + local_acquisition_rule, + track_data=track_data, + local_data_ixs=local_data_ixs, + ) + if track_data: + assert ask_tell.local_data_ixs is None + else: + assert ask_tell.local_data_ixs is not None + for expected, actual in zip_longest(local_data_ixs, ask_tell.local_data_ixs): + npt.assert_array_equal(expected, actual) + + def test_ask_tell_optimizer_trains_model( search_space: Box, init_dataset: Dataset, @@ -457,7 +586,7 @@ def test_ask_tell_optimizer_tell_validates_keys( new_data_with_key_2 = {TAG2: mk_dataset([[1.0]], [[1.0]])} ask_tell = optimizer(search_space, dataset_with_key_1, model_with_key_1, acquisition_rule) - with pytest.raises(KeyError, match=str(TAG2)): + with pytest.raises(ValueError): ask_tell.tell(new_data_with_key_2) @@ -611,3 +740,77 @@ def test_ask_tell_optimizer_no_training_with_non_trainable_model( ask_tell.tell(new_data) state_record: Record[None, ProbabilisticModel] = ask_tell.to_record() assert_datasets_allclose(state_record.dataset, init_dataset + new_data) + + +@pytest.mark.parametrize("optimizer", OPTIMIZERS) +@pytest.mark.parametrize( + "new_data_ixs", [None, [tf.constant([2, 3, 4]), tf.constant([7]), tf.constant([3])]] +) +def test_ask_tell_optimizer_tracks_local_data_ixs( + search_space: Box, + init_dataset: Dataset, + model: TrainableProbabilisticModel, + local_acquisition_rule: LocalDatasetsAcquisitionRule[ + TensorType, Box, TrainableProbabilisticModel + ], + optimizer: OptimizerType, + new_data_ixs: Optional[Sequence[TensorType]], +) -> None: + ask_tell = optimizer( + search_space, init_dataset, model, local_acquisition_rule, track_data=False + ) + new_data = mk_dataset( + [[x / 100] for x in range(75, 75 + 6)], [[x / 100] for x in range(75, 75 + 6)] + ) + ask_tell.tell(init_dataset + new_data, new_data_ixs=new_data_ixs) + + if new_data_ixs is None: + # default is to assign new points round-robin + expected_indices = [[0, 1, 2, 5], [0, 1, 3, 6], [0, 1, 4, 7]] + else: + expected_indices = [[0, 1, 2, 3, 4], [0, 1, 7], [0, 1, 3]] + + assert ask_tell.local_data_ixs is not None + for ixs, expected_ixs in zip_longest(ask_tell.local_data_ixs, expected_indices): + assert ixs.numpy().tolist() == expected_ixs + + +@pytest.mark.parametrize("optimizer", OPTIMIZERS) +def test_ask_tell_optimizer_raises_when_round_robin_fails( + search_space: Box, + init_dataset: Dataset, + model: TrainableProbabilisticModel, + local_acquisition_rule: LocalDatasetsAcquisitionRule[ + TensorType, Box, TrainableProbabilisticModel + ], + optimizer: OptimizerType, +) -> None: + ask_tell = optimizer( + search_space, init_dataset, model, local_acquisition_rule, track_data=False + ) + # five points can't be round-robined properly across three datasets + new_data = mk_dataset( + [[x / 100] for x in range(75, 75 + 5)], [[x / 100] for x in range(75, 75 + 5)] + ) + with pytest.raises(ValueError, match="Cannot infer new data points"): + ask_tell.tell(init_dataset + new_data) + + +@pytest.mark.parametrize("optimizer", OPTIMIZERS) +def test_ask_tell_optimizer_raises_with_badly_shaped_new_data_idxs( + search_space: Box, + init_dataset: Dataset, + model: TrainableProbabilisticModel, + 
local_acquisition_rule: LocalDatasetsAcquisitionRule[ + TensorType, Box, TrainableProbabilisticModel + ], + optimizer: OptimizerType, +) -> None: + ask_tell = optimizer( + search_space, init_dataset, model, local_acquisition_rule, track_data=False + ) + new_data = mk_dataset( + [[x / 100] for x in range(75, 75 + 6)], [[x / 100] for x in range(75, 75 + 6)] + ) + with pytest.raises(ValueError, match="new_data_ixs has 1"): + ask_tell.tell(init_dataset + new_data, new_data_ixs=[tf.constant([[4]])]) diff --git a/tests/util/misc.py b/tests/util/misc.py index 4133315b62..a87230b672 100644 --- a/tests/util/misc.py +++ b/tests/util/misc.py @@ -24,7 +24,7 @@ import tensorflow as tf from typing_extensions import Final -from trieste.acquisition.rule import AcquisitionRule +from trieste.acquisition.rule import AcquisitionRule, LocalDatasetsAcquisitionRule from trieste.data import Dataset from trieste.models import ProbabilisticModel from trieste.objectives import Branin, Hartmann6 @@ -194,6 +194,16 @@ def acquire( return self._qp +class FixedLocalAcquisitionRule( + LocalDatasetsAcquisitionRule[TensorType, SearchSpace, ProbabilisticModel], FixedAcquisitionRule +): + """A local dataset acquisition rule that returns the same fixed value on every step.""" + + @property + def num_local_datasets(self) -> int: + return 3 + + ShapeLike = Union[tf.TensorShape, Sequence[int]] """ Type alias for types that can represent tensor shapes. """ diff --git a/trieste/acquisition/rule.py b/trieste/acquisition/rule.py index f3fedc1be1..b28e690cb8 100644 --- a/trieste/acquisition/rule.py +++ b/trieste/acquisition/rule.py @@ -1139,7 +1139,9 @@ def with_input_active_dims( # No selection for models. # Nothing to do if active dimensions are not set. - if isinstance(value, ProbabilisticModel) or self.input_active_dims is None: + # NOTE: do not replace with isinstance(value, ProbabilisticModel) until + # https://github.com/secondmind-labs/trieste/issues/836 has been fixed. + if not isinstance(value, (Dataset, tf.Tensor)) or self.input_active_dims is None: return value # Select components of query points for datasets. @@ -2076,7 +2078,7 @@ def _set_tr_width(self, models: Optional[Mapping[Tag, ProbabilisticModelType]] = # Select the input lengthscales that are active for this region. if tf.size(lengthscales) > 1: - lengthscales = self.with_input_active_dims(lengthscales) + lengthscales = self.with_input_active_dims(tf.convert_to_tensor(lengthscales)) self.tr_width = ( lengthscales diff --git a/trieste/acquisition/utils.py b/trieste/acquisition/utils.py index e8bcf28b3d..91ffae7cb8 100644 --- a/trieste/acquisition/utils.py +++ b/trieste/acquisition/utils.py @@ -13,7 +13,7 @@ # limitations under the License. import copy import functools -from typing import Dict, Mapping, Tuple, Union +from typing import Dict, Mapping, Optional, Sequence, Tuple, Union import tensorflow as tf from check_shapes import check_shapes @@ -162,14 +162,24 @@ def copy_to_local_models( def with_local_datasets( datasets: Mapping[Tag, Dataset], num_local_datasets: int, + local_dataset_indices: Optional[Sequence[TensorType]] = None, ) -> Dict[Tag, Dataset]: """ - Helper method to add local datasets if they do not already exist, by copying global datasets. + Helper method to add local datasets if they do not already exist, by copying global datasets + or a subset thereof. :param datasets: The original datasets. :param num_local_datasets: The number of local datasets to add per global tag. 
+ :param local_dataset_indices: Optional sequence of indices, indicating which parts of + the global datasets should be copied. If None then the entire datasets are copied. :return: The updated mapping of datasets. """ + if local_dataset_indices is not None and len(local_dataset_indices) != num_local_datasets: + raise ValueError( + f"local_dataset_indices should have {num_local_datasets} entries, " + f"has {len(local_dataset_indices)}" + ) + updated_datasets = {} for tag in datasets: updated_datasets[tag] = datasets[tag] @@ -178,7 +188,19 @@ def with_local_datasets( for i in range(num_local_datasets): target_ltag = LocalizedTag(ltag.global_tag, i) if target_ltag not in datasets: - updated_datasets[target_ltag] = datasets[tag] + if local_dataset_indices is None: + updated_datasets[target_ltag] = datasets[tag] + else: + # TODO: use sparse tensors instead + updated_datasets[target_ltag] = Dataset( + query_points=tf.gather( + datasets[tag].query_points, local_dataset_indices[i] + ), + observations=tf.gather( + datasets[tag].observations, local_dataset_indices[i] + ), + ) + return updated_datasets diff --git a/trieste/ask_tell_optimization.py b/trieste/ask_tell_optimization.py index 46053f66b0..9353622a13 100644 --- a/trieste/ask_tell_optimization.py +++ b/trieste/ask_tell_optimization.py @@ -22,7 +22,10 @@ from abc import ABC, abstractmethod from copy import deepcopy -from typing import Dict, Generic, Mapping, Type, TypeVar, cast, overload +from dataclasses import dataclass +from typing import Dict, Generic, Mapping, Optional, Sequence, Type, TypeVar, cast, overload + +import tensorflow as tf from .models.utils import optimize_model_and_save_result @@ -69,6 +72,21 @@ AskTellOptimizerType = TypeVar("AskTellOptimizerType") +@dataclass(frozen=True) +class AskTellOptimizerState(Generic[StateType, ProbabilisticModelType]): + """ + Internal state for an Ask/Tell optimizer. This can be obtained using the optimizer's + `to_state` method, and can be used to initialise a new instance of the optimizer. + """ + + record: Record[StateType, ProbabilisticModelType] + """ A record of the current state of the optimization. """ + + local_data_ixs: Optional[Sequence[TensorType]] + """ Indices to the local data, for LocalDatasetsAcquisitionRule rules + when `track_data` is `False`. """ + + class AskTellOptimizerABC(ABC, Generic[SearchSpaceType, ProbabilisticModelType]): """ This class provides Ask/Tell optimization interface. It is designed for those use cases @@ -85,6 +103,8 @@ def __init__( models: Mapping[Tag, ProbabilisticModelType], *, fit_model: bool = True, + track_data: bool = True, + local_data_ixs: Optional[Sequence[TensorType]] = None, ): ... @@ -97,6 +117,8 @@ def __init__( acquisition_rule: AcquisitionRule[TensorType, SearchSpaceType, ProbabilisticModelType], *, fit_model: bool = True, + track_data: bool = True, + local_data_ixs: Optional[Sequence[TensorType]] = None, ): ... @@ -112,6 +134,8 @@ def __init__( acquisition_state: StateType | None, *, fit_model: bool = True, + track_data: bool = True, + local_data_ixs: Optional[Sequence[TensorType]] = None, ): ... @@ -123,6 +147,8 @@ def __init__( models: ProbabilisticModelType, *, fit_model: bool = True, + track_data: bool = True, + local_data_ixs: Optional[Sequence[TensorType]] = None, ): ... @@ -135,6 +161,8 @@ def __init__( acquisition_rule: AcquisitionRule[TensorType, SearchSpaceType, ProbabilisticModelType], *, fit_model: bool = True, + track_data: bool = True, + local_data_ixs: Optional[Sequence[TensorType]] = None, ): ... 
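# ---------------------------------------------------------------------------
# A minimal sketch of the new local_dataset_indices argument to
# with_local_datasets (changed above): each missing local copy receives only
# the indexed rows of its global dataset. The "OBJECTIVE" tag is illustrative.
import tensorflow as tf

from trieste.acquisition.utils import with_local_datasets
from trieste.data import Dataset
from trieste.utils.misc import LocalizedTag

global_datasets = {
    "OBJECTIVE": Dataset(
        tf.constant([[0.0], [1.0], [2.0]]), tf.constant([[0.0], [1.0], [4.0]])
    )
}
updated = with_local_datasets(
    global_datasets,
    num_local_datasets=2,
    local_dataset_indices=[tf.constant([0, 2]), tf.constant([1])],
)
assert len(updated[LocalizedTag("OBJECTIVE", 0)].query_points) == 2  # rows 0, 2
assert len(updated[LocalizedTag("OBJECTIVE", 1)].query_points) == 1  # row 1
# ---------------------------------------------------------------------------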
@@ -150,6 +178,8 @@ def __init__( acquisition_state: StateType | None = None, *, fit_model: bool = True, + track_data: bool = True, + local_data_ixs: Optional[Sequence[TensorType]] = None, ): ... @@ -167,6 +197,8 @@ def __init__( acquisition_state: StateType | None = None, *, fit_model: bool = True, + track_data: bool = True, + local_data_ixs: Optional[Sequence[TensorType]] = None, ): """ :param search_space: The space over which to search for the next query point. @@ -181,6 +213,12 @@ def __init__( :param acquisition_state: The optional acquisition state for stateful acquisitions. :param fit_model: If `True` (default), models passed in will be optimized on the given data. If `False`, the models are assumed to be optimized already. + :param track_data: If `True` (default), the optimizer will track the changing + datasets via a local copy. If `False`, it will infer new datasets from + updates to the global datasets (optionally using `local_data_ixs` and indices passed + in to `tell`). + :param local_data_ixs: Indices to the local data in the initial datasets. If unspecified, + assumes that the initial datasets are global. :raise ValueError: If any of the following are true: - the keys in ``datasets`` and ``models`` do not match - ``datasets`` or ``models`` are empty @@ -192,9 +230,6 @@ def __init__( if not datasets or not models: raise ValueError("dicts of datasets and models must be populated.") - # Copy the dataset so we don't change the one provided by the user. - datasets = deepcopy(datasets) - if isinstance(datasets, Dataset): datasets = {OBJECTIVE: datasets} if not isinstance(models, Mapping): @@ -216,6 +251,7 @@ def __init__( self._datasets = datasets self._models = models + self.track_data = track_data self._query_plot_dfs: dict[int, pd.DataFrame] = {} self._observation_plot_dfs = observation_plot_init(self._datasets) @@ -234,20 +270,25 @@ def __init__( else: self._acquisition_rule = acquisition_rule - # In order to support local datasets, account for the case where there may be an initial - # dataset that is not tagged per region. In this case, only the global dataset will - # exist in datasets. We want to copy this initial dataset to all the regions. - # Copy the global dataset if the local version for the subspace is not available. - # - # Only applies to a subset of acquisition rules, i.e. ones that have subspaces and - # hence use local datasets. if isinstance(self._acquisition_rule, LocalDatasetsAcquisitionRule): - self._datasets = with_local_datasets( - self._datasets, self._acquisition_rule.num_local_datasets - ) - self._filtered_datasets = self._acquisition_rule.filter_datasets( - self._models, self._datasets - ) + # In order to support local datasets, account for the case where there may be an initial + # dataset that is not tagged per region. In this case, only the global dataset will + # exist in datasets. We want to copy this initial dataset to all the regions. 
+ num_local_datasets = self._acquisition_rule.num_local_datasets + if self.track_data: + datasets = self._datasets = with_local_datasets(self._datasets, num_local_datasets) + else: + self._dataset_len = self.dataset_len(self._datasets) + if local_data_ixs is not None: + self._dataset_ixs = list(local_data_ixs) + else: + self._dataset_ixs = [ + tf.range(self._dataset_len) for _ in range(num_local_datasets) + ] + datasets = with_local_datasets( + self._datasets, num_local_datasets, self._dataset_ixs + ) + self._filtered_datasets = self._acquisition_rule.filter_datasets(self._models, datasets) if fit_model: with Timer() as initial_model_fitting_timer: @@ -293,6 +334,14 @@ def dataset(self) -> Dataset: else: raise ValueError(f"Expected a single dataset, found {len(datasets)}") + @property + def local_data_ixs(self) -> Optional[Sequence[TensorType]]: + """Indices to the local data. Only stored for LocalDatasetsAcquisitionRule rules + when `track_data` is `False`.""" + if isinstance(self._acquisition_rule, LocalDatasetsAcquisitionRule) and not self.track_data: + return self._dataset_ixs + return None + @property def models(self) -> Mapping[Tag, ProbabilisticModelType]: """The current models.""" @@ -335,6 +384,18 @@ def acquisition_state(self) -> StateType | None: """The current acquisition state.""" return self._acquisition_state + @classmethod + def dataset_len(cls, datasets: Mapping[Tag, Dataset]) -> int: + """Helper method for inferring the global dataset size.""" + dataset_lens = { + len(dataset.query_points) + for tag, dataset in datasets.items() + if not LocalizedTag.from_tag(tag).is_local + } + if len(dataset_lens) != 1: + raise ValueError(f"Expected unique global dataset size, got {dataset_lens}") + return next(iter(dataset_lens)) + @classmethod def from_record( cls: Type[AskTellOptimizerType], @@ -347,6 +408,8 @@ def from_record( ProbabilisticModelType, ] | None = None, + track_data: bool = True, + local_data_ixs: Optional[Sequence[TensorType]] = None, ) -> AskTellOptimizerType: """Creates new :class:`~AskTellOptimizer` instance from provided optimization state. Model training isn't triggered upon creation of the instance. @@ -372,6 +435,8 @@ def from_record( acquisition_rule=acquisition_rule, acquisition_state=record.acquisition_state, fit_model=False, + track_data=track_data, + local_data_ixs=local_data_ixs, ) def to_record(self, copy: bool = True) -> Record[StateType, ProbabilisticModelType]: @@ -410,6 +475,22 @@ def to_result(self, copy: bool = True) -> OptimizationResult[StateType, Probabil record: Record[StateType, ProbabilisticModelType] = self.to_record(copy=copy) return OptimizationResult(Ok(record), []) + def to_state( + self, copy: bool = False + ) -> AskTellOptimizerState[StateType, ProbabilisticModelType]: + """Returns the AskTellOptimizer state, comprising the current optimization state + alongside any internal AskTellOptimizer state. + + :param copy: Whether to return a copy of the current state or the original. Copying + is not supported for all model types. However, continuing the optimization will + modify the original state. + :return: An :class:`AskTellOptimizerState` object. + """ + return AskTellOptimizerState( + record=self.to_record(copy=copy), + local_data_ixs=self.local_data_ixs, + ) + def ask(self) -> TensorType: """Suggests a point (or points in batch mode) to observe by optimizing the acquisition function. If the acquisition is stateful, its state is saved. 
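# ---------------------------------------------------------------------------
# A minimal sketch of the to_state round trip added above; `search_space`,
# `dataset`, `model` and `local_rule` (a LocalDatasetsAcquisitionRule) are
# assumed to be pre-built.
from trieste.ask_tell_optimization import AskTellOptimizer

ask_tell = AskTellOptimizer(
    search_space, dataset, model, local_rule, track_data=False
)
state = ask_tell.to_state()                 # a Record plus the local data indices
restored = AskTellOptimizer.from_record(
    state.record,
    search_space,
    local_rule,
    track_data=False,
    local_data_ixs=state.local_data_ixs,
)
assert restored.local_data_ixs is not None  # index bookkeeping carried over
# ---------------------------------------------------------------------------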
@@ -447,30 +528,76 @@ def ask(self) -> TensorType: return query_points - def tell(self, new_data: Mapping[Tag, Dataset] | Dataset) -> None: + def tell( + self, + new_data: Mapping[Tag, Dataset] | Dataset, + new_data_ixs: Optional[Sequence[TensorType]] = None, + ) -> None: """Updates optimizer state with new data. - :param new_data: New observed data. + :param new_data: New observed data. If `track_data` is `False`, this refers to all + the data. + :param new_data_ixs: Indices to the new observed local data, if `track_data` is `False`. + If unspecified, inferred from the change in dataset sizes. :raise ValueError: If keys in ``new_data`` do not match those in already built dataset. """ if isinstance(new_data, Dataset): new_data = {OBJECTIVE: new_data} # The datasets must have the same keys as the existing datasets. Only exception is if - # the existing datasets are all global, in which case the dataset will be appropriately - # updated below for the next iteration. - datasets_indices = {LocalizedTag.from_tag(tag).local_index for tag in self._datasets.keys()} - if self._datasets.keys() != new_data.keys() and datasets_indices != {None}: + # the existing datasets are all global and the new data contains local datasets too. + if all(LocalizedTag.from_tag(tag).local_index is None for tag in self._datasets.keys()): + global_old = {LocalizedTag.from_tag(tag).global_tag for tag in self._datasets.keys()} + global_new = {LocalizedTag.from_tag(tag).global_tag for tag in new_data.keys()} + if global_new != global_old: + raise ValueError( + f"new_data global keys {global_new} doesn't " + f"match dataset global keys {global_old}" + ) + elif self._datasets.keys() != new_data.keys(): raise ValueError( f"new_data keys {new_data.keys()} doesn't " f"match dataset keys {self._datasets.keys()}" ) - for tag, new_dataset in new_data.items(): - self._datasets[tag] += new_dataset - self._filtered_datasets = self._acquisition_rule.filter_datasets( - self._models, self._datasets - ) + if self.track_data: + for tag, new_dataset in new_data.items(): + self._datasets[tag] += new_dataset + datasets: Mapping[Tag, Dataset] = self._datasets + elif not isinstance(self._acquisition_rule, LocalDatasetsAcquisitionRule): + datasets = new_data + else: + num_local_datasets = len(self._dataset_ixs) + if new_data_ixs is None: + # infer dataset indices from change in dataset sizes + new_dataset_len = self.dataset_len(new_data) + num_new_points = new_dataset_len - self._dataset_len + if num_new_points < 0 or num_new_points % num_local_datasets != 0: + raise ValueError( + "Cannot infer new data points as datasets haven't increased by " + f"a multiple of {num_local_datasets}" + ) + for i in range(num_local_datasets): + self._dataset_ixs[i] = tf.concat( + [ + self._dataset_ixs[i], + tf.range(0, num_new_points, num_local_datasets) + self._dataset_len + i, + ], + -1, + ) + else: + # use explicit indices + if len(new_data_ixs) != num_local_datasets: + raise ValueError( + f"new_data_ixs has {len(new_data_ixs)} entries, " + f"expected {num_local_datasets}" + ) + for i in range(num_local_datasets): + self._dataset_ixs[i] = tf.concat([self._dataset_ixs[i], new_data_ixs[i]], -1) + datasets = with_local_datasets(new_data, num_local_datasets, self._dataset_ixs) + self._dataset_len = self.dataset_len(datasets) + + self._filtered_datasets = self._acquisition_rule.filter_datasets(self._models, datasets) with Timer() as model_fitting_timer: for tag, model in self._models.items(): @@ -483,7 +610,7 @@ def tell(self, new_data: Mapping[Tag, Dataset] | 
Dataset) -> None: if summary_writer: with summary_writer.as_default(step=logging.get_step_number()): write_summary_observations( - self._datasets, + datasets, self._models, new_data, model_fitting_timer, diff --git a/trieste/bayesian_optimizer.py b/trieste/bayesian_optimizer.py index 0e489323a6..72fb5efac4 100644 --- a/trieste/bayesian_optimizer.py +++ b/trieste/bayesian_optimizer.py @@ -999,6 +999,9 @@ def write_summary_observations( ) -> None: """Write TensorBoard summary for the current step observations.""" for tag in models: + if tag not in tagged_output: + continue + with tf.name_scope(f"{tag}.model"): models[tag].log(datasets[tag])
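# ---------------------------------------------------------------------------
# A standalone sketch of the index bookkeeping implemented in tell() above.
# With new_data_ixs=None the global dataset must grow by a multiple of
# num_local_datasets and the new points are dealt out round-robin; the result
# below matches test_ask_tell_optimizer_tracks_local_data_ixs.
import tensorflow as tf

num_local_datasets = 3
old_len, new_len = 2, 8                       # global dataset grew by 6 points
num_new_points = new_len - old_len
assert num_new_points % num_local_datasets == 0
ixs = [tf.range(old_len) for _ in range(num_local_datasets)]
for i in range(num_local_datasets):
    ixs[i] = tf.concat(
        [ixs[i], tf.range(0, num_new_points, num_local_datasets) + old_len + i], -1
    )
print([t.numpy().tolist() for t in ixs])
# [[0, 1, 2, 5], [0, 1, 3, 6], [0, 1, 4, 7]]

# Passing explicit indices instead appends them verbatim, e.g. (with a
# pre-built `ask_tell` created with track_data=False and `full_dataset`
# holding all observed points):
#     ask_tell.tell(full_dataset, new_data_ixs=[tf.constant([2, 3, 4]),
#                                               tf.constant([7]), tf.constant([3])])
#     # ask_tell.local_data_ixs -> [[0, 1, 2, 3, 4], [0, 1, 7], [0, 1, 3]]
# ---------------------------------------------------------------------------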