tpcp.optimize.optuna.CustomOptunaOptimize

class tpcp.optimize.optuna.CustomOptunaOptimize(pipeline: PipelineT, get_study_params: Callable[[int], StudyParamsDict], *, n_trials: int | None = None, timeout: float | None = None, callbacks: List[Callable[[Study, FrozenTrial], None]] | None = None, gc_after_trial: bool = False, n_jobs: int = 1, random_seed: int | None = None, eval_str_paras: Sequence[str] = (), show_progress_bar: bool = False, return_optimized: bool = True)

Base class for custom Optuna optimizers.

This class provides a relatively simple tpcp-compatible interface to Optuna. To use it, subclass this class and implement the create_objective method so that it returns the objective function you want to optimize. The only difference from a normal Optuna objective function is that your objective should expect a pipeline and a dataset object as its second and third arguments (see Examples). If there are parameters you want to make customizable (e.g. which metric to optimize for), expose them in the __init__ of your subclass.

Depending on your use case, your custom optimizers can be single-use with hard-coded logic, or you can make them more general by exposing some configuration options.

Parameters:
pipeline

A tpcp pipeline with some hyper-parameters that should be optimized. This can either be a normal pipeline or an optimizable pipeline. Which one is appropriate depends entirely on your implementation of the create_objective method.

get_study_params

A callable that returns a dictionary with the parameters that should be used to create the study (i.e. passed to optuna.create_study). Creating the study is handled via a callable, instead of providing the parameters or the study object directly, to make it possible to create individual studies when CustomOptunaOptimize is called by an external wrapper (e.g. cross_validate). Further, the provided function is called with a single parameter, seed, that can be used to create samplers and pruners with fixed random seeds. Using the passed seed instead of a fixed global seed is important, as otherwise all samplers/pruners would use the same random numbers across multiple processes if n_jobs > 1 (see more in Notes). Note that this function should return consistent output when called multiple times with the same seed. Otherwise, unexpected behaviour can occur, where different processes use different samplers/pruners in a multiprocessing setting.

n_trials

The number of trials. If this argument is set to None, the number of trials is not limited, and you should use timeout instead. Because Optuna is called internally by this wrapper, you cannot set up a study without limits and end it using CTRL+C (as suggested by the Optuna docs); doing so would stop the entire execution flow.

timeout

Stop the study after the given number of seconds. If this argument is set to None, the study is executed without a time limit. In this case you should use n_trials to limit the execution.

return_optimized

If True, a pipeline object with the overall best parameters is created and re-optimized using all provided data as input. The optimized pipeline object is stored as optimized_pipeline_. How the “re-optimization” works depends on the type of pipeline provided. If it is a simple pipeline, no specific re-optimization will be performed and optimized_pipeline_ will simply be an instance of the pipeline with the best parameters identified in the search. When pipeline is a subclass of OptimizablePipeline, we attempt to call pipeline.self_optimize with the entire dataset provided to the optimize method. The result of this self-optimization will be set as optimized_pipeline_. If this behaviour is undesired, you can override the return_optimized_pipeline method in your subclass.

callbacks

List of callback functions that are invoked at the end of each trial. Each function must accept two parameters with the following types in this order: Study and FrozenTrial.

n_jobs

Number of parallel jobs to use (default = 1 -> single process). If this is set to -1, all available cores are used. The optimization is parallelized using joblib with the multiprocessing backend.

Warning

Read the notes on multiprocessing below before using this feature.

random_seed

A random seed that is used as the base for the seed passed to your implementation of get_study_params. If None, this is set to a random integer between 0 and 100 (derived using numpy.random.randint). In case of multiprocessing, this seed is used as an offset to create different seeds for each process.

eval_str_paras

This can be a sequence (tuple/list) of parameter names used by Optuna that should be evaluated using literal_eval instead of just being set as strings on the pipeline. The main use case is to allow the user to pass a list of strings to suggest_categorical, but have the actual pipeline receive the evaluated value of each string. This is required, as many storage backends of Optuna only support numbers or strings as categorical values.

A typical example would be wanting to select a set of axes for an algorithm that are expressed as a list/tuple of strings. In this case you would use a stringified version of these tuples as the categorical values in the Optuna study and then use eval_str_paras to evaluate the stringified version to the actual tuple.

>>> def search_space(trial):
...     trial.suggest_categorical("axis", ["('x',)", "('y',)", "('z',)", "('x', 'y')"])
>>> optuna_opt = CustomOptunaOptimize(pipeline, ..., eval_str_paras=["axis"])

Note that in your custom subclass, you need to wrap the trial params in self.sanitize_params to make sure this sanitization is applied to all parameters.

>>> # Inside your custom subclass
>>> def create_objective(self):
...     def objective(trial, pipeline, dataset):
...         params = self.sanitize_params(trial.params)
...         pipeline.set_params(**params)

In all other places (e.g. when converting the final result table) this class handles sanitization automatically.
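
To make the effect of eval_str_paras more concrete, the following is a minimal sketch of the kind of evaluation described above. The helper name sanitize_params_sketch is hypothetical and only illustrates the idea; it is not the library's implementation of sanitize_params.

```python
from ast import literal_eval
from typing import Any, Dict, Sequence


def sanitize_params_sketch(params: Dict[str, Any], eval_str_paras: Sequence[str]) -> Dict[str, Any]:
    """Evaluate the stringified values of the selected parameters (hypothetical helper)."""
    return {
        # Only parameters listed in eval_str_paras are evaluated; all others pass through.
        name: literal_eval(value) if name in eval_str_paras else value
        for name, value in params.items()
    }


trial_params = {"axis": "('x', 'y')", "threshold": 0.5}
sanitized = sanitize_params_sketch(trial_params, eval_str_paras=["axis"])
print(sanitized)  # {'axis': ('x', 'y'), 'threshold': 0.5}
```

literal_eval only accepts Python literals (strings, numbers, tuples, lists, dicts, and similar), so arbitrary expressions stored in the study are not executed.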

show_progress_bar

Flag to show progress bars or not.

gc_after_trial

Run the garbage collector after each trial. Check the Optuna documentation for more detail.

Other Parameters:
dataset

The dataset instance passed to the optimize method.

Attributes:
search_results_

Detailed results of the study.

optimized_pipeline_

An instance of the input pipeline with the best parameter set. This is only available if return_optimized is not False.

best_params_

Parameters of the best trial in the Study.

best_score_

Best score reached in the study.

best_trial_

Best trial in the Study.

study_

The study object itself. This should usually be identical to self.study.

random_seed_

The actual random seed used for the optimization. This is either the value passed to random_seed or a random integer between 0 and 100.

Notes

Multiprocessing

This class provides a relatively hacky implementation of multiprocessing. The implementation is based on the suggestions made in optuna/optuna#2862. However, it depends on internal Optuna APIs and might break in future versions.

To use multiprocessing, the provided get_study_params function must return a persistent backend (i.e. not the default InMemoryStorage) that can be written to by multiple processes. To make sure that your individual runs are independent and you don’t leave behind any files, clean up your study after each run. You can use optuna.delete_study(study_name=opti_instance.study_.study_name, storage=opti_instance.study_._storage) for this.

Further, if you use samplers that rely on a random seed (you likely do), you need to pass the seed provided to get_study_params to the sampler, and potentially to the pruner as well. Otherwise, each sampler/pruner instance might use the same random seed, which will lead to identical trials in each process.

From the implementation perspective, we split the number of trials into n_jobs chunks and then spawn one study per job. However, we first run the first trial in the main process to make sure that the study is created correctly. Then we split the remaining trials into chunks and spawn a new process for each chunk. The study in each process is a copy of the study from the main process and hence points to the same database. Each process then completes its chunk of trials and terminates. This is a relatively naive implementation, but it avoids the overhead of spawning a new process for each trial. Whether this is always the best approach is unclear.

One downside of using multiprocessing is that your runs will not be reproducible, as the order of the trials is not guaranteed and depends on when the individual processes finish. This can lead to different suggested parameters when non-trivial samplers are used. Note that this is not a specific problem of our implementation, but a general problem of using multiprocessing with Optuna.
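
The chunking strategy described above can be sketched as follows. The helper split_trials is hypothetical, and how exactly the remainder is distributed across processes is an assumption of this sketch; the actual implementation may differ.

```python
from typing import List, Tuple


def split_trials(n_trials: int, n_jobs: int) -> Tuple[int, List[int]]:
    """Return (trials run in the main process, trials per worker process).

    Hypothetical helper; expects a positive n_jobs (i.e. -1 already resolved).
    """
    if n_trials < 1 or n_jobs < 1:
        raise ValueError("n_trials and n_jobs must be positive")
    first = 1  # the first trial runs in the main process and creates the study
    remaining = n_trials - first
    base, extra = divmod(remaining, n_jobs)
    # Assumption: the remainder is spread over the first `extra` workers.
    chunks = [base + (1 if i < extra else 0) for i in range(n_jobs)]
    # Workers that would receive no trials are not spawned at all.
    return first, [c for c in chunks if c > 0]


print(split_trials(11, 3))  # (1, [4, 3, 3])
```

With one process per chunk instead of one per trial, the process start-up cost is paid at most n_jobs times, which is the trade-off the paragraph above refers to.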

Further, the use of show_progress_bar is not recommended when using multiprocessing, as one progress bar per process is created and the output is not very readable. It might still be helpful to see that something is happening.

Note

Using SQLite as backend is known to cause issues with multiprocessing, when the database is stored on a network drive (e.g. as typically done on a cluster). On most clusters, you should use the local storage of your node for the database or use a different backend (e.g. Redis, MySQL), if multiple nodes need to access the database at once.

Examples

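>>> from optuna import Trial, samplers
>>> from tpcp import Dataset, Pipeline
>>> from tpcp.optimize.optuna import CustomOptunaOptimize
>>> from tpcp.validate import Scorer
>>>
>>> class MyOptunaOptimizer(CustomOptunaOptimize):
...     def create_objective(self):
...         def objective(trial: Trial, pipeline: Pipeline, dataset: Dataset):
...             # Sample a value and apply it to the pipeline before scoring.
...             trial.suggest_float("my_pipeline_para", 0, 3)
...             pipeline = pipeline.set_params(**self.sanitize_params(trial.params))
...             mean_score, _ = Scorer(lambda pipe, dp: pipe.score(dp))(pipeline, dataset)
...             return mean_score
...         return objective
>>>
>>> def get_study_params(seed):
...     # Use the provided seed so that parallel processes get different samplers.
...     return {"sampler": samplers.RandomSampler(seed=seed)}
>>>
>>> opti = MyOptunaOptimizer(pipeline=MyPipeline(), get_study_params=get_study_params, n_trials=10)
>>> opti = opti.optimize(MyDataset())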

Methods

as_attrs()

Return a version of the Dataset class that can be subclassed using attrs defined classes.

as_dataclass()

Return a version of the Dataset class that can be subclassed using dataclasses.

clone()

Create a new instance of the class with all parameters copied over.

create_objective()

Return the objective function that should be optimized.

get_params([deep])

Get parameters for this algorithm.

optimize(dataset, **_)

Optimize the objective over the dataset and find the best parameter combination.

return_optimized_pipeline(pipeline, dataset, ...)

Return the pipeline with the best parameters of a study.

run(datapoint)

Run the optimized pipeline.

safe_run(datapoint)

Run the optimized pipeline.

sanitize_params(params)

Sanitize the parameters of a trial.

score(datapoint)

Run score of the optimized pipeline.

set_params(**params)

Set the parameters of this Algorithm.

__init__(pipeline: PipelineT, get_study_params: Callable[[int], StudyParamsDict], *, n_trials: int | None = None, timeout: float | None = None, callbacks: List[Callable[[Study, FrozenTrial], None]] | None = None, gc_after_trial: bool = False, n_jobs: int = 1, random_seed: int | None = None, eval_str_paras: Sequence[str] = (), show_progress_bar: bool = False, return_optimized: bool = True) -> None
_call_optimize(study: Study, objective: Callable[[Trial], float | Sequence[float]])

Call the optuna study.

This is a separate method to make it easy to modify how the study is called.

static as_attrs()

Return a version of the Dataset class that can be subclassed using attrs defined classes.

Note, this requires attrs to be installed!

static as_dataclass()

Return a version of the Dataset class that can be subclassed using dataclasses.

property best_params_: Dict[str, Any]

Parameters of the best trial in the Study.

property best_score_: float

Best score reached in the study.

property best_trial_: FrozenTrial

Best trial in the Study.

clone() -> Self

Create a new instance of the class with all parameters copied over.

This will create a new instance of the class itself and of all nested objects.

create_objective() -> Callable[[Trial, PipelineT, DatasetT], float | Sequence[float]]

Return the objective function that should be optimized.

This method should be implemented by a child class and return an objective function that is compatible with Optuna. However, compared to a normal Optuna objective function, the function should expect a pipeline and a dataset object as inputs in addition to the optimization Trial object.

get_params(deep: bool = True) -> Dict[str, Any]

Get parameters for this algorithm.

Parameters:
deep

Only relevant if the object contains nested algorithm objects. If this is the case and deep is True, the params of these nested objects are included in the output using a prefix like nested_object_name__ (note the two “_” at the end).

Returns:
params

Parameter names mapped to their values.

optimize(dataset: DatasetT, **_: Any) -> Self

Optimize the objective over the dataset and find the best parameter combination.

This method calls self.create_objective to obtain the objective function that should be optimized.

Parameters:
dataset

The dataset used for optimization.

return_optimized_pipeline(pipeline: PipelineT, dataset: DatasetT, study: Study) -> PipelineT

Return the pipeline with the best parameters of a study.

This either just returns the pipeline with the best parameters set, or if the pipeline is a subclass of OptimizablePipeline it attempts a re-optimization of the pipeline using the provided dataset.

This functionality is a sensible default, but you are expected to override this method in custom subclasses if specific behaviour is needed.

Don’t call this function on its own! It is only expected to be called internally by optimize.

run(datapoint: DatasetT) -> PipelineT

Run the optimized pipeline.

This is a wrapper to maintain API compatibility with Pipeline.

safe_run(datapoint: DatasetT) -> PipelineT

Run the optimized pipeline.

This is a wrapper to maintain API compatibility with Pipeline.

sanitize_params(params: Dict[str, Any]) -> Dict[str, Any]

Sanitize the parameters of a trial.

This will apply the str evaluation controlled by self.eval_str_paras to the parameters. Call this method before passing the parameters to the pipeline in your objective function.

score(datapoint: DatasetT) -> float | Dict[str, float]

Run score of the optimized pipeline.

This is a wrapper to maintain API compatibility with Pipeline.

property search_results_: Dict[str, Sequence[Any]]

Detailed results of the study.

This contains essentially the same information as self.study_.trials_dataframe(), with some small modifications:

  • columns starting with “params_” are renamed to “param_”

  • a new column called “params” containing all parameters as dict is added

  • “value” is renamed to “score”

  • the score of pruned trials is set to np.nan

These changes are made to make the output comparable to the output of GridSearch and GridSearchCV.
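
The modifications listed above can be sketched on a plain dict of columns. The helper to_search_results is hypothetical and not part of tpcp; it also assumes that the trial state is available in a "state" column holding strings such as "COMPLETE" or "PRUNED".

```python
import math
from typing import Any, Dict, List


def to_search_results(trials: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
    """Apply the renaming rules to a column-oriented trial table (hypothetical helper)."""
    prefix = "params_"
    param_cols = [c for c in trials if c.startswith(prefix)]
    n_rows = len(trials["value"])
    results: Dict[str, List[Any]] = {}
    for col, values in trials.items():
        if col in param_cols:
            # "params_x" -> "param_x"
            results["param_" + col[len(prefix):]] = list(values)
        elif col == "value":
            # "value" -> "score"; pruned trials get NaN as score
            results["score"] = [
                math.nan if state == "PRUNED" else v
                for v, state in zip(values, trials["state"])
            ]
        else:
            results[col] = list(values)
    # New "params" column: one dict with all parameters per trial
    results["params"] = [
        {c[len(prefix):]: trials[c][i] for c in param_cols} for i in range(n_rows)
    ]
    return results


trials = {
    "number": [0, 1],
    "value": [0.8, 0.3],
    "params_x": [1.0, 2.0],
    "state": ["COMPLETE", "PRUNED"],
}
results = to_search_results(trials)
```

In this example, results["param_x"] holds the sampled values, results["params"] holds one dict per trial, and the score of the second (pruned) trial is NaN.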

set_params(**params: Any) -> Self

Set the parameters of this Algorithm.

To set parameters of nested objects use nested_object_name__para_name=.

Examples using tpcp.optimize.optuna.CustomOptunaOptimize

Custom Optuna Optimizer