|
|
|
import pandas as pd
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
from typing import Iterable, Any
|
|
|
|
|
|
|
|
|
|
|
|
def split_uniques(
|
|
|
|
data: pd.DataFrame, on: str | Iterable[str], *, _prev_values: tuple[Any] = None
|
|
|
|
) -> tuple[Any, ..., pd.DataFrame]:
|
|
|
|
"""Splits a data frame on uniques values in a column
|
|
|
|
|
|
|
|
Returns a generator of tuples with at least two elements.
|
|
|
|
The _last_ element is the resulting partial data frame,
|
|
|
|
the element(s) before are the values used to split up the original data.
|
|
|
|
|
|
|
|
Example:
|
|
|
|
|
|
|
|
for well, pos, partial_data in split_uniques(full_data, ["Well", "Pos"]):
|
|
|
|
# `well` is one of the unique values in full_data["Well"]
|
|
|
|
# `pos` is one of the unique values in full_data["Pos"]
|
|
|
|
# parital_data is a data frame, containing values for this well and pos
|
|
|
|
|
|
|
|
"""
|
|
|
|
if isinstance(on, str):
|
|
|
|
on = [on]
|
|
|
|
if _prev_values is None:
|
|
|
|
_prev_values = tuple()
|
|
|
|
current_column, *rest = on
|
|
|
|
for current_value in data[current_column].unique():
|
|
|
|
selection = data[current_column] == current_value
|
|
|
|
selected = data.loc[selection].copy()
|
|
|
|
values = _prev_values + (current_value,)
|
|
|
|
if rest:
|
|
|
|
yield from split_uniques(selected, rest, _prev_values=values)
|
|
|
|
else:
|
|
|
|
yield *values, selected
|
|
|
|
|
|
|
|
|
|
|
|
# tests
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture()
def example_data():
    """Provide a small three-row data frame for the split tests.

    Columns "A" and "B" contain repeated values, column "C" holds a distinct
    label per row so the rows of a partial frame can be identified.
    """
    columns = {
        "A": [1, 2, 2],
        "B": [3, 4, 3],
        "C": ["x", "y", "z"],
    }
    return pd.DataFrame(columns)
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize("on", ["A", ["A"]])
def test_split_uniques_one_column(example_data, on):
    """Split on column "A", given as a plain name and as a one-item list."""
    parts = list(split_uniques(example_data, on))

    assert len(parts) == 2
    assert isinstance(parts[0], tuple)

    # expected value of "A" and the "C" labels of the matching rows
    expected = [
        (1, ["x"]),
        (2, ["y", "z"]),
    ]
    for part, (expected_a, expected_labels) in zip(parts, expected):
        a_value, data = part
        assert a_value == expected_a
        assert list(data["C"]) == expected_labels
|
|
|
|
|
|
|
|
|
|
|
|
def test_split_uniques_multiple_columns(example_data):
    """Split on "B" first and then on "A" and check each partial frame."""
    parts = list(split_uniques(example_data, ["B", "A"]))

    assert len(parts) == 3
    assert isinstance(parts[0], tuple)

    # expected values of "B" and "A" and the "C" labels of the matching rows
    expected = [
        (3, 1, ["x"]),
        (3, 2, ["z"]),
        (4, 2, ["y"]),
    ]
    for part, (expected_b, expected_a, expected_labels) in zip(parts, expected):
        b_value, a_value, data = part
        assert b_value == expected_b
        assert a_value == expected_a
        assert list(data["C"]) == expected_labels
|