{
  "cells": [
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "%matplotlib inline"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "\n# Custom Scorer\n\nScorers or scoring functions are used in tpcp whenever we need to rank any form of output.\nFor example, after a GridSearch, we want to know which pipeline is the best.\nThis is done by a function that takes a pipeline and a datapoint as input and returns one or multiple scores.\nThese scores are then averaged over all datapoints provided.\n\nHowever, sometimes this is not exactly what we want.\nIn this case, you need to create a custom scorer or a custom aggregator to control how scores are aggregated across\nall datapoints.\n\nIn the following, we will demonstrate solutions for two typical use cases:\n\n1. Instead of averaging the scores, you want to use another metric (e.g. the median).\n2. You want to calculate a score that cannot be aggregated on a per-datapoint level first.\n   This can happen if each datapoint contains multiple events.\n   If you calculate a score (e.g. the F1 score) for each datapoint first and then average, you will get a different\n   result compared to calculating the F1 score across all events of the dataset, independent of the datapoint they\n   belong to.\n   (Note that which of the two approaches you want depends on your use case and the distribution of events across\n   datapoints.)\n"
      ]
    },
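    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "To illustrate the second use case, the following minimal sketch uses plain Python only (no tpcp).\nThe per-datapoint counts of true positives (`tp`), false positives (`fp`) and false negatives (`fn`) are made-up toy\nnumbers, not values from the ECG dataset, and `f1_from_counts` is a hypothetical helper using\nF1 = 2 TP / (2 TP + FP + FN).\nOne long recording with many events and one short recording with only a few events are enough to make the mean of the\nper-datapoint F1 scores differ clearly from the F1 score over all pooled events.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "# Toy counts per datapoint (made-up numbers for illustration, not from the ECG dataset)\ncounts = [\n    {\"tp\": 90, \"fp\": 10, \"fn\": 0},  # long recording with many events\n    {\"tp\": 1, \"fp\": 1, \"fn\": 2},  # short recording with only a few events\n]\n\n\ndef f1_from_counts(tp, fp, fn):\n    # F1 = 2 TP / (2 TP + FP + FN)\n    return 2 * tp / (2 * tp + fp + fn)\n\n\n# Option 1: score each datapoint first, then average the per-datapoint scores\nper_datapoint_f1 = [f1_from_counts(**c) for c in counts]\nmean_of_f1 = sum(per_datapoint_f1) / len(per_datapoint_f1)\n\n# Option 2: pool the events of all datapoints first, then calculate a single score\npooled = {key: sum(c[key] for c in counts) for key in (\"tp\", \"fp\", \"fn\")}\npooled_f1 = f1_from_counts(**pooled)\n\nprint(f\"Mean of per-datapoint F1 scores: {mean_of_f1:.3f}\")\nprint(f\"F1 score over all pooled events: {pooled_f1:.3f}\")"
      ]
    },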
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "from pathlib import Path"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Setup\nWe will simply reuse the pipeline from the general QRS detection example.\nFor all our custom scorers, we will use this pipeline and apply it to all datapoints of the ECG example dataset.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "from typing import Dict, Sequence\n\nimport pandas as pd\n\nfrom examples.algorithms.algorithms_qrs_detection_final import (\n    QRSDetector,\n    match_events_with_reference,\n    precision_recall_f1_score,\n)\nfrom examples.datasets.datasets_final_ecg import ECGExampleData\nfrom tpcp import Parameter, Pipeline, cf\n\ntry:\n    HERE = Path(__file__).parent\nexcept NameError:\n    HERE = Path(\".\").resolve()\ndata_path = HERE.parent.parent / \"example_data/ecg_mit_bih_arrhythmia/data\"\nexample_data = ECGExampleData(data_path)\n\n\nclass MyPipeline(Pipeline[ECGExampleData]):\n    algorithm: Parameter[QRSDetector]\n\n    r_peak_positions_: pd.Series\n\n    def __init__(self, algorithm: QRSDetector = cf(QRSDetector())):\n        self.algorithm = algorithm\n\n    def run(self, datapoint: ECGExampleData):\n        # Note: We need to clone the algorithm instance, to make sure we don't leak any data between runs.\n        algo = self.algorithm.clone()\n        algo.detect(datapoint.data[\"ecg\"], datapoint.sampling_rate_hz)\n\n        self.r_peak_positions_ = algo.r_peak_positions_\n        return self\n\n\npipe = MyPipeline()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Custom Median Scorer\nTo create a custom score aggregation, we first need a score function.\nWe will use a score function similar to the one from the QRS detection example.\nIt returns the precision, recall and F1 score of the QRS detection for each datapoint.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "def score(pipeline: MyPipeline, datapoint: ECGExampleData):\n    # We use the `safe_run` wrapper instead of just run. This is always a good idea.\n    # We don't need to clone the pipeline here, as GridSearch will already clone the pipeline internally and `run`\n    # will clone it again.\n    pipeline = pipeline.safe_run(datapoint)\n    tolerance_s = 0.02  # We just use 20 ms for this example\n    matches = match_events_with_reference(\n        pipeline.r_peak_positions_.to_numpy(),\n        datapoint.r_peak_positions_.to_numpy(),\n        tolerance=tolerance_s * datapoint.sampling_rate_hz,\n    )\n    precision, recall, f1_score = precision_recall_f1_score(matches)\n    return {\"precision\": precision, \"recall\": recall, \"f1_score\": f1_score}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "By default, these values will be aggregated by averaging over all datapoints.\nWe can see that by running an instance of the scorer on the example dataset.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "from tpcp.validate import Scorer\n\nbaseline_results_agg, baseline_results_single = Scorer(score)(pipe, example_data)\nbaseline_results_agg"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "baseline_results_single"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The scorer provides the results per datapoint and the aggregated values.\nWe can verify that the aggregation was performed using the mean.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "import numpy as np\n\nfrom tpcp.exceptions import ValidationError\n\nassert baseline_results_agg[\"f1_score\"] == np.mean(baseline_results_single[\"f1_score\"])"
      ]
    },
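    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Averaging is not always the most useful summary.\nAs a minimal plain-Python sketch with made-up per-datapoint F1 scores (not results from this dataset), a single\nfailed recording pulls the mean down considerably, while the median stays close to the typical performance.\nThis is one reason why you might prefer the median.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "import statistics\n\n# Hypothetical per-datapoint F1 scores: four recordings work well, one fails almost completely\ntoy_f1_scores = [0.98, 0.97, 0.99, 0.96, 0.10]\n\nprint(\"mean:  \", statistics.mean(toy_f1_scores))\nprint(\"median:\", statistics.median(toy_f1_scores))"
      ]
    },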
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "We can change this behaviour by implementing a custom Aggregator.\nThis is a simple class inheriting from :class:`tpcp.validate.Aggregator` that implements an `aggregate` class method.\nBelow, we implement a custom aggregator that calculates the median of the per-datapoint scores.\nIn addition, it prints a log message when it is called, so we can better understand how it works.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "from tpcp.validate import Aggregator\n\n\nclass MedianAggregator(Aggregator):\n    @classmethod\n    def aggregate(cls, values: Sequence[float]) -> float:\n        print(\"Median Aggregator called\")\n        try:\n            return float(np.median(values))\n        except TypeError as e:\n            raise ValidationError(\n                f\"MedianAggregator can only be used with float values. Got the following values instead:\\n\\n{values}\"\n            ) from e"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "We can apply this Aggregator in two ways:\n\n1. By using it as `default_aggregator` in the Scorer constructor.\n   In this case, the aggregator will be used for all scores.\n2. By wrapping specific return values of the score function.\n\nLet's start with the first way.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "median_results_agg, median_results_single = Scorer(score, default_aggregator=MedianAggregator)(pipe, example_data)\nmedian_results_agg"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The log output shows that the aggregator was called three times (once per score).\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "assert median_results_agg[\"f1_score\"] == np.median(median_results_single[\"f1_score\"])\nassert median_results_agg[\"precision\"] == np.median(median_results_single[\"precision\"])"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "With the second approach, we can select which scores should be aggregated differently.\nAll scores without a specific aggregator will be aggregated by the default aggregator.\n\nBelow, only the F1 score will be aggregated by the median aggregator.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "def score(pipeline: MyPipeline, datapoint: ECGExampleData):\n    # We use the `safe_run` wrapper instead of just run. This is always a good idea.\n    # We don't need to clone the pipeline here, as GridSearch will already clone the pipeline internally and `run`\n    # will clone it again.\n    pipeline = pipeline.safe_run(datapoint)\n    tolerance_s = 0.02  # We just use 20 ms for this example\n    matches = match_events_with_reference(\n        pipeline.r_peak_positions_.to_numpy(),\n        datapoint.r_peak_positions_.to_numpy(),\n        tolerance=tolerance_s * datapoint.sampling_rate_hz,\n    )\n    precision, recall, f1_score = precision_recall_f1_score(matches)\n    return {\"precision\": precision, \"recall\": recall, \"f1_score\": MedianAggregator(f1_score)}\n\n\npartial_median_results_agg, partial_median_results_single = Scorer(score)(pipe, example_data)\npartial_median_results_agg"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "assert partial_median_results_agg[\"f1_score\"] == np.median(partial_median_results_single[\"f1_score\"])\nassert partial_median_results_agg[\"precision\"] == np.mean(partial_median_results_single[\"precision\"])"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "<div class=\"alert alert-danger\"><h4>Warning</h4><p>Note that your score function must return the same aggregator for a given score across all datapoints.\n             Otherwise, an error is raised!</p></div>\n\n"
      ]
    },
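    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "To make these rules more concrete, below is a simplified model of per-score aggregation in plain Python.\nIt is a hypothetical sketch and **not** how tpcp implements this internally; the `Toy*` classes and `toy_aggregate`\nare made up for illustration.\nValues wrapped in an aggregator class are aggregated with that class, unwrapped values fall back to the default (here\na plain mean), and using different aggregators for the same score across datapoints raises an error.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "import statistics\n\n\nclass ToyAggregator:\n    \"\"\"Hypothetical wrapper that marks a value with its aggregation method (not tpcp's class).\"\"\"\n\n    def __init__(self, value):\n        self.value = value\n\n    @classmethod\n    def aggregate(cls, values):\n        raise NotImplementedError\n\n\nclass ToyMean(ToyAggregator):\n    @classmethod\n    def aggregate(cls, values):\n        return statistics.mean(values)\n\n\nclass ToyMedian(ToyAggregator):\n    @classmethod\n    def aggregate(cls, values):\n        return statistics.median(values)\n\n\ndef toy_aggregate(per_datapoint_results, default=ToyMean):\n    \"\"\"Aggregate a list of score dicts (one dict per datapoint) into a single dict.\"\"\"\n    aggregated = {}\n    for name in per_datapoint_results[0]:\n        raw = [result[name] for result in per_datapoint_results]\n        # Unwrapped values are handled by the default aggregator\n        agg_types = {type(v) if isinstance(v, ToyAggregator) else default for v in raw}\n        if len(agg_types) != 1:\n            # Each score must use the same aggregator for every datapoint\n            raise ValueError(f\"Score '{name}' uses different aggregators: {agg_types}\")\n        agg = agg_types.pop()\n        values = [v.value if isinstance(v, ToyAggregator) else v for v in raw]\n        aggregated[name] = agg.aggregate(values)\n    return aggregated\n\n\ntoy_results = [\n    {\"precision\": 0.9, \"f1_score\": ToyMedian(0.9)},\n    {\"precision\": 0.5, \"f1_score\": ToyMedian(0.2)},\n    {\"precision\": 0.7, \"f1_score\": ToyMedian(0.8)},\n]\nprint(toy_aggregate(toy_results))"
      ]
    },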
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Multi-Return Aggregator\nSometimes an aggregator needs to return multiple values.\nWe can do that by returning a dict from the `aggregate` method.\n\nAs an example, we will calculate the mean and standard deviation of the returned scores in a single aggregation.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "def score(pipeline: MyPipeline, datapoint: ECGExampleData):\n    # We use the `safe_run` wrapper instead of just run. This is always a good idea.\n    # We don't need to clone the pipeline here, as GridSearch will already clone the pipeline internally and `run`\n    # will clone it again.\n    pipeline = pipeline.safe_run(datapoint)\n    tolerance_s = 0.02  # We just use 20 ms for this example\n    matches = match_events_with_reference(\n        pipeline.r_peak_positions_.to_numpy(),\n        datapoint.r_peak_positions_.to_numpy(),\n        tolerance=tolerance_s * datapoint.sampling_rate_hz,\n    )\n    precision, recall, f1_score = precision_recall_f1_score(matches)\n    return {\"precision\": precision, \"recall\": recall, \"f1_score\": f1_score}\n\n\nclass MeanAndStdAggregator(Aggregator[float]):\n    @classmethod\n    def aggregate(cls, values: Sequence[float]) -> Dict[str, float]:\n        print(\"Mean and Std Aggregator called\")\n        try:\n            return {\"mean\": float(np.mean(values)), \"std\": float(np.std(values))}\n        except TypeError as e:\n            raise ValidationError(\n                \"MeanAndStdAggregator can only be used with float values. \"\n                f\"Got the following values instead:\\n\\n{values}\"\n            ) from e\n\n\nmulti_agg_agg, multi_agg_single = Scorer(score, default_aggregator=MeanAndStdAggregator)(pipe, example_data)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "When an aggregator returns multiple values, the keys of the returned dict are joined with the name of the score using `__`.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "multi_agg_agg"
      ]
    },
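    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The renaming can be sketched with a small hypothetical helper in plain Python (not tpcp's code).\nIt assumes that the score name comes first and the key returned by the aggregator second (e.g. `f1_score__mean`);\nthe output above shows the exact names tpcp produces.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "def flatten_aggregated(aggregated):\n    \"\"\"Flatten dict-valued aggregation results into flat keys joined by '__' (hypothetical helper).\"\"\"\n    flat = {}\n    for score_name, value in aggregated.items():\n        if isinstance(value, dict):\n            # Assumption: the key is built as '<score name>__<aggregator key>', e.g. 'f1_score__mean'\n            for agg_name, agg_value in value.items():\n                flat[f\"{score_name}__{agg_name}\"] = agg_value\n        else:\n            flat[score_name] = value\n    return flat\n\n\nprint(flatten_aggregated({\"f1_score\": {\"mean\": 0.8, \"std\": 0.1}, \"recall\": 0.9}))"
      ]
    },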
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Complicated Aggregation\nIn cases where we do not want to or cannot aggregate the scores on a per-datapoint basis, we can return arbitrary\ndata from the score function and pass it to a complex aggregator.\nThere are no restrictions on the data you can return from the score function.\nThe aggregator only needs to be able to handle these values and return a float or a dict with float values.\n\nIn this example, we will use a custom aggregator to calculate the precision, recall and F1 score without\naggregating on a datapoint level first.\nFor that, we return the raw `matches` from the score function and wrap them into an aggregator that concatenates all\nof them before passing them to the `precision_recall_f1_score` function.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "class SingleValuePrecisionRecallF1(Aggregator[np.ndarray]):\n    @classmethod\n    def aggregate(cls, values: Sequence[np.ndarray]) -> Dict[str, float]:\n        print(\"SingleValuePrecisionRecallF1 Aggregator called\")\n        precision, recall, f1_score = precision_recall_f1_score(np.vstack(values))\n        return {\"precision\": precision, \"recall\": recall, \"f1_score\": f1_score}\n\n\ndef score(pipeline: MyPipeline, datapoint: ECGExampleData):\n    # We use the `safe_run` wrapper instead of just run. This is always a good idea.\n    # We don't need to clone the pipeline here, as GridSearch will already clone the pipeline internally and `run`\n    # will clone it again.\n    pipeline = pipeline.safe_run(datapoint)\n    tolerance_s = 0.02  # We just use 20 ms for this example\n    matches = match_events_with_reference(\n        pipeline.r_peak_positions_.to_numpy(),\n        datapoint.r_peak_positions_.to_numpy(),\n        tolerance=tolerance_s * datapoint.sampling_rate_hz,\n    )\n    precision, recall, f1_score = precision_recall_f1_score(matches)\n    return {\n        \"precision\": precision,\n        \"recall\": recall,\n        \"f1_score\": f1_score,\n        \"per_sample\": SingleValuePrecisionRecallF1(matches),\n    }"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "We now get both the averaged per-datapoint values (as before) and the values calculated without prior\nper-datapoint aggregation.\nFrom a scientific perspective, the two sets of values are quite different.\nAgain, which version to choose for scoring depends on the use case.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "complicated_agg, complicated_single = Scorer(score)(pipe, example_data)\ncomplicated_agg"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The raw matches array is still available in the `single` results.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "complicated_single[\"per_sample\"]"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## No-Aggregation Aggregator\nSometimes you might want to return data from a score function that should not be aggregated.\nThis could be arbitrary metadata or scores with values that cannot be averaged.\nIn this case, you can simply use the :class:`~tpcp.validate.NoAgg` aggregator.\nValues wrapped in it only appear in the single (per-datapoint) results and are left out of the aggregated results.\n\nIn the example below, we will only aggregate the precision and recall, but not the F1 score.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "from tpcp.validate import NoAgg\n\n\ndef score(pipeline: MyPipeline, datapoint: ECGExampleData):\n    # We use the `safe_run` wrapper instead of just run. This is always a good idea.\n    # We don't need to clone the pipeline here, as GridSearch will already clone the pipeline internally and `run`\n    # will clone it again.\n    pipeline = pipeline.safe_run(datapoint)\n    tolerance_s = 0.02  # We just use 20 ms for this example\n    matches = match_events_with_reference(\n        pipeline.r_peak_positions_.to_numpy(),\n        datapoint.r_peak_positions_.to_numpy(),\n        tolerance=tolerance_s * datapoint.sampling_rate_hz,\n    )\n    precision, recall, f1_score = precision_recall_f1_score(matches)\n    return {\"precision\": precision, \"recall\": recall, \"f1_score\": NoAgg(f1_score)}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "We can see that the f1-score is not contained in the aggregated results.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "no_agg_agg, no_agg_single = Scorer(score)(pipe, example_data)\nno_agg_agg"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "But we can still access the value in the single results.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "no_agg_single[\"f1_score\"]"
      ]
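    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Conceptually, `NoAgg` just marks values that should be skipped during aggregation.\nA minimal plain-Python sketch of this idea, using a hypothetical `ToyNoAgg` marker and `toy_split_results` function\n(not tpcp's implementation, and with a plain mean for all other scores), could look like this:\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "class ToyNoAgg:\n    \"\"\"Hypothetical marker: wrapped values are kept per datapoint but never aggregated.\"\"\"\n\n    def __init__(self, value):\n        self.value = value\n\n\ndef toy_split_results(per_datapoint_results):\n    \"\"\"Return (aggregated, single) results for a list of score dicts, one dict per datapoint.\"\"\"\n    aggregated, single = {}, {}\n    for name in per_datapoint_results[0]:\n        raw = [result[name] for result in per_datapoint_results]\n        values = [v.value if isinstance(v, ToyNoAgg) else v for v in raw]\n        # The per-datapoint values are always kept\n        single[name] = values\n        # Only scores that are not wrapped in ToyNoAgg are averaged\n        if not any(isinstance(v, ToyNoAgg) for v in raw):\n            aggregated[name] = sum(values) / len(values)\n    return aggregated, single\n\n\ntoy_agg, toy_single = toy_split_results(\n    [{\"precision\": 0.5, \"f1_score\": ToyNoAgg(0.4)}, {\"precision\": 1.0, \"f1_score\": ToyNoAgg(0.6)}]\n)\nprint(toy_agg)\nprint(toy_single)"
      ]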
    }
  ],
  "metadata": {
    "kernelspec": {
      "display_name": "Python 3",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.8.13"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}