diff --git a/pyproject.toml b/pyproject.toml index 7e701fe..0ad8620 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ classifiers = [ dependencies = [ "pandas", + "scikit-learn", ] [project.urls] diff --git a/src/conda_helpers/__init__.py b/src/conda_helpers/__init__.py index 33c8aae..07e1e0a 100644 --- a/src/conda_helpers/__init__.py +++ b/src/conda_helpers/__init__.py @@ -4,3 +4,8 @@ Helpers for working with data frames in a conda environment """ __version__ = "0.0.1" + + +from .iter_uniques import iter_uniques, select # noqa: F401 +from .linear_regression import linear_regression # noqa: F401 +from .mbp import add_exposure_info, normalize # noqa: F401 diff --git a/src/conda_helpers/iter_uniques.py b/src/conda_helpers/iter_uniques.py new file mode 100644 index 0000000..0fcc026 --- /dev/null +++ b/src/conda_helpers/iter_uniques.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from typing import Any + +import pandas as pd + + +def _iter_uniques( + data: pd.DataFrame, *on: tuple[Any], _prev_values: None | tuple[Any] = None +) -> tuple[Any, ..., pd.DataFrame]: + """Splits a data frame on uniques values in a column + + Returns a generator of tuples with at least two elements. + The _last_ element is the resulting partial data frame, + the element(s) before are the values used to split up the original data. + + Example: + + for well, pos, partial_data in split_uniques(full_data, "Well", "Pos"): + # `well` is one of the unique values in full_data["Well"] + # `pos` is one of the unique values in full_data["Pos"] + # parital_data is a data frame, containing values for this well and pos + + """ + if _prev_values is None: + _prev_values = () + current_column, *rest = on + for current_value in data[current_column].unique(): + selection = data[current_column] == current_value + selected = data.loc[selection].copy() + values = (*_prev_values, current_value) + if rest: + yield from _iter_uniques(selected, *rest, _prev_values=values) + else: + yield *values, selected + + +def iter_uniques( + data: pd.DataFrame, *on: tuple[Any] +) -> tuple[Any, ..., pd.DataFrame]: + """Splits a data frame on uniques values in a column + + Returns a generator of tuples with at least two elements. + The _last_ element is the resulting partial data frame, + the element(s) before are the values used to split up the original data. + + Example: + + for well, pos, partial_data in split_uniques(full_data, "Well", "Pos"): + # `well` is one of the unique values in full_data["Well"] + # `pos` is one of the unique values in full_data["Pos"] + # parital_data is a data frame, containing values for this well and pos + + """ + yield from _iter_uniques(data, *on) + + +def select(data: pd.DataFrame, column: str, value: Any) -> pd.DataFrame: + selection = data[column] == value + return data.loc[selection].copy() diff --git a/src/conda_helpers/linear_regression.py b/src/conda_helpers/linear_regression.py new file mode 100644 index 0000000..f902ff9 --- /dev/null +++ b/src/conda_helpers/linear_regression.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +import dataclasses + +import pandas as pd +from sklearn import linear_model + + +@dataclasses.dataclass +class Regression: + intercept: float + coefficient: float + score: float + + @property + def coeff(self) -> float: + return self.coefficient + + @property + def r2(self) -> float: + return self.score + + def predict( + self, *, x: float | None = None, y: float | None = None + ) -> float: + """predict a value if x or y is given""" + if x is not None and y is not None: + msg = "predict() expects one keyword argument 'x' or 'y', got both" + raise TypeError(msg) + if x is not None: + return self.intercept + x * self.coefficient + if y is not None: + return (y - self.intercept) / self.coefficient + msg = "predict() expects a keyword argument 'x' or 'y'" + raise TypeError(msg) + + def to_dict(self): + return dataclasses.asdict(self) + + +def linear_regression(data: pd.DataFrame, *, x: str, y: str) -> Regression: + """calculates a linear regression for two columns of a DataFrame""" + x_values = data[x].to_numpy().reshape(-1, 1) + y_values = data[y].to_numpy().reshape(-1, 1) + fit = linear_model.LinearRegression().fit(x_values, y_values) + score = fit.score(x_values, y_values) + return Regression(fit.intercept_[0], fit.coef_[0][0], score) diff --git a/src/conda_helpers/mbp.py b/src/conda_helpers/mbp.py new file mode 100644 index 0000000..64b83b6 --- /dev/null +++ b/src/conda_helpers/mbp.py @@ -0,0 +1,100 @@ +import pandas as pd + +from .iter_uniques import select + +EXPOSURE_COLUMNS = [ + "Exposure.Id", + "Exposure.Channel", + "Exposure.Time", + "Exposure.Time.Normalized", +] + +SATURATION_LIMIT = 2 + +TEST_OVERFLOW_COLUMN = "Test.Spot.Overflow" + + +def add_exposure_info(data: pd.DataFrame, analysis="hyb") -> pd.DataFrame: + time_cy3 = 100 if "1" in analysis else 200 + exposure_values = [ + (1, "Cy3", time_cy3, time_cy3), + (2, "Cy5", 150, 25), + (3, "Cy5", 15, 25), + ] + exposure_df = pd.DataFrame(exposure_values, columns=EXPOSURE_COLUMNS) + return data.merge(exposure_df, on="Exposure.Id") + + +def test_overflow( + data: pd.DataFrame, result_column: str = TEST_OVERFLOW_COLUMN +): + data[result_column] = data["Spot.Saturation"] > SATURATION_LIMIT + return data + + +def select_xdr_data(data: pd.DataFrame) -> pd.DataFrame: + xdr_columns = [*EXPOSURE_COLUMNS, TEST_OVERFLOW_COLUMN] + missing = [c for c in xdr_columns if c not in data.columns] + if missing: + message = f"Columns {missing} are missing in the data frame" + raise KeyError(message) + + cy3_data = select(data, "Exposure.Channel", "Cy3") + cy5_data = select(data, "Exposure.Channel", "Cy5") + + id_columns = ["Analysis.Name", "Well.Name", "Pos.Id"] + + cy5_long = select(cy5_data, "Exposure.Time", 150).set_index(id_columns) + cy5_short = select(cy5_data, "Exposure.Time", 15).set_index(id_columns) + + in_overflow = cy5_long[TEST_OVERFLOW_COLUMN] + cy5_long_selected = cy5_long.loc[~in_overflow].reset_index() + cy5_short_selected = cy5_short.loc[in_overflow].reset_index() + + return pd.concat( + [cy3_data, cy5_long_selected, cy5_short_selected] + ).reset_index() + + +def normalize_xdr_data( + data: pd.DataFrame, template="{}.Normalized" +) -> pd.DataFrame: + cy5_data = select(data, "Exposure.Channel", "Cy5") + cy5_long_data = select(cy5_data, "Exposure.Time", 150) + if True in list(cy5_long_data[TEST_OVERFLOW_COLUMN].unique()): + message = ( + "Some spots for long Cy5 exposure time are still in overflow. " + "Did you forget to select the appropriate data (select_xdr_data) ?" + ) + raise ValueError(message) + + time_dependend_columns = [ + "Bkg.Mean", + "Bkg.Median", + "Bkg.StdDev", + "Bkg.Sum", + "Spot.Mean", + "Spot.Median", + "Spot.StdDev", + "Spot.Sum", + ] + available_columns = [ + c for c in time_dependend_columns if c in data.columns + ] + + for column in available_columns: + adjusted_column = template.format(column) + data[adjusted_column] = ( + data[column] + * data["Exposure.Time.Normalized"] + / data["Exposure.Time"] + ) + + return data + + +def normalize(raw_data: pd.DataFrame, analysis="hyb"): + with_exposure_info = add_exposure_info(raw_data, analysis=analysis) + overflow_tested = test_overflow(with_exposure_info) + xdr_data = select_xdr_data(overflow_tested) + return normalize_xdr_data(xdr_data) diff --git a/tests/test_conda_helpers.py b/tests/test_conda_helpers.py index 2eacc3f..c2953f0 100644 --- a/tests/test_conda_helpers.py +++ b/tests/test_conda_helpers.py @@ -21,25 +21,12 @@ all three test strategies will run "make lint" before to catch easily made mistakes. """ -import pytest - -def test_example_unittest(): - """example unittest - try importing the project - - will be run by 'make test' and 'make testall' but not 'make coverage' - """ - import conda_helpers # noqa: F401 - - assert True - - -@pytest.mark.functional() -def test_example_functional_test(): - """example unittest - - will be by 'make coverage' and 'make testall' but not 'make test' - """ - import conda_helpers # noqa: F401 - - assert True +def test_api(): + from conda_helpers import ( + add_exposure_info, # noqa: F401 + iter_uniques, # noqa: F401 + linear_regression, # noqa: F401 + normalize, # noqa: F401 + select, # noqa: F401 + ) diff --git a/tests/test_iter_uniques.py b/tests/test_iter_uniques.py new file mode 100644 index 0000000..fb2633b --- /dev/null +++ b/tests/test_iter_uniques.py @@ -0,0 +1,48 @@ +import pandas as pd +import pytest + + +@pytest.fixture() +def example_data(): + return pd.DataFrame({"A": [1, 2, 2], "B": [3, 4, 3], "C": ["x", "y", "z"]}) + + +def test_split_uniques_one_column(example_data): + from conda_helpers import iter_uniques + + result = list(iter_uniques(example_data, "A")) + + assert len(result) == 2 + assert isinstance(result[0], tuple) + + a_value, data = result[0] + assert a_value == 1 + assert list(data["C"]) == ["x"] + + a_value, data = result[1] + assert a_value == 2 + assert list(data["C"]) == ["y", "z"] + + +def test_split_uniques_multiple_columns(example_data): + from conda_helpers import iter_uniques + + result = list(iter_uniques(example_data, "B", "A")) + + assert len(result) == 3 + assert isinstance(result[0], tuple) + + b_value, a_value, data = result[0] + assert b_value == 3 + assert a_value == 1 + assert list(data["C"]) == ["x"] + + b_value, a_value, data = result[1] + assert b_value == 3 + assert a_value == 2 + assert list(data["C"]) == ["z"] + + b_value, a_value, data = result[2] + assert b_value == 4 + assert a_value == 2 + assert list(data["C"]) == ["y"] diff --git a/tests/test_linear_regression.py b/tests/test_linear_regression.py new file mode 100644 index 0000000..a862ac7 --- /dev/null +++ b/tests/test_linear_regression.py @@ -0,0 +1,62 @@ +import pandas as pd +import pytest + + +@pytest.fixture() +def example_data() -> pd.DataFrame: + x = list(range(1, 6)) + y = [4.1, 6.9, 10.1, 12.9, 15.9] + return pd.DataFrame({"A": x, "B": y}) + + +def test_linear_regression(example_data): + from conda_helpers import linear_regression + from conda_helpers.linear_regression import Regression + + result = linear_regression(example_data, x="A", y="B") + + assert isinstance(result, Regression) + assert pytest.approx(2.96) == result.coefficient + assert pytest.approx(2.96) == result.coeff + assert pytest.approx(1.1) == result.intercept + assert pytest.approx(0.9996349) == result.score + assert pytest.approx(0.9996349) == result.r2 + + +def test_regression_predict(example_data): + from conda_helpers import linear_regression + + regression = linear_regression(example_data, x="A", y="B") + + prediction = regression.predict(x=10) + + assert pytest.approx(30.7) == prediction + assert pytest.approx(10) == regression.predict(y=prediction) + + +def test_regression_predict_exceptions(example_data): + from conda_helpers import linear_regression + + regression = linear_regression(example_data, x="A", y="B") + + with pytest.raises(TypeError, match="expects a keyword"): + regression.predict() + + with pytest.raises(TypeError, match="expects one keyword"): + regression.predict(x=1, y=2) + + with pytest.raises(TypeError, match="takes 1 positional argument but"): + regression.predict(1) + + +def test_regression_to_dict(example_data): + from conda_helpers import linear_regression + + regression = linear_regression(example_data, x="A", y="B") + + result = regression.to_dict() + + assert sorted(result.keys()) == ["coefficient", "intercept", "score"] + assert pytest.approx(2.96) == result["coefficient"] + assert pytest.approx(1.1) == result["intercept"] + assert pytest.approx(0.9996349) == result["score"] diff --git a/tests/test_mbp.py b/tests/test_mbp.py new file mode 100644 index 0000000..6b9d7d1 --- /dev/null +++ b/tests/test_mbp.py @@ -0,0 +1,101 @@ +import pytest + + +@pytest.fixture() +def example_data(): + import pandas as pd + + data = [ + ("A", "a", 1, 1, 100, 5), + ("A", "a", 1, 2, 100, 2), + ("A", "a", 1, 3, 100, 0), + ("A", "b", 1, 1, 200, 0), + ("A", "b", 1, 2, 200, 3), + ("A", "b", 1, 3, 200, 0), + ("B", "a", 1, 1, 300, 0), + ("B", "a", 1, 2, 300, 2), + ("B", "a", 1, 3, 300, 0), + ] + columns = [ + "Analysis.Name", + "Well.Name", + "Pos.Id", + "Exposure.Id", + "Spot.Mean", + "Spot.Saturation", + ] + return pd.DataFrame(data, columns=columns) + + +@pytest.mark.parametrize( + ("analysis", "expected_cy3"), [("dry1", 100), ("hyb", 200)] +) +def test_add_exposure_info(example_data, analysis, expected_cy3): + from conda_helpers.mbp import add_exposure_info + + result = add_exposure_info(example_data, analysis=analysis) + + assert "Exposure.Channel" in result.columns + assert "Exposure.Time" in result.columns + assert "Exposure.Time.Normalized" in result.columns + + for i, channel, time in [ + (1, "Cy3", expected_cy3), + (2, "Cy5", 150), + (3, "Cy5", 15), + ]: + selection = result["Exposure.Id"] == i + selected = result[selection].copy() + assert list(selected["Exposure.Channel"].unique()) == [channel] + assert list(selected["Exposure.Time"].unique()) == [time] + + +def test_test_overflow(example_data): + from conda_helpers.mbp import TEST_OVERFLOW_COLUMN, test_overflow + + result = test_overflow(example_data) + + assert list(result[TEST_OVERFLOW_COLUMN]) == [ + True, + False, + False, + False, + True, + False, + False, + False, + False, + ] + + +def test_select_xdr_data(example_data): + from conda_helpers.mbp import ( + add_exposure_info, + select_xdr_data, + test_overflow, + ) + + tmp = add_exposure_info(example_data) + tmp = test_overflow(tmp) + result = select_xdr_data(tmp) + + assert list(result["Exposure.Channel"]) == ["Cy3"] * 3 + ["Cy5"] * 3 + assert list(result["Exposure.Time"]) == [200] * 3 + [150, 150, 15] + assert list(result["Analysis.Name"]) == list("AABABA") + assert list(result["Well.Name"]) == list("abaaab") + + +def test_normalize(example_data): + from conda_helpers.mbp import normalize + + result = normalize(example_data) + assert "Spot.Mean.Normalized" in result.columns + + assert list(result["Spot.Mean.Normalized"]) == [ + 100, + 200, + 300, + 100 * 25 / 150, + 300 * 25 / 150, + 200 * 25 / 15, + ]