Browse Source
The function `split_uniques()` splits a pandas data frame on the unique values in one or more columns.
main
Holger Frey
2 years ago
2 changed files with 129 additions and 29 deletions
@ -1,30 +1,82 @@
@@ -1,30 +1,82 @@
|
||||
import pandas as pd |
||||
from typing import Iterable, NamedTuple |
||||
import pytest |
||||
|
||||
# Maps each column name used for splitting to the unique value selected for
# it.  The values are whatever the column holds, hence `object`.
# (`dict[str:str]` was a slice subscript, not a key/value type pair.)
SplitUniqueKeys = dict[str, object]
||||
from typing import Iterable, Any |
||||
|
||||
|
||||
class SplitUniqueResult(NamedTuple):
    """One group produced by `split()`: the selecting keys and the rows."""

    # column name -> unique value that selected this group
    keys: SplitUniqueKeys
    # copy of the rows of the original frame matching all `keys`
    data: pd.DataFrame
||||
def split_uniques( |
||||
data: pd.DataFrame, on: str | Iterable[str], *, _prev_values: tuple[Any] = None |
||||
) -> tuple[Any, ..., pd.DataFrame]: |
||||
"""Splits a data frame on uniques values in a column |
||||
|
||||
Returns a generator of tuples with at least two elements. |
||||
The _last_ element is the resulting partial data frame, |
||||
the element(s) before are the values used to split up the original data. |
||||
|
||||
def split( |
||||
data: pd.DataFrame, |
||||
columns: str | Iterable[str], |
||||
*, |
||||
prevkeys: SplitUniqueKeys = None |
||||
) -> Iterable[SplitUniqueResult]: |
||||
if isinstance(columns, str): |
||||
columns = [columns] |
||||
if prevkeys is None: |
||||
prevkeys = {} |
||||
current, *rest = columns |
||||
for value in data[current].unique(): |
||||
selection = data[current] == value |
||||
Example: |
||||
|
||||
for well, pos, partial_data in split_uniques(full_data, ["Well", "Pos"]): |
||||
# `well` is one of the unique values in full_data["Well"] |
||||
# `pos` is one of the unique values in full_data["Pos"] |
||||
# parital_data is a data frame, containing values for this well and pos |
||||
|
||||
""" |
||||
if isinstance(on, str): |
||||
on = [on] |
||||
if _prev_values is None: |
||||
_prev_values = tuple() |
||||
current_column, *rest = on |
||||
for current_value in data[current_column].unique(): |
||||
selection = data[current_column] == current_value |
||||
selected = data.loc[selection].copy() |
||||
keys = prevkeys | {current: value} |
||||
values = _prev_values + (current_value,) |
||||
if rest: |
||||
yield from split(selected, rest, prevkeys=keys) |
||||
yield from split_uniques(selected, rest, _prev_values=values) |
||||
else: |
||||
yield SplitUniqueResult(keys, selected) |
||||
yield *values, selected |
||||
|
||||
|
||||
# tests |
||||
|
||||
|
||||
@pytest.fixture()
def example_data() -> pd.DataFrame:
    """Small data frame with repeated values in the columns "A" and "B"."""
    return pd.DataFrame({"A": [1, 2, 2], "B": [3, 4, 3], "C": ["x", "y", "z"]})
||||
|
||||
|
||||
@pytest.mark.parametrize("on", ["A", ["A"]])
def test_split_uniques_one_column(example_data, on):
    """A single column may be given as a plain string or as a list."""
    result = list(split_uniques(example_data, on))

    assert len(result) == 2
    assert isinstance(result[0], tuple)

    a_value, data = result[0]
    assert a_value == 1
    assert list(data["C"]) == ["x"]

    a_value, data = result[1]
    assert a_value == 2
    assert list(data["C"]) == ["y", "z"]
||||
|
||||
|
||||
def test_split_uniques_multiple_columns(example_data):
    """Splitting on two columns yields one tuple per value combination."""
    result = list(split_uniques(example_data, ["B", "A"]))

    assert len(result) == 3
    assert isinstance(result[0], tuple)

    b_value, a_value, data = result[0]
    assert b_value == 3
    assert a_value == 1
    assert list(data["C"]) == ["x"]

    b_value, a_value, data = result[1]
    assert b_value == 3
    assert a_value == 2
    assert list(data["C"]) == ["z"]

    b_value, a_value, data = result[2]
    assert b_value == 4
    assert a_value == 2
    assert list(data["C"]) == ["y"]
||||
|
Loading…
Reference in new issue