.. DO NOT EDIT.
.. THIS FILE WAS AUTOMATICALLY GENERATED BY SPHINX-GALLERY.
.. TO MAKE CHANGES, EDIT THE SOURCE PYTHON FILE:
.. "auto_examples/validation/_03_custom_scorer.py"
.. LINE NUMBERS ARE GIVEN BELOW.

.. only:: html

    .. note::
        :class: sphx-glr-download-link-note

        :ref:`Go to the end <sphx_glr_download_auto_examples_validation__03_custom_scorer.py>`
        to download the full example code

.. rst-class:: sphx-glr-example-title

.. _sphx_glr_auto_examples_validation__03_custom_scorer.py:

.. _custom_scorer:

Custom Scorer
=============
Scorers (or scoring functions) are used in tpcp whenever we need to rank any form of output.
For example, after a GridSearch, we want to know which pipeline is the best.
This is done by a function that takes a pipeline and a datapoint as input and returns one or multiple scores.
These scores are then averaged over all datapoints provided.

However, sometimes this is not exactly what we want.
In these cases, you need to create a custom scorer or a custom aggregator to control how scores are averaged
over all datapoints.
In the following, we will demonstrate solutions for two typical use cases:

1. Instead of averaging the scores, you want to use another metric (e.g. the median), or you want to weight
   the scores based on the datatype.
2. You want to calculate a score that can not first be aggregated on a datapoint level.
   This can happen if each datapoint has multiple events.
   If you score (e.g. the F1 score) on each datapoint first, you will get a different result compared to
   calculating the F1 score across all events of a dataset, independent of the datapoint they belong to.
   (A small worked example of this difference is shown further below.
   Note: which of the two cases you want will depend on your use case and the data distributions per
   datapoint.)

.. GENERATED FROM PYTHON SOURCE LINES 27-30

.. code-block:: default

    from collections.abc import Sequence
    from pathlib import Path

.. GENERATED FROM PYTHON SOURCE LINES 31-35

Setup
-----
We will simply reuse the pipeline from the general QRS detection example.
For all of our custom scorers, we will use this pipeline and apply it to all datapoints of the ECG example
dataset.

.. GENERATED FROM PYTHON SOURCE LINES 35-72

.. code-block:: default

    import pandas as pd

    from examples.algorithms.algorithms_qrs_detection_final import (
        QRSDetector,
        match_events_with_reference,
        precision_recall_f1_score,
    )
    from examples.datasets.datasets_final_ecg import ECGExampleData
    from tpcp import Parameter, Pipeline, cf

    try:
        HERE = Path(__file__).parent
    except NameError:
        HERE = Path().resolve()
    data_path = HERE.parent.parent / "example_data/ecg_mit_bih_arrhythmia/data"
    example_data = ECGExampleData(data_path)


    class MyPipeline(Pipeline[ECGExampleData]):
        algorithm: Parameter[QRSDetector]

        r_peak_positions_: pd.Series

        def __init__(self, algorithm: QRSDetector = cf(QRSDetector())):
            self.algorithm = algorithm

        def run(self, datapoint: ECGExampleData):
            # Note: We need to clone the algorithm instance, to make sure we don't leak any data between runs.
            algo = self.algorithm.clone()
            algo.detect(datapoint.data["ecg"], datapoint.sampling_rate_hz)

            self.r_peak_positions_ = algo.r_peak_positions_
            return self


    pipe = MyPipeline()

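Before we build any custom aggregation, it helps to picture what a `Scorer` does conceptually.
The sketch below is *not* tpcp's actual implementation (the real `Scorer` adds error handling, progress bars,
and the aggregation hooks we explore in this example), but it captures the default contract: call the score
function once per datapoint, collect the values per score name, and average them.

.. code-block:: default

    def conceptual_scorer(score_func, pipeline, dataset):
        """Minimal sketch of the default Scorer behavior (assumes each score call returns a dict of floats)."""
        single_results = {}
        for datapoint in dataset:
            # One score call per datapoint.
            scores = score_func(pipeline, datapoint)
            for name, value in scores.items():
                single_results.setdefault(name, []).append(value)
        # Default aggregation: the mean over all datapoints.
        agg_results = {name: sum(values) / len(values) for name, values in single_results.items()}
        return agg_results, single_results
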
.. GENERATED FROM PYTHON SOURCE LINES 73-78

Custom Median Scorer
--------------------
To create a custom score aggregation, we first need a score function.
We will use a similar score function as in the QRS detection example.
It returns the precision, recall, and F1 score of the QRS detection for each datapoint.

.. GENERATED FROM PYTHON SOURCE LINES 78-95

.. code-block:: default

    def score(pipeline: MyPipeline, datapoint: ECGExampleData):
        # We use the `safe_run` wrapper instead of just `run`. This is always a good idea.
        # We don't need to clone the pipeline here, as GridSearch will already clone the pipeline internally
        # and `run` will clone it again.
        pipeline = pipeline.safe_run(datapoint)
        tolerance_s = 0.02  # We just use 20 ms for this example
        matches = match_events_with_reference(
            pipeline.r_peak_positions_.to_numpy(),
            datapoint.r_peak_positions_.to_numpy(),
            tolerance=tolerance_s * datapoint.sampling_rate_hz,
        )
        precision, recall, f1_score = precision_recall_f1_score(matches)
        return {"precision": precision, "recall": recall, "f1_score": f1_score}

.. GENERATED FROM PYTHON SOURCE LINES 96-98

By default, these values will be aggregated by averaging over all datapoints.
We can see that by running an instance of the scorer on the example dataset.

.. GENERATED FROM PYTHON SOURCE LINES 98-103

.. code-block:: default

    from tpcp.validate import Scorer

    baseline_results_agg, baseline_results_single = Scorer(score)(pipe, example_data)
    baseline_results_agg

.. rst-class:: sphx-glr-script-out

 .. code-block:: none

    Datapoints:   0%|          | 0/12 [00:00<?, ?it/s]

If we want to aggregate the scores with a different metric (e.g. the median), we can create a custom
aggregator.
A custom aggregator is a subclass of `Aggregator` that implements the `aggregate` class method.
This method receives the score values of all datapoints and must return either a single float or a dict of
float values.
Below, we implement an aggregator that calculates the median of the scores.

.. code-block:: default

    import numpy as np

    from tpcp.exceptions import ValidationError
    from tpcp.validate import Aggregator


    class MedianAggregator(Aggregator[float]):
        @classmethod
        def aggregate(cls, /, values: Sequence[float], **_) -> float:
            print("Median Aggregator called")
            try:
                return float(np.median(values))
            except TypeError as e:
                raise ValidationError(
                    f"MedianAggregator can only be used with float values. Got the following values instead:\n\n{values}"
                ) from e

.. GENERATED FROM PYTHON SOURCE LINES 139-146

We can apply this aggregator in two ways:

1. By using it as the `default_aggregator` in the `Scorer` constructor.
   In this case, the aggregator will be used for all scores.
2. By wrapping specific return values of the score method (see the sketch after the next output block).

Let's start with the first way.

.. GENERATED FROM PYTHON SOURCE LINES 146-148

.. code-block:: default

    median_results_agg, median_results_single = Scorer(score, default_aggregator=MedianAggregator)(pipe, example_data)
    median_results_agg

.. rst-class:: sphx-glr-script-out

 .. code-block:: none

    Datapoints:   0%|          | 0/12 [00:00<?, ?it/s]

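The second way, wrapping specific return values, lets us mix aggregations within a single score function.
A minimal sketch could look like this (the helper name `score_median_f1` is our own illustration): only
`f1_score` is aggregated with the median, while `precision` and `recall` keep the default mean aggregation.

.. code-block:: default

    def score_median_f1(pipeline: MyPipeline, datapoint: ECGExampleData):
        # Identical to `score` above, except that we wrap a single value in the aggregator.
        pipeline = pipeline.safe_run(datapoint)
        tolerance_s = 0.02
        matches = match_events_with_reference(
            pipeline.r_peak_positions_.to_numpy(),
            datapoint.r_peak_positions_.to_numpy(),
            tolerance=tolerance_s * datapoint.sampling_rate_hz,
        )
        precision, recall, f1_score = precision_recall_f1_score(matches)
        # Only `f1_score` is wrapped, so only it will use the median aggregation.
        return {"precision": precision, "recall": recall, "f1_score": MedianAggregator(f1_score)}

Running `Scorer(score_median_f1)(pipe, example_data)` would then report the median F1 score next to the mean
precision and recall.
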
An aggregator is not limited to a single return value.
It can also return a dict of floats, in which case each entry becomes its own aggregated score.
As an example, the following aggregator calculates the mean and the standard deviation of the scores at the
same time.

.. code-block:: default

    class MeanAndStdAggregator(Aggregator[float]):
        @classmethod
        def aggregate(cls, /, values: Sequence[float], **_) -> dict[str, float]:
            print("MeanAndStdAggregator Aggregator called")
            try:
                return {"mean": float(np.mean(values)), "std": float(np.std(values))}
            except TypeError as e:
                raise ValidationError(
                    "MeanAndStdAggregator can only be used with float values. "
                    f"Got the following values instead:\n\n{values}"
                ) from e


    multi_agg_agg, multi_agg_single = Scorer(score, default_aggregator=MeanAndStdAggregator)(pipe, example_data)

.. rst-class:: sphx-glr-script-out

 .. code-block:: none

    Datapoints:   0%|          | 0/12 [00:00<?, ?it/s]

So far, all scores were first calculated per datapoint and then aggregated.
For scores like the F1 score this is not the only option: instead of averaging the per-datapoint values, we
can also calculate a single score across all events of all datapoints.
For this, the score function passes the raw `matches` arrays to a custom aggregator, which concatenates them
before calculating precision, recall, and F1 score once over the entire dataset.

.. code-block:: default

    class SingleValuePrecisionRecallF1(Aggregator[np.ndarray]):
        @classmethod
        def aggregate(cls, /, values: Sequence[np.ndarray], **_) -> dict[str, float]:
            print("SingleValuePrecisionRecallF1 Aggregator called")
            precision, recall, f1_score = precision_recall_f1_score(np.vstack(values))
            return {"precision": precision, "recall": recall, "f1_score": f1_score}


    def score(pipeline: MyPipeline, datapoint: ECGExampleData):
        # We use the `safe_run` wrapper instead of just `run`. This is always a good idea.
        # We don't need to clone the pipeline here, as GridSearch will already clone the pipeline internally
        # and `run` will clone it again.
        pipeline = pipeline.safe_run(datapoint)
        tolerance_s = 0.02  # We just use 20 ms for this example
        matches = match_events_with_reference(
            pipeline.r_peak_positions_.to_numpy(),
            datapoint.r_peak_positions_.to_numpy(),
            tolerance=tolerance_s * datapoint.sampling_rate_hz,
        )
        precision, recall, f1_score = precision_recall_f1_score(matches)
        return {
            "precision": precision,
            "recall": recall,
            "f1_score": f1_score,
            "per_sample": SingleValuePrecisionRecallF1(matches),
        }

.. GENERATED FROM PYTHON SOURCE LINES 271-274

We can see that we now get the values per datapoint (as before) and the values without previous aggregation.
From a scientific perspective, we can see that these values are quite different.
Again, which version to choose for scoring will depend on the use case.

.. GENERATED FROM PYTHON SOURCE LINES 274-277

.. code-block:: default

    complicated_agg, complicated_single = Scorer(score)(pipe, example_data)
    complicated_agg

.. rst-class:: sphx-glr-script-out

 .. code-block:: none

    Datapoints:   0%|          | 0/12 [00:00<?, ?it/s]

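To see where such differences come from, consider a small constructed example (the numbers are made up and
not taken from the dataset above): averaging per-datapoint recalls weights every datapoint equally, while the
pooled calculation weights every event equally.

.. code-block:: default

    # Datapoint A: 1 true positive, 1 false negative  -> recall 0.5
    # Datapoint B: 9 true positives, 1 false negative -> recall 0.9
    tp_a, fn_a = 1, 1
    tp_b, fn_b = 9, 1

    mean_recall = ((tp_a / (tp_a + fn_a)) + (tp_b / (tp_b + fn_b))) / 2
    pooled_recall = (tp_a + tp_b) / (tp_a + fn_a + tp_b + fn_b)

    print(mean_recall)    # 0.7
    print(pooled_recall)  # 0.8333...
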

Sometimes, we do not want to weight the scores of all datapoints equally.
In the example dataset, each datapoint belongs to one of three patient groups, and we might want to calculate
the mean scores per group first, before averaging over the groups.
Let's have a look at the dataset again:

.. rst-class:: sphx-glr-script-out

 .. code-block:: none

    ECGExampleData [12 groups/rows]

       patient_group participant
    0        group_1         100
    1        group_2         102
    2        group_3         104
    3        group_1         105
    4        group_2         106
    5        group_3         108
    6        group_1         114
    7        group_2         116
    8        group_3         119
    9        group_1         121
    10       group_2         123
    11       group_3         200
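The group of each datapoint is available through its `group_label` attribute, which the aggregator below
relies on.
As a quick sanity check (a sketch; the exact label depends on the dataset rows shown above), we can inspect
the first datapoint:

.. code-block:: default

    # `group_label` is a named tuple built from the index columns of the dataset.
    first_datapoint = example_data[0]
    print(first_datapoint.group_label.patient_group)  # "group_1" for the first row above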


.. GENERATED FROM PYTHON SOURCE LINES 308-313

For this, our aggregator will use the `datapoints` parameter of `aggregate` to find out which group each
datapoint belongs to, and then average the values using pandas' groupby functionality.
We also return the values of the individual groups.
Note that we must return everything as a dict of float values.

.. GENERATED FROM PYTHON SOURCE LINES 313-323

.. code-block:: default

    class GroupWeightedAggregator(Aggregator[float]):
        @classmethod
        def aggregate(cls, /, values: Sequence[float], datapoints: Sequence[ECGExampleData], **_) -> dict[str, float]:
            print("GroupWeightedAggregator Aggregator called")
            patient_groups = [d.group_label.patient_group for d in datapoints]
            data = pd.DataFrame({"value": values, "patient_groups": patient_groups})
            per_group = data.groupby("patient_groups").mean()["value"]
            return {**per_group.to_dict(), "group_mean": per_group.mean()}

.. GENERATED FROM PYTHON SOURCE LINES 324-326

In our score function, we wrap the F1 score with the new aggregator (we could, of course, also wrap the other
values, or use the `default_aggregator` parameter).

.. GENERATED FROM PYTHON SOURCE LINES 326-345

.. code-block:: default

    def score(pipeline: MyPipeline, datapoint: ECGExampleData):
        # We use the `safe_run` wrapper instead of just `run`. This is always a good idea.
        # We don't need to clone the pipeline here, as GridSearch will already clone the pipeline internally
        # and `run` will clone it again.
        pipeline = pipeline.safe_run(datapoint)
        tolerance_s = 0.02  # We just use 20 ms for this example
        matches = match_events_with_reference(
            pipeline.r_peak_positions_.to_numpy(),
            datapoint.r_peak_positions_.to_numpy(),
            tolerance=tolerance_s * datapoint.sampling_rate_hz,
        )
        precision, recall, f1_score = precision_recall_f1_score(matches)
        return {"precision": precision, "recall": recall, "f1_score": GroupWeightedAggregator(f1_score)}


    group_weighted_agg, group_weighted_single = Scorer(score)(pipe, example_data)
    group_weighted_agg

.. rst-class:: sphx-glr-script-out

 .. code-block:: none

    Datapoints:   0%|          | 0/12 [00:00<?, ?it/s]

.. _sphx_glr_download_auto_examples_validation__03_custom_scorer.py:

.. only:: html

  .. container:: sphx-glr-footer sphx-glr-footer-example

    .. container:: sphx-glr-download sphx-glr-download-python

      :download:`Download Python source code: _03_custom_scorer.py <_03_custom_scorer.py>`

    .. container:: sphx-glr-download sphx-glr-download-jupyter

      :download:`Download Jupyter notebook: _03_custom_scorer.ipynb <_03_custom_scorer.ipynb>`

.. only:: html

 .. rst-class:: sphx-glr-signature

    `Gallery generated by Sphinx-Gallery <https://sphinx-gallery.github.io>`_