{
  "cells": [
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "%matplotlib inline"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "\n\n# Custom Dataset - A real world example\n\nTo better understand how you would actually use tpcp datasets, we are going to build a dataset class for an actual\ndataset.\nWe are going to use a subset of the [MIT-BIH Arrhythmia Database](https://physionet.org/content/mitdb/1.0.0/).\nThe actual content of the data is not relevant, but it has a couple of key characteristics that are typical for such\ndatasets:\n\n- Data comes in individual files per participant/recording\n- There are multiple files (with different formats and structures) for each recording\n- Each recording/data point is an entire time series\n\nThese characteristics typically make working with such a dataset a little cumbersome.\nIn the following we want to explore how we can create a tpcp dataset for it, to make future work with the data easier.\n\nIf you want to see other real-life implementations of tpcp-datasets you can also check out:\n\n* https://github.com/mad-lab-fau/sensor_position_dataset_helper/blob/master/sensor_position_dataset_helper/tpcp_dataset.py\n* https://github.com/mad-lab-fau/cft-analysis/tree/main/cft_analysis/datasets\n\nIf you just want the final implementation, without all the explanation, check `custom_dataset_final_ecg`.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Creating an index\nFirst we need to figure out what data we have and convert that into an index dataframe.\nTo make things a little more complicated we will add a second layer to the data, by assigning each participant\ninto one of three \"patient groups\" arbitrarily.\n\nIn the data we have 3 files per participant:\n  1. {patient_id}.pk.gz -> The ECG recording\n  2. {patient_id}_all.csv -> The position of the R-peaks of all heart beats (PVC or normal).\n      All heart beats that show a different condition than PVC are already excluded\n  3. {patient_id}_pvc.csv -> The position of all PVC heart beats in the recording.\n\nLater we need to include the data of all files into the dataset, but to generate out index, it is sufficient to only\nlist one of the datatypes.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "from pathlib import Path\nfrom typing import List, Optional, Union\n\nfrom tpcp import Dataset\n\ntry:\n    HERE = Path(__file__).parent\nexcept NameError:\n    HERE = Path(\".\").resolve()\ndata_path = HERE.parent.parent / \"example_data/ecg_mit_bih_arrhythmia/data\"\n\n# Note that we sort the files explicitly, as the file order might depend on the operating system.\n# Otherwise, the ordering of our dataset might not be reproducible\nparticipant_ids = [f.name.split(\"_\")[0] for f in sorted(data_path.glob(\"*_all.csv\"))]"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "This information forms one level of our dataset.\nWe will add the \"patient group\" as arbitrary hardcoded second level to our dataset to make things a little more\ninteresting.\n\nAfterwards we put everything into an index that we will use for our dataset.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "from itertools import cycle\n\nimport pandas as pd\n\npatient_group = [g for g, _ in zip(cycle((\"group_1\", \"group_2\", \"group_3\")), participant_ids)]\n\ndata_index = pd.DataFrame({\"patient_group\": patient_group, \"participant\": participant_ids})\ndata_index"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Creating the dataset\nNow that we know how to create our index, we will integrate this logic into our dataset.\nNote, that we do not want to hardcode the dataset path and hence, turn it into a parameter of the dataset.\nThe rest of the logic stays the same and goes into the `create_index` method.\n\n<div class=\"alert alert-info\"><h4>Note</h4><p>Note that we sort the files explicitly, as the file order might depend on the operating system.\n          Otherwise, the ordering of our dataset might not be reproducible.</p></div>\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "class ECGExampleData(Dataset):\n    data_path: Path\n\n    def __init__(\n        self,\n        data_path: Path,\n        *,\n        groupby_cols: Optional[Union[List[str], str]] = None,\n        subset_index: Optional[pd.DataFrame] = None,\n    ):\n        self.data_path = data_path\n        super().__init__(groupby_cols=groupby_cols, subset_index=subset_index)\n\n    def create_index(self) -> pd.DataFrame:\n        participant_ids = [f.name.split(\"_\")[0] for f in sorted(self.data_path.glob(\"*_all.csv\"))]\n        patient_group = [g for g, _ in zip(cycle((\"group_1\", \"group_2\", \"group_3\")), participant_ids)]\n        df = pd.DataFrame({\"patient_group\": patient_group, \"participant\": participant_ids})\n        # Some additional checks to avoid common issues\n        if len(df) == 0:\n            raise ValueError(\n                \"The dataset is empty. Are you sure you selected the correct folder? Current folder is: \"\n                f\"{self.data_path}\"\n            )\n        return df\n\n\nECGExampleData(data_path=data_path)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Adding data\nThe implementation above is a fully functional dataset and can be used to split or iterate the index.\nHowever, to make things really convenient, we want to add data access parameters to the dataset.\nWe start with the raw ecg data.\n\nIn general, it is completely up to you how you implement this.\nYou can use methods on the dataset, properties, or create a set of functions, that get a dataset instance as an input.\nThe way below shows how we usually do it.\nThe most important thing in all cases is documentation to make sure everyone knows what data they are getting and\nin which format.\n\nAs we don't know how people will use the dataset, we will load the data only on demand.\nFurther, loading the data (in this case) only makes sense, when there is just a single participant selected in the\ndataset.\n\nWe will implement this logic by using a `property` to implement \"load-on-demand\" and the `dataset.is_single` method\nto check that there is really only a single participant selected.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "class ECGExampleData(Dataset):\n    data_path: Path\n\n    def __init__(\n        self,\n        data_path: Path,\n        *,\n        groupby_cols: Optional[Union[List[str], str]] = None,\n        subset_index: Optional[pd.DataFrame] = None,\n    ):\n        self.data_path = data_path\n        super().__init__(groupby_cols=groupby_cols, subset_index=subset_index)\n\n    @property\n    def sampling_rate_hz(self) -> float:\n        \"\"\"The sampling rate of the raw ECG recording in Hz\"\"\"\n        return 360.0\n\n    @property\n    def data(self) -> pd.DataFrame:\n        \"\"\"The raw ECG data of a participant's recording.\n\n        The dataframe contains a single column called \"ecg\".\n        The index values are just samples.\n        You can use the sampling rate (`self.sampling_rate_hz`) to convert it into time\n        \"\"\"\n        # Check that there is only a single participant in the dataset\n        if not self.is_single(None):\n            raise ValueError(\"Data can only be accessed, when there is just a single participant in the dataset.\")\n        # Reconstruct the ecg file path based on the data index\n        p_id = self.index[\"participant\"][0]\n        return pd.read_pickle(self.data_path / f\"{p_id}.pk.gz\")\n\n    def create_index(self) -> pd.DataFrame:\n        participant_ids = [f.name.split(\"_\")[0] for f in sorted(self.data_path.glob(\"*_all.csv\"))]\n        patient_group = [g for g, _ in zip(cycle((\"group_1\", \"group_2\", \"group_3\")), participant_ids)]\n        df = pd.DataFrame({\"patient_group\": patient_group, \"participant\": participant_ids})\n        # Some additional checks to avoid common issues\n        if len(df) == 0:\n            raise ValueError(\n                \"The dataset is empty. Are you sure you selected the correct folder? Current folder is: \"\n                f\"{self.data_path}\"\n            )\n        return df"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "With that logic, we can now select a subset of the dataset that contains only a single participant and then access\nthe data.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "dataset = ECGExampleData(data_path=data_path)\nsubset = dataset[0]\nsubset"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "subset.data"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Adding more data\nIn the same way, we can add the remaining data.\nThe remaining data we have available is both data generated by human labelers on the ECG signal.\nAka data that you usually would not have available with your ECG recording.\nHence, we will treat this information as \"reference data/labels\".\nBy convention, we usually use a trailing `_` after the property name to indicate that.\n\nWe also add one \"derived\" property (`labeled_r_peaks_`) that returns the data in a more convenient format for\ncertain tasks.\nYou could also implement methods or properties that perform certain computations.\nFor example if there is a typical pre-processing that should always be applied to the data, it might be good to add\na property `data_cleaned` (or similar) that runs this processing on demand when the parameter is accessed.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "class ECGExampleData(Dataset):\n    data_path: Path\n\n    def __init__(\n        self,\n        data_path: Path,\n        *,\n        groupby_cols: Optional[Union[List[str], str]] = None,\n        subset_index: Optional[pd.DataFrame] = None,\n    ):\n        self.data_path = data_path\n        super().__init__(groupby_cols=groupby_cols, subset_index=subset_index)\n\n    @property\n    def sampling_rate_hz(self) -> float:\n        \"\"\"The sampling rate of the raw ECG recording in Hz\"\"\"\n        return 360.0\n\n    @property\n    def data(self) -> pd.DataFrame:\n        \"\"\"The raw ECG data of a participant's recording.\n\n        The dataframe contains a single column called \"ecg\".\n        The index values are just samples.\n        You can use the sampling rate (`self.sampling_rate_hz`) to convert it into time\n        \"\"\"\n        # Check that there is only a single participant in the dataset\n        self.assert_is_single(None, \"data\")\n        # Reconstruct the ecg file path based on the data index\n        p_id = self.index[\"participant\"][0]\n        return pd.read_pickle(self.data_path / f\"{p_id}.pk.gz\")\n\n    @property\n    def r_peak_positions_(self) -> pd.DataFrame:\n        \"\"\"The sample positions of all R-peaks in the ECG data.\n\n        This includes all R-Peaks (PVC or normal)\n        \"\"\"\n        self.assert_is_single(None, \"r_peaks_\")\n        p_id = self.index[\"participant\"][0]\n        r_peaks = pd.read_csv(self.data_path / f\"{p_id}_all.csv\", index_col=0)\n        r_peaks = r_peaks.rename(columns={\"R\": \"r_peak_position\"})\n        return r_peaks\n\n    @property\n    def pvc_positions_(self) -> pd.DataFrame:\n        \"\"\"The positions of R-peaks belonging to abnormal PVC peaks in the data stream.\n\n        The position is equivalent to a position entry in `self.r_peak_positions_`.\n        \"\"\"\n        self.assert_is_single(None, \"pvc_positions_\")\n        p_id = self.index[\"participant\"][0]\n        pvc_peaks = pd.read_csv(self.data_path / f\"{p_id}_pvc.csv\", index_col=0)\n        pvc_peaks = pvc_peaks.rename(columns={\"PVC\": \"pvc_position\"})\n        return pvc_peaks\n\n    @property\n    def labeled_r_peaks_(self) -> pd.DataFrame:\n        \"\"\"All r-peak positions with an additional column that labels them as normal or PVC.\"\"\"\n        self.assert_is_single(None, \"labeled_r_peaks_\")\n        r_peaks = self.r_peak_positions_\n        r_peaks[\"label\"] = \"normal\"\n        r_peaks.loc[r_peaks[\"r_peak_position\"].isin(self.pvc_positions_[\"pvc_position\"]), \"label\"] = \"pvc\"\n        return r_peaks\n\n    def create_index(self) -> pd.DataFrame:\n        participant_ids = [f.name.split(\"_\")[0] for f in sorted(self.data_path.glob(\"*_all.csv\"))]\n        patient_group = [g for g, _ in zip(cycle((\"group_1\", \"group_2\", \"group_3\")), participant_ids)]\n        df = pd.DataFrame({\"patient_group\": patient_group, \"participant\": participant_ids})\n        # Some additional checks to avoid common issues\n        if len(df) == 0:\n            raise ValueError(\n                \"The dataset is empty. Are you sure you selected the correct folder? Current folder is: \"\n                f\"{self.data_path}\"\n            )\n        return df\n\n\ndataset = ECGExampleData(data_path=data_path)\nsubset = dataset[0]\nsubset.labeled_r_peaks_"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Conclusion\nWhile building the dataset is not always easy and requires you to think about how you want to access the data, it\nmakes working with the data in the future easy.\nEven without using any other tpcp feature, it provides a clear overview over the data available and abstracts the\ncomplexity of data loading.\nThis can prevent accidental errors and just a much faster and better workflow in case new people want to work with\na dataset.\n\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Advanced Concepts - Caching\nLoading/pre-processing the data on demand in the dataset above is a good optimization, if you only need to access the\ndata once.\nHowever, if you need to access the data multiple times you might want to cache the loaded data within a single\nexecution of you code, or even cache time-consuming computations across multiple runs/programs.\nDepending on the scenario this can either be achieved by using a in memory cache like `functools.lru_cache` or a disk\ncache like `joblib.Memory`.\n\nFinding the right functions to cache and to do it in a way that balances runtime and other resource usage is tricky.\nSo only do that, if you really need it.\nHowever, when you implement it, the best approach is to extract the part you want to cache into a global function\n**without** side-effects and then wrap this function with your caching method of choice.\n\nBelow we will demonstrate how to do that using Pythons `lru_cache` for the `data` property and make caching optional\nusing a dataset parameter.\n\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false
      },
      "outputs": [],
      "source": [
        "from functools import lru_cache\n\n\ndef load_pandas_pickle_file(file_path):\n    return pd.read_pickle(file_path)\n\n\ncached_load_pandas_pickle_file = lru_cache(10)(load_pandas_pickle_file)\n\n\nclass ECGExampleData(Dataset):\n    data_path: Path\n    use_lru_cache: bool\n\n    def __init__(\n        self,\n        data_path: Path,\n        *,\n        use_lru_cache: bool = True,\n        groupby_cols: Optional[Union[List[str], str]] = None,\n        subset_index: Optional[pd.DataFrame] = None,\n    ):\n        self.data_path = data_path\n        self.use_lru_cache = use_lru_cache\n        super().__init__(groupby_cols=groupby_cols, subset_index=subset_index)\n\n    @property\n    def sampling_rate_hz(self) -> float:\n        \"\"\"The sampling rate of the raw ECG recording in Hz\"\"\"\n        return 360.0\n\n    @property\n    def data(self) -> pd.DataFrame:\n        \"\"\"The raw ECG data of a participant's recording.\n\n        The dataframe contains a single column called \"ecg\".\n        The index values are just samples.\n        You can use the sampling rate (`self.sampling_rate_hz`) to convert it into time\n        \"\"\"\n        # Check that there is only a single participant in the dataset\n        self.assert_is_single(None, \"data\")\n        # Reconstruct the ecg file path based on the data index\n        p_id = self.group.participant\n        file_path = self.data_path / f\"{p_id}.pk.gz\"\n        # We try to use the cache if enabled.\n        if self.use_lru_cache:\n            return cached_load_pandas_pickle_file(file_path)\n        return load_pandas_pickle_file(file_path)\n\n    @property\n    def r_peak_positions_(self) -> pd.DataFrame:\n        \"\"\"The sample positions of all R-peaks in the ECG data.\n\n        This includes all R-Peaks (PVC or normal)\n        \"\"\"\n        self.assert_is_single(None, \"r_peaks_\")\n        p_id = self.group.participant\n        r_peaks = pd.read_csv(self.data_path / f\"{p_id}_all.csv\", index_col=0)\n        r_peaks = r_peaks.rename(columns={\"R\": \"r_peak_position\"})\n        return r_peaks\n\n    @property\n    def pvc_positions_(self) -> pd.DataFrame:\n        \"\"\"The positions of R-peaks belonging to abnormal PVC peaks in the data stream.\n\n        The position is equivalent to a position entry in `self.r_peak_positions_`.\n        \"\"\"\n        self.assert_is_single(None, \"pvc_positions_\")\n        p_id = self.index[\"participant\"][0]\n        pvc_peaks = pd.read_csv(self.data_path / f\"{p_id}_pvc.csv\", index_col=0)\n        pvc_peaks = pvc_peaks.rename(columns={\"PVC\": \"pvc_position\"})\n        return pvc_peaks\n\n    @property\n    def labeled_r_peaks_(self) -> pd.DataFrame:\n        \"\"\"All r-peak positions with an additional column that labels them as normal or PVC.\"\"\"\n        self.assert_is_single(None, \"labeled_r_peaks_\")\n        r_peaks = self.r_peak_positions_\n        r_peaks[\"label\"] = \"normal\"\n        r_peaks.loc[r_peaks[\"r_peak_position\"].isin(self.pvc_positions_[\"pvc_position\"]), \"label\"] = \"pvc\"\n        return r_peaks\n\n    def create_index(self) -> pd.DataFrame:\n        participant_ids = [f.name.split(\"_\")[0] for f in sorted(self.data_path.glob(\"*_all.csv\"))]\n        patient_group = [g for g, _ in zip(cycle((\"group_1\", \"group_2\", \"group_3\")), participant_ids)]\n        df = pd.DataFrame({\"patient_group\": patient_group, \"participant\": participant_ids})\n        # Some additional checks to avoid common issues\n        if len(df) == 0:\n            raise ValueError(\n                \"The dataset is empty. Are you sure you selected the correct folder? Current folder is: \"\n                f\"{self.data_path}\"\n            )\n        return df"
      ]
    }
  ],
  "metadata": {
    "kernelspec": {
      "display_name": "Python 3",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.8.13"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}