.. DO NOT EDIT.
.. THIS FILE WAS AUTOMATICALLY GENERATED BY SPHINX-GALLERY.
.. TO MAKE CHANGES, EDIT THE SOURCE PYTHON FILE:
.. "auto_examples/validation/_03_custom_scorer.py"
.. LINE NUMBERS ARE GIVEN BELOW.

.. only:: html

    .. note::
        :class: sphx-glr-download-link-note

        :ref:`Go to the end `
        to download the full example code

.. rst-class:: sphx-glr-example-title

.. _sphx_glr_auto_examples_validation__03_custom_scorer.py:

.. _custom_scorer:

Custom Scorer
=============

Scorers, or scoring functions, are used in tpcp whenever we need to rank any form of output.
For example, after a GridSearch, we want to know which pipeline is the best.
This is done by a function that takes a pipeline and a datapoint as input and returns one or multiple scores.
These scores are then averaged over all datapoints provided.

However, sometimes this is not exactly what we want.
In this case, you need to create a custom scorer or custom aggregator to also control how scores are averaged over all datapoints.

Four general use cases arise for custom scorers:

1. You actually don't want to score anything, but just want to collect some metadata, or pass results out of the method unchanged for later analysis.
   This can be easily done using :func:`~tpcp.validate.no_agg` (see first example below).
2. You can properly calculate a performance value on a single datapoint, but you don't want to take the mean over all datapoints and would rather use a different aggregation metric (e.g. median, ...).
   This can be done by using the existing :class:`~tpcp.validate.FloatAggregator` class with a new function (see second and third example below).
3. Similar to 2, but you require additional information passed through the aggregation function.
   This could be the datapoints themselves (e.g. to calculate a macro average) or some other metadata required for the aggregation.
   This can be done by inheriting from the :class:`~tpcp.validate.Aggregator` class and implementing the `aggregate` method (see fourth example below).
4. 
You want to calculate a score that cannot be aggregated on a datapoint level first.
   For example, you are detecting events in a dataset and you want to calculate the F1 score across all events of a dataset, without first aggregating the F1 score on a datapoint level.

.. GENERATED FROM PYTHON SOURCE LINES 33-37

.. code-block:: default

    from collections.abc import Sequence
    from pathlib import Path


.. GENERATED FROM PYTHON SOURCE LINES 38-42

Setup
-----
We will simply reuse the pipeline from the general QRS detection example.
For all of our custom scorers, we will use this pipeline and apply it to all datapoints of the ECG example dataset.

.. GENERATED FROM PYTHON SOURCE LINES 42-82

.. code-block:: default

    from examples.algorithms.algorithms_qrs_detection_final import (
        match_events_with_reference,
    )
    from examples.datasets.datasets_final_ecg import ECGExampleData

    try:
        HERE = Path(__file__).parent
    except NameError:
        HERE = Path().resolve()
    data_path = HERE.parent.parent / "example_data/ecg_mit_bih_arrhythmia/data"
    example_data = ECGExampleData(data_path)

    import pandas as pd
    from joblib.memory import Memory

    from tpcp import Parameter, Pipeline, cf
    from examples.algorithms.algorithms_qrs_detection_final import (
        QRSDetector,
        precision_recall_f1_score,
    )
    from examples.datasets.datasets_final_ecg import ECGExampleData


    class MyPipeline(Pipeline[ECGExampleData]):
        algorithm: Parameter[QRSDetector]

        r_peak_positions_: pd.Series

        def __init__(self, algorithm: QRSDetector = cf(QRSDetector())):
            self.algorithm = algorithm

        def run(self, datapoint: ECGExampleData):
            # Note: We need to clone the algorithm instance, to make sure we don't leak any data between runs.
            algo = self.algorithm.clone()
            algo.detect(datapoint.data["ecg"], datapoint.sampling_rate_hz)

            self.r_peak_positions_ = algo.r_peak_positions_
            return self


.. GENERATED FROM PYTHON SOURCE LINES 83-84

We set up a global cache for our pipeline to speed up the repeated evaluation we do below.

.. 
GENERATED FROM PYTHON SOURCE LINES 84-94

.. code-block:: default

    from tpcp.caching import global_disk_cache

    global_disk_cache(
        memory=Memory("./.cache"),
        restore_in_parallel_process=True,
        action_method_name="run",
    )(MyPipeline)

    pipe = MyPipeline()

.. rst-class:: sphx-glr-script-out

.. code-block:: none

    /home/docs/checkouts/readthedocs.org/user_builds/tpcp/checkouts/v2.2.0/examples/validation/_03_custom_scorer.py:86: UserWarning: Global caching is a little tricky to get right and our implementation is not yet battle-tested. Please double check that the results are correct and report any issues you find.
      global_disk_cache(

.. GENERATED FROM PYTHON SOURCE LINES 95-110

No Aggregation
--------------
Sometimes you might want to return data from a score function that should not be aggregated.
This could be arbitrary metadata or scores with values that cannot be averaged.
In this case you can simply use the :func:`~tpcp.validate.no_agg` aggregator.
This will return only the single values and no aggregated items.

In the example below, we will calculate the precision, recall and f1-score for each datapoint and in addition return the number of labeled reference values as "metadata".
This metadata will not be aggregated, but will still be available in the single results.

.. note:: At the moment we don't support returning only non-aggregated values from a scorer.
          At least one value must be aggregated, so that it can be used to rank results.
          If you really need this (e.g. in combination with :func:`~tpcp.validate.validate`), you can return a dummy
          value that is not used in the aggregation.

.. GENERATED FROM PYTHON SOURCE LINES 110-133

.. code-block:: default

    from tpcp.validate import no_agg


    def score(pipeline: MyPipeline, datapoint: ECGExampleData):
        # We use the `safe_run` wrapper instead of just run. This is always a good idea.
        # We don't need to clone the pipeline here, as GridSearch will already clone the pipeline internally and `run`
        # will clone it again.
        pipeline = pipeline.safe_run(datapoint)
        tolerance_s = 0.02  # We just use 20 ms for this example
        matches = match_events_with_reference(
            pipeline.r_peak_positions_.to_numpy(),
            datapoint.r_peak_positions_.to_numpy(),
            tolerance=tolerance_s * datapoint.sampling_rate_hz,
        )
        precision, recall, f1_score = precision_recall_f1_score(matches)
        return {
            "precision": precision,
            "recall": recall,
            "f1_score": f1_score,
            "n_labels": no_agg(len(datapoint.r_peak_positions_)),
        }

.. GENERATED FROM PYTHON SOURCE LINES 134-135

We can see that ``n_labels`` is not contained in the aggregated results.

.. GENERATED FROM PYTHON SOURCE LINES 135-140

.. code-block:: default

    from tpcp.validate import Scorer

    no_agg_agg, no_agg_single = Scorer(score)(pipe, example_data)
    no_agg_agg

.. rst-class:: sphx-glr-script-out

.. code-block:: none

    Datapoints: 0%| | 0/12 [00:00.inner..cached_action_method...
    cached_action_method(MyPipeline(algorithm=QRSDetector(high_pass_filter_cutoff_hz=1, max_heart_rate_bpm=200.0, min_r_peak_height_over_baseline=1.0)), None, 'run', ECGExampleData [1 groups/rows] patient_group participant 0 group_1 100)
    _____________________________________________cached_action_method - 0.0s, 0.0min
    ________________________________________________________________________________
    [Memory] Calling tpcp.caching.global_disk_cache..inner..cached_action_method...
    cached_action_method(MyPipeline(algorithm=QRSDetector(high_pass_filter_cutoff_hz=1, max_heart_rate_bpm=200.0, min_r_peak_height_over_baseline=1.0)), None, 'run', ECGExampleData [1 groups/rows] patient_group participant 0 group_2 102)
    _____________________________________________cached_action_method - 0.0s, 0.0min
    Datapoints: 17%|█▋ | 2/12 [00:00<00:00, 16.43it/s]________________________________________________________________________________
    [Memory] Calling tpcp.caching.global_disk_cache..inner..cached_action_method...
    cached_action_method(MyPipeline(algorithm=QRSDetector(high_pass_filter_cutoff_hz=1, max_heart_rate_bpm=200.0, min_r_peak_height_over_baseline=1.0)), None, 'run', ECGExampleData [1 groups/rows] patient_group participant 0 group_3 200)
    _____________________________________________cached_action_method - 0.0s, 0.0min
    Datapoints: 100%|██████████| 12/12 [00:00<00:00, 15.49it/s]
    Datapoints: 100%|██████████| 12/12 [00:00<00:00, 16.44it/s]

    {'precision': 0.9929358534618008, 'recall': 0.6737755326205007, 'f1_score': 0.7089727629059107}

.. GENERATED FROM PYTHON SOURCE LINES 141-142

But we can still access the value in the single results.

.. GENERATED FROM PYTHON SOURCE LINES 142-144

.. code-block:: default

    no_agg_single["n_labels"]

.. rst-class:: sphx-glr-script-out

.. code-block:: none

    [2273, 2187, 2229, 2572, 2027, 1763, 1879, 2412, 1987, 1863, 1518, 2601]

.. GENERATED FROM PYTHON SOURCE LINES 145-156

Custom Median Scorer
--------------------
If we want to change the way the scores are aggregated, we can use a custom aggregator.
For simple cases, this does not require implementing a new class; we can use the :class:`~tpcp.validate.FloatAggregator` directly.
It assumes that we have a function that takes a sequence of floats and returns a float.

Aggregators are simply instances of :class:`~tpcp.validate.Aggregator` subclasses,
so we can create a new instance of the :class:`~tpcp.validate.FloatAggregator` with a new function.

Below we simply use the median as an example.

.. GENERATED FROM PYTHON SOURCE LINES 156-161

.. code-block:: default

    import numpy as np
    from tpcp.validate import FloatAggregator

    median_agg = FloatAggregator(np.median)

.. GENERATED FROM PYTHON SOURCE LINES 162-164

Then we reuse the score function from before and wrap the F1-score with the median aggregator.
For all other values, the default aggregator will be used (which is the mean).

.. GENERATED FROM PYTHON SOURCE LINES 164-188

.. code-block:: default

    # .. 
warning:: Note that your score function must return the same aggregator for a score across all datapoints.
    #              If not, we will raise an error!


    def score(pipeline: MyPipeline, datapoint: ECGExampleData):
        # We use the `safe_run` wrapper instead of just run. This is always a good idea.
        # We don't need to clone the pipeline here, as GridSearch will already clone the pipeline internally and `run`
        # will clone it again.
        pipeline = pipeline.safe_run(datapoint)
        tolerance_s = 0.02  # We just use 20 ms for this example
        matches = match_events_with_reference(
            pipeline.r_peak_positions_.to_numpy(),
            datapoint.r_peak_positions_.to_numpy(),
            tolerance=tolerance_s * datapoint.sampling_rate_hz,
        )
        precision, recall, f1_score = precision_recall_f1_score(matches)
        return {
            "precision": precision,
            "recall": recall,
            "f1_score": f1_score,
            "median_f1_score": median_agg(f1_score),
        }

.. GENERATED FROM PYTHON SOURCE LINES 189-192

.. code-block:: default

    median_results_agg, median_results_single = Scorer(score)(pipe, example_data)
    median_results_agg

.. rst-class:: sphx-glr-script-out

.. code-block:: none

    Datapoints: 0%| | 0/12 [00:00 is currently using SeriesGroupBy.mean. In a future version of pandas, the provided callable will be used directly. To keep current behavior pass the string "mean" instead.
      per_group = data.groupby(self.groupby).agg(self.group_agg)

    {'precision': 0.9929358534618008, 'recall': 0.6737755326205007, 'f1_score__group_1': 0.5039949762609272, 'f1_score__group_2': 0.9405469879110494, 'f1_score__group_3': 0.6823763245457557, 'f1_score__macro': np.float64(0.7089727629059107)}

.. GENERATED FROM PYTHON SOURCE LINES 309-310

The raw values are still available in the single results.

.. GENERATED FROM PYTHON SOURCE LINES 310-312

.. code-block:: default

    macro_single["f1_score"]

.. rst-class:: sphx-glr-script-out

.. 
code-block:: none

    [np.float64(0.9993396434074401), np.float64(0.8673338465486272), np.float64(0.9336437718277065), np.float64(0.9787896477913991), np.float64(0.9010989010989011), np.float64(0.08473655621944595), np.float64(0.03143006809848088), np.float64(0.9937552039966694), np.float64(0.99874213836478), np.float64(0.006420545746388443), np.float64(1.0), np.float64(0.7123828317710903)]

.. GENERATED FROM PYTHON SOURCE LINES 313-317

So far we did not need to implement a fully custom aggregation, as `tpcp` provides helper functions for typical use cases.
However, if you need to do more complicated things with your score values, or pass things other than floats to your scores, you will need a custom aggregator as shown in the next example.

.. GENERATED FROM PYTHON SOURCE LINES 319-343

Fully Custom Aggregation
------------------------
In the next example, we want to aggregate on a "lower" level than a single datapoint.
In the previous example, we wanted to aggregate first on a "higher" level than a single datapoint.
In that case, tpcp could provide a helper, because the higher levels were defined by the used `Dataset`.
Hence, we could make some assumptions about what the passed data would look like.
However, if you want to go more granular than a single datapoint, we cannot know what data structures you are dealing with.
Therefore, you need to create a completely custom aggregation by subclassing :class:`~tpcp.validate.Aggregator`.

Below we show an example where we calculate the precision, recall and f1-score without aggregating on a datapoint level, but rather by first combining all predictions and references across all datapoints before calculating the precision, recall and f1-score.

There are no restrictions on the data you can pass from the scorer.
Only the aggregator needs to be able to handle the values and then return a float or a dict with float values.
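
To make the difference between the two aggregation levels concrete, the following minimal sketch compares the mean of per-datapoint F1 scores with an F1 score calculated from matches that were pooled across datapoints first.
It uses plain NumPy only. The helper ``prf_from_matches`` and the toy match arrays are hypothetical, and the assumed match layout (one row per pair of detected index and reference index, with ``-1`` marking a missing partner) is an illustration, not the confirmed output format of ``match_events_with_reference``.

```python
import numpy as np


def prf_from_matches(matches: np.ndarray) -> dict[str, float]:
    """Hypothetical helper: precision, recall and F1 from an (n, 2) match array.

    Assumed layout: column 0 is the detected index, column 1 the reference index,
    and -1 marks a missing partner (false positive or false negative).
    """
    tp = int(np.sum((matches[:, 0] != -1) & (matches[:, 1] != -1)))
    fp = int(np.sum(matches[:, 1] == -1))  # detection without a reference
    fn = int(np.sum(matches[:, 0] == -1))  # reference without a detection
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"precision": precision, "recall": recall, "f1_score": f1}


# Toy datapoint A: 8 events, all detected correctly.
matches_a = np.array([[i, i] for i in range(8)])
# Toy datapoint B: 4 reference events, only the first one was detected.
matches_b = np.array([[0, 0], [-1, 1], [-1, 2], [-1, 3]])

# Aggregation on the datapoint level: one F1 score per datapoint, then the mean.
per_datapoint_f1 = [prf_from_matches(m)["f1_score"] for m in (matches_a, matches_b)]
mean_f1 = float(np.mean(per_datapoint_f1))  # (1.0 + 0.4) / 2 = 0.7

# Aggregation below the datapoint level: pool all matches, then calculate F1 once.
pooled = prf_from_matches(np.vstack([matches_a, matches_b]))  # F1 = 6/7

print(mean_f1, pooled["f1_score"])
```

In this toy case the pooled score is higher than the averaged one, because the datapoint with many correctly detected events contributes more rows to the pooled match array. Neither value is "more correct"; they answer different questions.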

In the example below, we use a custom aggregator to calculate the precision, recall and f1-score without aggregating on a datapoint level first.
To do that, we return the raw `matches` from the score function and wrap them in an aggregator that concatenates all of them before passing them to the `precision_recall_f1_score` function.

Note that the actual aggregator is an instance of our custom class, NOT the class itself.

.. GENERATED FROM PYTHON SOURCE LINES 343-380

.. code-block:: default

    from tpcp.validate import Aggregator


    class SingleValuePrecisionRecallF1(Aggregator[np.ndarray]):
        def aggregate(
            self, /, values: Sequence[np.ndarray], **_
        ) -> dict[str, float]:
            print("SingleValuePrecisionRecallF1 Aggregator called")
            precision, recall, f1_score = precision_recall_f1_score(
                np.vstack(values)
            )
            return {"precision": precision, "recall": recall, "f1_score": f1_score}


    single_value_precision_recall_f1_agg = SingleValuePrecisionRecallF1()


    def score(pipeline: MyPipeline, datapoint: ECGExampleData):
        # We use the `safe_run` wrapper instead of just run. This is always a good idea.
        # We don't need to clone the pipeline here, as GridSearch will already clone the pipeline internally and `run`
        # will clone it again.
        pipeline = pipeline.safe_run(datapoint)
        tolerance_s = 0.02  # We just use 20 ms for this example
        matches = match_events_with_reference(
            pipeline.r_peak_positions_.to_numpy(),
            datapoint.r_peak_positions_.to_numpy(),
            tolerance=tolerance_s * datapoint.sampling_rate_hz,
        )
        precision, recall, f1_score = precision_recall_f1_score(matches)
        return {
            "precision": precision,
            "recall": recall,
            "f1_score": f1_score,
            "per_sample": single_value_precision_recall_f1_agg(matches),
        }

.. GENERATED FROM PYTHON SOURCE LINES 381-384

We can see that we now get the values per datapoint (as before) and the values without prior aggregation.
From a scientific perspective, we can see that these values are quite different.
Again, which version to choose for scoring will depend on the use case.

.. GENERATED FROM PYTHON SOURCE LINES 384-387

.. code-block:: default

    complicated_agg, complicated_single = Scorer(score)(pipe, example_data)
    complicated_agg

.. rst-class:: sphx-glr-script-out

.. code-block:: none

    Datapoints: 0%| | 0/12 [00:00 dict[str, float]: return self.func(np.vstack(values))

.. GENERATED FROM PYTHON SOURCE LINES 461-462

With this our aggregator from before becomes just a special case of the new aggregator.

.. GENERATED FROM PYTHON SOURCE LINES 462-497

.. code-block:: default

    def calculate_precision_recall_f1(
        matches: Sequence[np.ndarray],
    ) -> dict[str, float]:
        precision, recall, f1_score = precision_recall_f1_score(np.vstack(matches))
        return {"precision": precision, "recall": recall, "f1_score": f1_score}


    single_value_precision_recall_f1_agg_from_gen = SingleValueAggregator(
        calculate_precision_recall_f1
    )


    def score(pipeline: MyPipeline, datapoint: ECGExampleData):
        # We use the `safe_run` wrapper instead of just run. This is always a good idea.
        # We don't need to clone the pipeline here, as GridSearch will already clone the pipeline internally and `run`
        # will clone it again.
        pipeline = pipeline.safe_run(datapoint)
        tolerance_s = 0.02  # We just use 20 ms for this example
        matches = match_events_with_reference(
            pipeline.r_peak_positions_.to_numpy(),
            datapoint.r_peak_positions_.to_numpy(),
            tolerance=tolerance_s * datapoint.sampling_rate_hz,
        )
        precision, recall, f1_score = precision_recall_f1_score(matches)
        return {
            "precision": precision,
            "recall": recall,
            "f1_score": f1_score,
            "per_sample": single_value_precision_recall_f1_agg_from_gen(matches),
        }


    complicated_agg, complicated_single = Scorer(score)(pipe, example_data)
    complicated_agg

.. rst-class:: sphx-glr-script-out

.. code-block:: none

    Datapoints: 0%| | 0/12 [00:00`

.. container:: sphx-glr-download sphx-glr-download-jupyter

  :download:`Download Jupyter notebook: _03_custom_scorer.ipynb <_03_custom_scorer.ipynb>`

.. 