Note
Go to the end to download the full example code
TypedIterator#
This example shows how to use the TypedIterator
class, which might be helpful, when iterating over
data and needing to store multiple results for each iteration.
The Problem#
A very common pattern when working with any type of data is to iterate over it and then apply a series of operations to it. In simple cases you might only want to store the final result, but often you are also interested in intermediate or alternative outputs.
What typically happens, is that you create multiple empty lists or dictionaries (one for each result) and then append the results to them during the iteration. At the end you might apply further operations to the results, e.g. aggregations.
Below is a simple example of this pattern:
data = [1, 2, 3, 4, 5]
result_1 = []
result_2 = []
result_3 = []
for d in data:
intermediate_result_1 = d * 3
result_1.append(intermediate_result_1)
intermediate_result_2 = intermediate_result_1 * 2
result_2.append(intermediate_result_2)
final_result_3 = intermediate_result_2 - 4
result_3.append(final_result_3)
# An example aggregation
result_1 = sum(result_1)
print(result_1)
print(result_2)
print(result_3)
45
[6, 12, 18, 24, 30]
[2, 8, 14, 20, 26]
Fundamentally, this pattern works well. However, it does not really fit into the idea of declarative code that we are trying to achieve with tpcp. While programming, there are 3 places where you need to think about the result and the result types. This makes it harder to reason about the code and also makes it harder to change the code later on. In addition, the main pipeline code, which should be the most important part of the code, is cluttered with boilerplate code concerned with just storing the results.
While we could fix some of these issues by refactoring a little, with TypedIterator
we provide (in our opinion)
a much cleaner solution.
The basic idea of TypedIterator
is to provide a way to specify all configuration (i.e. what results to expect and
how to aggregate them) in one place at the beginning.
It further simplifies how to store results, by inverting the data structure.
Instead of worrying about one data structure for each result, you only need to worry about one data structure for each
iteration.
Using dataclasses, these objects are also typed, preventing typos and providing IDE support.
Let’s rewrite the above example using TypedIterator
:
We define our result-datatype as a dataclass.
We define the aggregations we want to apply to the results. If we don’t want to aggregate a result, we simply don’t add it to the list. We provide some more explanation on aggregations below, just accept this for now.
from tpcp.misc import TypedIteratorResultTuple
def sum_agg(results: list[TypedIteratorResultTuple[int, ResultType]]):
return sum(r.result.result_1 for r in results)
aggregations = [
("result_1", sum_agg),
]
3. We create a new instance of TypedIterator
with the result type and the aggregations.
We use the “square bracket” typing syntax to bind the output datatype and the input datatype we are planning to
iterate over.
This way, our IDE is able to autocomplete the attributes of the result type.
from tpcp.misc import TypedIterator
iterator = TypedIterator[int, ResultType](ResultType, aggregations=aggregations)
Now we can iterate over our data and get a result object for each iteration, that we can then fill with the results.
You can access the data using the results_
attribute.
ResultType(result_1=45, result_2=[6, 12, 18, 24, 30], result_3=[2, 8, 14, 20, 26])
Your IDE should be able to autocomplete the attributes.
45
The raw results are available as a list of Result tuples. They allow us to access the results in the order they were created, and contain further metadata like the input data.
iterator.raw_results_
[TypedIteratorResultTuple(iteration_name='__main__', input=1, result=ResultType(result_1=3, result_2=6, result_3=2), iteration_context={}), TypedIteratorResultTuple(iteration_name='__main__', input=2, result=ResultType(result_1=6, result_2=12, result_3=8), iteration_context={}), TypedIteratorResultTuple(iteration_name='__main__', input=3, result=ResultType(result_1=9, result_2=18, result_3=14), iteration_context={}), TypedIteratorResultTuple(iteration_name='__main__', input=4, result=ResultType(result_1=12, result_2=24, result_3=20), iteration_context={}), TypedIteratorResultTuple(iteration_name='__main__', input=5, result=ResultType(result_1=15, result_2=30, result_3=26), iteration_context={})]
While this version of the code required a couple more lines, it is much easier to understand and reason about. It clearly separates the configuration from the actual code and the core pipeline code is much cleaner.
A real-world example#
Below we apply this pattern to a pipeline that iterates over an actual dataset. The return types are a little bit more complex to show some more advanced features of aggregations.
For this example we apply the QRS detection algorithm to the ECG dataset demonstrated in some of the other examples. The QRS detection algorithm only has a single output. Hence, we use the “number of r-peaks” as a second result here to demonstrate the use case.
Again we start by defining the result dataclass.
import pandas as pd
@dataclass
class QRSResultType:
"""The result type of the QRS detection algorithm."""
r_peak_positions: pd.Series
n_r_peaks: int
Our input data is going to be a dataset object of the ECGExampleData type.
from pathlib import Path
from examples.datasets.datasets_final_ecg import ECGExampleData
try:
HERE = Path(__file__).parent
except NameError:
HERE = Path().resolve()
data_path = HERE.parent.parent / "example_data/ecg_mit_bih_arrhythmia/data"
dataset = ECGExampleData(data_path)
For the aggregations, we want to concatenate the r-peak positions.
The aggregation function gets all raw results as input.
So it can access all inputs, all results, and all metadata.
This means you can define any aggregation you want.
In this case, we want to concatenate the r-peak positions into a single dataframe.
And we turn the n_r_peaks
into a dictionary, to make it easier to map the results back to the inputs.
Note that we can type these functions using the TypedIteratorResultTuple
type.
Like the iterator itself, this type is generic and allows you to specify the input and output types.
So in our case, the input is ECGExampleData
and the output is QRSResultType
.
from typing_extensions import TypeAlias
from tpcp.misc import TypedIteratorResultTuple
result_tup: TypeAlias = TypedIteratorResultTuple[ECGExampleData, QRSResultType]
def concat_r_peak_positions(results: list[result_tup]):
return pd.concat({r.input.group_label: r.result.r_peak_positions for r in results})
def aggregate_n_r_peaks(results: list[result_tup]):
return {r.input.group_label: r.result.n_r_peaks for r in results}
aggregations = [
("r_peak_positions", concat_r_peak_positions),
("n_r_peaks", aggregate_n_r_peaks),
]
Now we can create the iterator and iterate over the dataset. The iterator takes the same type parameters as our result-tuple.
We can then iterate over the dataset and apply the QRS detection algorithm.
from examples.algorithms.algorithms_qrs_detection_final import QRSDetector
qrs_iterator = TypedIterator[ECGExampleData, QRSResultType](QRSResultType, aggregations=aggregations)
for d, r in qrs_iterator.iterate(dataset):
r.r_peak_positions = QRSDetector().detect(d.data["ecg"], sampling_rate_hz=d.sampling_rate_hz).r_peak_positions_
r.n_r_peaks = len(r.r_peak_positions)
Finally we can inspect the results stored on the iterator.
QRSResultType(r_peak_positions=group_1 100 0 77
1 370
2 663
3 947
4 1231
...
group_3 200 1448 647546
1449 648357
1450 648629
1451 649409
1452 649928
Length: 17782, dtype: int64, n_r_peaks={ECGExampleDataGroupLabel(patient_group='group_1', participant='100'): 2270, ECGExampleDataGroupLabel(patient_group='group_2', participant='102'): 1710, ECGExampleDataGroupLabel(patient_group='group_3', participant='104'): 2066, ECGExampleDataGroupLabel(patient_group='group_1', participant='105'): 2567, ECGExampleDataGroupLabel(patient_group='group_2', participant='106'): 1704, ECGExampleDataGroupLabel(patient_group='group_3', participant='108'): 78, ECGExampleDataGroupLabel(patient_group='group_1', participant='114'): 30, ECGExampleDataGroupLabel(patient_group='group_2', participant='116'): 2392, ECGExampleDataGroupLabel(patient_group='group_3', participant='119'): 1988, ECGExampleDataGroupLabel(patient_group='group_1', participant='121'): 6, ECGExampleDataGroupLabel(patient_group='group_2', participant='123'): 1518, ECGExampleDataGroupLabel(patient_group='group_3', participant='200'): 1453})
Note, that r_peak_positions_
is a single dataframe now and not a list of dataframes.
group_1 100 0 77
1 370
2 663
3 947
4 1231
...
group_3 200 1448 647546
1449 648357
1450 648629
1451 649409
1452 649928
Length: 17782, dtype: int64
The n_r_peaks_
is still a dictionary, as expected.
{ECGExampleDataGroupLabel(patient_group='group_1', participant='100'): 2270, ECGExampleDataGroupLabel(patient_group='group_2', participant='102'): 1710, ECGExampleDataGroupLabel(patient_group='group_3', participant='104'): 2066, ECGExampleDataGroupLabel(patient_group='group_1', participant='105'): 2567, ECGExampleDataGroupLabel(patient_group='group_2', participant='106'): 1704, ECGExampleDataGroupLabel(patient_group='group_3', participant='108'): 78, ECGExampleDataGroupLabel(patient_group='group_1', participant='114'): 30, ECGExampleDataGroupLabel(patient_group='group_2', participant='116'): 2392, ECGExampleDataGroupLabel(patient_group='group_3', participant='119'): 1988, ECGExampleDataGroupLabel(patient_group='group_1', participant='121'): 6, ECGExampleDataGroupLabel(patient_group='group_2', participant='123'): 1518, ECGExampleDataGroupLabel(patient_group='group_3', participant='200'): 1453}
The raw results are still available.
qrs_iterator.raw_results_
[TypedIteratorResultTuple(iteration_name='__main__', input=ECGExampleData [1 groups/rows]
patient_group participant
0 group_1 100, result=QRSResultType(r_peak_positions=0 77
1 370
2 663
3 947
4 1231
...
2265 648978
2266 649232
2267 649485
2268 649734
2269 649992
Length: 2270, dtype: int64, n_r_peaks=2270), iteration_context={}), TypedIteratorResultTuple(iteration_name='__main__', input=ECGExampleData [1 groups/rows]
patient_group participant
0 group_2 102, result=QRSResultType(r_peak_positions=0 409
1 697
2 988
3 1304
4 1613
...
1705 648639
1706 648930
1707 649243
1708 649553
1709 649851
Length: 1710, dtype: int64, n_r_peaks=1710), iteration_context={}), TypedIteratorResultTuple(iteration_name='__main__', input=ECGExampleData [1 groups/rows]
patient_group participant
0 group_3 104, result=QRSResultType(r_peak_positions=0 17
1 314
2 613
3 899
4 1186
...
2061 648729
2062 649021
2063 649298
2064 649578
2065 649874
Length: 2066, dtype: int64, n_r_peaks=2066), iteration_context={}), TypedIteratorResultTuple(iteration_name='__main__', input=ECGExampleData [1 groups/rows]
patient_group participant
0 group_1 105, result=QRSResultType(r_peak_positions=0 197
1 459
2 708
3 964
4 1221
...
2562 648733
2563 648977
2564 649221
2565 649471
2566 649740
Length: 2567, dtype: int64, n_r_peaks=2567), iteration_context={}), TypedIteratorResultTuple(iteration_name='__main__', input=ECGExampleData [1 groups/rows]
patient_group participant
0 group_2 106, result=QRSResultType(r_peak_positions=0 351
1 725
2 1086
3 1448
4 1830
...
1699 648969
1700 649161
1701 649335
1702 649792
1703 649990
Length: 1704, dtype: int64, n_r_peaks=1704), iteration_context={}), TypedIteratorResultTuple(iteration_name='__main__', input=ECGExampleData [1 groups/rows]
patient_group participant
0 group_3 108, result=QRSResultType(r_peak_positions=0 10875
1 168524
2 169689
3 170426
4 170802
...
73 343872
74 359503
75 361856
76 472918
77 526420
Length: 78, dtype: int64, n_r_peaks=78), iteration_context={}), TypedIteratorResultTuple(iteration_name='__main__', input=ECGExampleData [1 groups/rows]
patient_group participant
0 group_1 114, result=QRSResultType(r_peak_positions=0 281594
1 281953
2 282291
3 299048
4 300134
5 300486
6 303833
7 304565
8 305674
9 306034
10 307475
11 314265
12 354268
13 469019
14 477999
15 512726
16 513064
17 513384
18 627927
19 629093
20 629709
21 630859
22 631156
23 636224
24 636519
25 636821
26 637118
27 637399
28 637652
29 638046
dtype: int64, n_r_peaks=30), iteration_context={}), TypedIteratorResultTuple(iteration_name='__main__', input=ECGExampleData [1 groups/rows]
patient_group participant
0 group_2 116, result=QRSResultType(r_peak_positions=0 16
1 284
2 562
3 838
4 1105
...
2387 648934
2388 649192
2389 649444
2390 649703
2391 649958
Length: 2392, dtype: int64, n_r_peaks=2392), iteration_context={}), TypedIteratorResultTuple(iteration_name='__main__', input=ECGExampleData [1 groups/rows]
patient_group participant
0 group_3 119, result=QRSResultType(r_peak_positions=0 309
1 504
2 977
3 1315
4 1651
...
1983 648792
1984 649129
1985 649468
1986 649788
1987 649985
Length: 1988, dtype: int64, n_r_peaks=1988), iteration_context={}), TypedIteratorResultTuple(iteration_name='__main__', input=ECGExampleData [1 groups/rows]
patient_group participant
0 group_1 121, result=QRSResultType(r_peak_positions=0 1569
1 88217
2 92814
3 168263
4 301711
5 581676
dtype: int64, n_r_peaks=6), iteration_context={}), TypedIteratorResultTuple(iteration_name='__main__', input=ECGExampleData [1 groups/rows]
patient_group participant
0 group_2 123, result=QRSResultType(r_peak_positions=0 71
1 551
2 1022
3 1499
4 1926
...
1513 648248
1514 648627
1515 648999
1516 649343
1517 649690
Length: 1518, dtype: int64, n_r_peaks=1518), iteration_context={}), TypedIteratorResultTuple(iteration_name='__main__', input=ECGExampleData [1 groups/rows]
patient_group participant
0 group_3 200, result=QRSResultType(r_peak_positions=0 488
1 965
2 1434
3 1883
4 2332
...
1448 647546
1449 648357
1450 648629
1451 649409
1452 649928
Length: 1453, dtype: int64, n_r_peaks=1453), iteration_context={})]
Custom Iterators#
When passing an iterable directly is not really convenient, you can also create a custom iterator class.
This class can reimplement iterate
with custom logic.
For example, you could provide a custom iterator that takes a data and a sections parameter and then loops over the
sections of the data.
For this we need to create a custom subclass inheriting from BaseTypedIterator
.
from collections.abc import Iterator
from typing import Generic, TypeVar
from tpcp.misc import BaseTypedIterator
CustomTypeT = TypeVar("CustomTypeT")
class SectionIterator(BaseTypedIterator[pd.DataFrame, CustomTypeT], Generic[CustomTypeT]):
def iterate(self, data: pd.DataFrame, sections: pd.DataFrame) -> Iterator[tuple[pd.DataFrame, CustomTypeT]]:
# We turn the sections into a generator of dataframes
data_iterable = (data.iloc[s.start : s.end] for s in sections.itertuples(index=False))
# We use the `_iterate` method to do the heavy lifting
yield from self._iterate(data_iterable)
We create some dummy data and sections to test the iterator.
dummy_data = pd.DataFrame({"data": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]})
dummy_sections = pd.DataFrame({"start": [0, 5], "end": [5, 10]})
Now we can use the iterator to iterate over the data. We skip any form of aggregation here, as it is not really relevant for this example, but it would work the same way as before.
@dataclass
class SimpleResultType:
n_samples: int
custom_iterator = SectionIterator[SimpleResultType](SimpleResultType)
for d, r in custom_iterator.iterate(dummy_data, dummy_sections):
print(d)
r.n_samples = len(d)
data
0 1
1 2
2 3
3 4
4 5
data
5 6
6 7
7 8
8 9
9 10
We can see that the iterator iterated over the two sections of the data. And the raw results contain two instances of the result dataclass.
custom_iterator.raw_results_
[TypedIteratorResultTuple(iteration_name='__main__', input= data
0 1
1 2
2 3
3 4
4 5, result=SimpleResultType(n_samples=5), iteration_context={}), TypedIteratorResultTuple(iteration_name='__main__', input= data
5 6
6 7
7 8
8 9
9 10, result=SimpleResultType(n_samples=5), iteration_context={})]
SimpleResultType(n_samples=[5, 5])
[5, 5]
Advanced Usacases#
For a really advanced use cases, check out mobgap GsIterator. This makes use of sub-iterations to allow to iterate and aggregate subregions of the data dynamically.
Additional Aggregators#
We allow to pass additional aggregators to the iterator that have names that are not part of the result type.
This allows to perform additional aggregations.
They work as before, but the aggregation results are not available on the result object, but rather as raw dictionary
via the additional_results_
attribute.
We show that below with the section iterator we defined above.
aggregations = [("sum_n_samples", lambda results: sum(r.result.n_samples for r in results))]
custom_iterator = SectionIterator[SimpleResultType](SimpleResultType, aggregations=aggregations)
for d, r in custom_iterator.iterate(dummy_data, dummy_sections):
r.n_samples = len(d)
custom_iterator.results_
SimpleResultType(n_samples=[5, 5])
custom_iterator.additional_results_
{'sum_n_samples': 10}
Total running time of the script: (0 minutes 7.170 seconds)
Estimated memory usage: 15 MB