tpcp.optimize.optuna
.CustomOptunaOptimize#
- class tpcp.optimize.optuna.CustomOptunaOptimize(pipeline: PipelineT, create_study: Callable[[], Study], *, n_trials: int | None = None, timeout: float | None = None, callbacks: List[Callable[[Study, FrozenTrial], None]] | None = None, gc_after_trial: bool = False, n_jobs: int = 1, eval_str_paras: Sequence[str] = (), show_progress_bar: bool = False, return_optimized: bool = True)[source]#
Base class for custom Optuna optimizer.
This provides a relatively simple tpcp compatible interface to Optuna. You basically need to subclass this class and implement the
create_objective
method to return the objective function you want to optimize. The only difference to a normal objective function in Optuna is, that your objective here should expect a pipeline and a dataset object as second and third argument (see Example). If there are parameters you want to make customizable (e.g. which metric to optimize for), expose them in the__init__
of your subclass.Depending on your usecase, your custom optimizers can be single use with a bunch of “hard-coded” logic, or you can try to make them more general, by exposing certain configurability.
- Parameters:
- pipeline
A tpcp pipeline with some hyper-parameters that should be optimized. This can either be a normal pipeline or an optimizable-pipeline. This fully depends on your implementation of the
create_objective
method.- create_study
A callable that returns an optuna study instance to be used for the optimization. It will be called as part of the
optimize
method without parameters. The resulting study object can be accessed viaself.study_
after the optimization is finished. Creating the study is handled via a callable, instead of providing the study object itself, to make it possible to create individual studies, when CustomOptuna optimize is called by an external wrapper (i.e.cross_validate
).- n_trials
The number of trials. If this argument is set to
None
, there is no limitation on the number of trials. In this case you should usetimeout
instead. Because optuna is called internally by this wrapper, you can not set up a study without limits and end it using CTRL+C (as suggested by the Optuna docs). In this case the entire execution flow would be stopped.- timeout
Stop study after the given number of second(s). If this argument is set to
None
, the study is executed without time limitation. In this case you should usen_trials
to limit the execution.- return_optimized
If True, a pipeline object with the overall best parameters is created and re-optimized using all provided data as input. The optimized pipeline object is stored as
optimized_pipeline_
. How the “re-optimization” works depends on the type of pipeline provided. If it is a simple pipeline, no specific re-optimization will be perfomed andoptimized_pipeline_
will simply be an instance of the pipeline with the best parameters indentified in the search. Whenpipeline
is a subclass ofOptimizablePipeline
, we attempt to callpipeline.self_optimize
with the entire dataset provided to theoptimize
method. The result of this self-optimization will be set asoptimized_pipeline
. If this behaviour is undesired, you can overwrite thereturn_optimized_pipeline
method in subclass.s- callbacks
List of callback functions that are invoked at the end of each trial. Each function must accept two parameters with the following types in this order:
Study
andFrozenTrial
.- n_jobs
Number of parallel jobs to use (default = 1 -> single process, -1 -> all available cores). This uses joblib with the multiprocessing backend to parallelize the optimization. If this is set to -1, all available cores are used.
Warning
Read the notes on multiprocessing below before using this feature.
- eval_str_paras
This can be a sequence (tuple/list) of parameter names used by Optuna that should be evaluated using
literal_eval
instead of just set as string on the pipeline. The main usecase of this is to allow the user to pass a list of strings tosuggest_categorical
but have the actual pipeline recive the evaluated value of this string. This is required, as many storage backends of optuna only support number or strings as categorical values.A typical example would be wanting to select a set of axis for an algorithm that are expressed as a list/tuple of strings. In this case you would use a strinigfied version of these tuples as the categorical values in the optuna study and then use
eval_str_paras
to evaluate the stringified version to the actual tuple.>>> def search_space(trial): ... trial.suggest_categorical("axis", ["('x',)", "('y',)", "('z',)", "('x', 'y')"]) >>> optuna_opt = CustomOptunaOptimize(pipeline, ..., eval_str_paras=["axis"])
Note, that in your custom subclass, you need to wrap the trial params in
self.sanitize_params
to make sure this sanitazation is applied to all parameters.>>> # Inside your custom subclass >>> def create_objective(self): ... def objective(trial, pipeline, dataset): ... params = self.sanitize_params(trial.params) ... pipeline.set_params(**params)
In all other places (e.g. when converting the final result table) this class handles sanitazation automatically.
- show_progress_bar
Flag to show progress bars or not.
- gc_after_trial
Run the garbage collector after each trial. Check the optuna documentation for more detail
- Other Parameters:
- dataset
The dataset instance passed to the optimize method
- Attributes:
search_results_
Detailed results of the study.
- optimized_pipeline_
An instance of the input pipeline with the best parameter set. This is only available if
return_optimized
is not False.best_params_
Parameters of the best trial in the
Study
.best_score_
Best score reached in the study.
best_trial_
Best trial in the
Study
.- study_
The study object itself. This should usually be identical to
self.study
.
Notes
Multiprocessing#
This class provides a relatively hacky implementation of multiprocessing. The implementation is based on the suggestions made here: optuna/optuna#2862 However, it depends on internal optuna APIs and might break in future versions.
To use multiprocessing, the provided
create_study
function must return a study with a persistent backend ( i.e. not the default InMemoryStorage), that can be written to by multiple processes. To make sure that your individual runs are independent and you don’t leave behind any files, make sure you clean up your study after each run. You can useoptuna.delete_study(study_name=opti_instance.study_.study_name, storage=opti_instance.study_._storage)
for this.From the implementation perspective, we split the number of trials into
n_jobs
chunks and then spawn one study per job. This study is a copy of the study from the main process and hence, points to the same database. Each process will then complete its chunk of trials and then terminate. This is a relatively naive implementation, but it avoids the overhead of spawning a new process for each trial. If this is always the best idea, is unclear.One downside of using multiprocessing is, that your runs will not be reproducible, as the order of the trials is not guraranteed and depends on when the individual processes finish. This can lead to different suggested parameters when non-trivial samplers are used. Note that this is not a specific problem of our implementation, but a general problem of using multiprocessing with optuna.
Further, the use of
show_progress_bar
is not recommended when using multiprocessing, as one progress bar per process is created and the output is not very readable. It might still be helpful to see that something is happening.Note
Using SQLite as backend is known to cause issues with multiprocessing, when the database is stored on a network drive (e.g. as typically done on a cluster). On most clusters, you should use the local storage of your node for the database or use a different backend (e.g. Redis, MySQL), if multiple nodes need to access the database at once.
Examples
>>> from tpcp.validate import Scorer >>> from optuna import create_study >>> from optuna import samplers >>> >>> class MyOptunaOptimizer(CustomOptunaOptimize): ... def create_objective(self): ... def objective(trial: Trial, pipeline: Pipeline, dataset: Dataset): ... trial.suggest_float("my_pipeline_para", 0, 3) ... mean_score, _ = Scorer(lambda pipe, dp: pipe.score(dp))(pipeline, dataset) ... return mean_score ... return objective >>> >>> study = create_study(sampler=samplers.RandomSampler()) >>> opti = MyOptunaOptimizer(pipeline=MyPipeline(), study=study, n_trials=10) >>> opti = opti.optimize(MyDataset())
Methods
as_attrs
()Return a version of the Dataset class that can be subclassed using
attrs
defined classes.Return a version of the Dataset class that can be subclassed using dataclasses.
clone
()Create a new instance of the class with all parameters copied over.
Return the objective function that should be optimized.
get_params
([deep])Get parameters for this algorithm.
optimize
(dataset, **_)Optimize the objective over the dataset and find the best parameter combination.
return_optimized_pipeline
(pipeline, dataset, ...)Return the pipeline with the best parameters of a study.
run
(datapoint)Run the optimized pipeline.
safe_run
(datapoint)Run the optimized pipeline.
sanitize_params
(params)Sanatize the parameters of a trial.
score
(datapoint)Run score of the optimized pipeline.
set_params
(**params)Set the parameters of this Algorithm.
- __init__(pipeline: PipelineT, create_study: Callable[[], Study], *, n_trials: int | None = None, timeout: float | None = None, callbacks: List[Callable[[Study, FrozenTrial], None]] | None = None, gc_after_trial: bool = False, n_jobs: int = 1, eval_str_paras: Sequence[str] = (), show_progress_bar: bool = False, return_optimized: bool = True)[source]#
- _call_optimize(study: Study, objective: Callable[[Trial], float | Sequence[float]])[source]#
Call the optuna study.
This is a separate method to make it easy to modify how the study is called.
- static as_attrs()[source]#
Return a version of the Dataset class that can be subclassed using
attrs
defined classes.Note, this requires
attrs
to be installed!
- static as_dataclass()[source]#
Return a version of the Dataset class that can be subclassed using dataclasses.
- clone() Self [source]#
Create a new instance of the class with all parameters copied over.
This will create a new instance of the class itself and all nested objects
- create_objective() Callable[[Trial, PipelineT, DatasetT], float | Sequence[float]] [source]#
Return the objective function that should be optimized.
This method should be implemented by a child class and return an objective function that is compatible with Optuna. However, compared to a normal Optuna objective function, the function should expect a pipeline and a dataset object as additional inputs to the optimization Trial object.
- get_params(deep: bool = True) Dict[str, Any] [source]#
Get parameters for this algorithm.
- Parameters:
- deep
Only relevant if object contains nested algorithm objects. If this is the case and deep is True, the params of these nested objects are included in the output using a prefix like
nested_object_name__
(Note the two “_” at the end)
- Returns:
- params
Parameter names mapped to their values.
- optimize(dataset: DatasetT, **_: Any) Self [source]#
Optimize the objective over the dataset and find the best parameter combination.
This method calls
self.create_objective
to obtain the objective function that should be optimized.- Parameters:
- dataset
The dataset used for optimization.
- return_optimized_pipeline(pipeline: PipelineT, dataset: DatasetT, study: Study) PipelineT [source]#
Return the pipeline with the best parameters of a study.
This either just returns the pipeline with the best parameters set, or if the pipeline is a subclass of
OptimizablePipeline
it attempts a re-optimization of the pipeline using the provided dataset.This functionality is a sensible default, but it is expected to overwrite this method in custom subclasses, if specific behaviour is needed.
Don’t call this function on its own! It is only expected to be called internally by optimize.
- run(datapoint: DatasetT) PipelineT [source]#
Run the optimized pipeline.
This is a wrapper to contain API compatibility with
Pipeline
.
- safe_run(datapoint: DatasetT) PipelineT [source]#
Run the optimized pipeline.
This is a wrapper to contain API compatibility with
Pipeline
.
- sanitize_params(params: Dict[str, Any]) Dict[str, Any] [source]#
Sanatize the parameters of a trial.
This will apply the str evaluation controlled by
self.eval_str_paras
to the parameters. Call this method before passing the parameters to the pipeline in your objective function.