
first working version

branch: main
Holger Frey, 1 year ago
parent commit 278298cadc
  1. pyproject.toml (1 line changed)
  2. src/conda_helpers/__init__.py (5 lines changed)
  3. src/conda_helpers/iter_uniques.py (60 lines changed)
  4. src/conda_helpers/linear_regression.py (47 lines changed)
  5. src/conda_helpers/mbp.py (100 lines changed)
  6. tests/test_conda_helpers.py (29 lines changed)
  7. tests/test_iter_uniques.py (48 lines changed)
  8. tests/test_linear_regression.py (62 lines changed)
  9. tests/test_mbp.py (101 lines changed)

pyproject.toml (1 line changed)

@@ -28,6 +28,7 @@ classifiers = [
 dependencies = [
     "pandas",
+    "scikit-learn",
 ]

 [project.urls]

src/conda_helpers/__init__.py (5 lines changed)

@@ -4,3 +4,8 @@ Helpers for working with data frames in a conda environment
 """

 __version__ = "0.0.1"
+
+from .iter_uniques import iter_uniques, select  # noqa: F401
+from .linear_regression import linear_regression  # noqa: F401
+from .mbp import add_exposure_info, normalize  # noqa: F401

src/conda_helpers/iter_uniques.py (60 lines changed, new file)

@@ -0,0 +1,60 @@
from __future__ import annotations

from collections.abc import Iterator
from typing import Any

import pandas as pd


def _iter_uniques(
    data: pd.DataFrame, *on: str, _prev_values: tuple[Any, ...] | None = None
) -> Iterator[tuple[Any, ...]]:
    """Splits a data frame on unique values in one or more columns

    Returns a generator of tuples with at least two elements.
    The _last_ element is the resulting partial data frame,
    the element(s) before are the values used to split up the original data.

    Example:
        for well, pos, partial_data in iter_uniques(full_data, "Well", "Pos"):
            # `well` is one of the unique values in full_data["Well"]
            # `pos` is one of the unique values in full_data["Pos"]
            # `partial_data` is a data frame, containing values for this well and pos
    """
    if _prev_values is None:
        _prev_values = ()
    current_column, *rest = on
    for current_value in data[current_column].unique():
        selection = data[current_column] == current_value
        selected = data.loc[selection].copy()
        values = (*_prev_values, current_value)
        if rest:
            # recurse into the remaining columns, carrying the values collected so far
            yield from _iter_uniques(selected, *rest, _prev_values=values)
        else:
            yield *values, selected


def iter_uniques(data: pd.DataFrame, *on: str) -> Iterator[tuple[Any, ...]]:
    """Splits a data frame on unique values in one or more columns

    Returns a generator of tuples with at least two elements.
    The _last_ element is the resulting partial data frame,
    the element(s) before are the values used to split up the original data.

    Example:
        for well, pos, partial_data in iter_uniques(full_data, "Well", "Pos"):
            # `well` is one of the unique values in full_data["Well"]
            # `pos` is one of the unique values in full_data["Pos"]
            # `partial_data` is a data frame, containing values for this well and pos
    """
    yield from _iter_uniques(data, *on)


def select(data: pd.DataFrame, column: str, value: Any) -> pd.DataFrame:
    """Returns a copy of the rows of a data frame where `column` equals `value`"""
    selection = data[column] == value
    return data.loc[selection].copy()
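
A quick usage sketch (not part of the commit; the column names and values are made up for illustration):

    import pandas as pd
    from conda_helpers import iter_uniques, select

    full_data = pd.DataFrame(
        {"Well": ["A1", "A1", "B1"], "Pos": [1, 2, 1], "Value": [10, 20, 30]}
    )

    # iterate over every (Well, Pos) combination present in the data
    for well, pos, partial_data in iter_uniques(full_data, "Well", "Pos"):
        print(well, pos, len(partial_data))

    # select() is the non-iterating building block: one column, one value
    a1_rows = select(full_data, "Well", "A1")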

src/conda_helpers/linear_regression.py (47 lines changed, new file)

@@ -0,0 +1,47 @@
from __future__ import annotations

import dataclasses

import pandas as pd
from sklearn import linear_model


@dataclasses.dataclass
class Regression:
    intercept: float
    coefficient: float
    score: float

    @property
    def coeff(self) -> float:
        """short alias for the coefficient"""
        return self.coefficient

    @property
    def r2(self) -> float:
        """short alias for the score (R²) of the fit"""
        return self.score

    def predict(
        self, *, x: float | None = None, y: float | None = None
    ) -> float:
        """predicts a value if either x or y is given"""
        if x is not None and y is not None:
            msg = "predict() expects one keyword argument 'x' or 'y', got both"
            raise TypeError(msg)
        if x is not None:
            return self.intercept + x * self.coefficient
        if y is not None:
            # inverse prediction: solve y = intercept + x * coefficient for x
            return (y - self.intercept) / self.coefficient
        msg = "predict() expects a keyword argument 'x' or 'y'"
        raise TypeError(msg)

    def to_dict(self) -> dict[str, float]:
        return dataclasses.asdict(self)


def linear_regression(data: pd.DataFrame, *, x: str, y: str) -> Regression:
    """calculates a linear regression for two columns of a DataFrame"""
    x_values = data[x].to_numpy().reshape(-1, 1)
    y_values = data[y].to_numpy().reshape(-1, 1)
    fit = linear_model.LinearRegression().fit(x_values, y_values)
    score = fit.score(x_values, y_values)
    # with a 2D y, scikit-learn stores arrays; unpack them to plain floats
    return Regression(fit.intercept_[0], fit.coef_[0][0], score)
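
A short usage sketch (not part of the commit; the data and expected values are taken from the tests below):

    import pandas as pd
    from conda_helpers import linear_regression

    data = pd.DataFrame({"A": [1, 2, 3, 4, 5], "B": [4.1, 6.9, 10.1, 12.9, 15.9]})
    regression = linear_regression(data, x="A", y="B")

    regression.predict(x=10)    # extrapolated y value, here 1.1 + 10 * 2.96 == 30.7
    regression.predict(y=30.7)  # inverse prediction, recovers x == 10
    regression.to_dict()        # {"intercept": 1.1, "coefficient": 2.96, "score": ...}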

src/conda_helpers/mbp.py (100 lines changed, new file)

@@ -0,0 +1,100 @@
import pandas as pd

from .iter_uniques import select

EXPOSURE_COLUMNS = [
    "Exposure.Id",
    "Exposure.Channel",
    "Exposure.Time",
    "Exposure.Time.Normalized",
]

SATURATION_LIMIT = 2

TEST_OVERFLOW_COLUMN = "Test.Spot.Overflow"


def add_exposure_info(data: pd.DataFrame, analysis: str = "hyb") -> pd.DataFrame:
    # analyses with a "1" in the name (e.g. "dry1") use a shorter Cy3 exposure
    time_cy3 = 100 if "1" in analysis else 200
    exposure_values = [
        (1, "Cy3", time_cy3, time_cy3),
        (2, "Cy5", 150, 25),
        (3, "Cy5", 15, 25),
    ]
    exposure_df = pd.DataFrame(exposure_values, columns=EXPOSURE_COLUMNS)
    return data.merge(exposure_df, on="Exposure.Id")


def test_overflow(
    data: pd.DataFrame, result_column: str = TEST_OVERFLOW_COLUMN
) -> pd.DataFrame:
    data[result_column] = data["Spot.Saturation"] > SATURATION_LIMIT
    return data


def select_xdr_data(data: pd.DataFrame) -> pd.DataFrame:
    xdr_columns = [*EXPOSURE_COLUMNS, TEST_OVERFLOW_COLUMN]
    missing = [c for c in xdr_columns if c not in data.columns]
    if missing:
        message = f"Columns {missing} are missing in the data frame"
        raise KeyError(message)
    cy3_data = select(data, "Exposure.Channel", "Cy3")
    cy5_data = select(data, "Exposure.Channel", "Cy5")
    id_columns = ["Analysis.Name", "Well.Name", "Pos.Id"]
    cy5_long = select(cy5_data, "Exposure.Time", 150).set_index(id_columns)
    cy5_short = select(cy5_data, "Exposure.Time", 15).set_index(id_columns)
    # for spots in overflow at the long exposure, fall back to the short one;
    # the shared (analysis, well, pos) index aligns the two selections
    in_overflow = cy5_long[TEST_OVERFLOW_COLUMN]
    cy5_long_selected = cy5_long.loc[~in_overflow].reset_index()
    cy5_short_selected = cy5_short.loc[in_overflow].reset_index()
    return pd.concat(
        [cy3_data, cy5_long_selected, cy5_short_selected]
    ).reset_index()


def normalize_xdr_data(
    data: pd.DataFrame, template: str = "{}.Normalized"
) -> pd.DataFrame:
    cy5_data = select(data, "Exposure.Channel", "Cy5")
    cy5_long_data = select(cy5_data, "Exposure.Time", 150)
    if cy5_long_data[TEST_OVERFLOW_COLUMN].any():
        message = (
            "Some spots for long Cy5 exposure time are still in overflow. "
            "Did you forget to select the appropriate data (select_xdr_data)?"
        )
        raise ValueError(message)
    time_dependent_columns = [
        "Bkg.Mean",
        "Bkg.Median",
        "Bkg.StdDev",
        "Bkg.Sum",
        "Spot.Mean",
        "Spot.Median",
        "Spot.StdDev",
        "Spot.Sum",
    ]
    available_columns = [
        c for c in time_dependent_columns if c in data.columns
    ]
    # rescale each intensity column to the normalized exposure time
    for column in available_columns:
        adjusted_column = template.format(column)
        data[adjusted_column] = (
            data[column]
            * data["Exposure.Time.Normalized"]
            / data["Exposure.Time"]
        )
    return data


def normalize(raw_data: pd.DataFrame, analysis: str = "hyb") -> pd.DataFrame:
    with_exposure_info = add_exposure_info(raw_data, analysis=analysis)
    overflow_tested = test_overflow(with_exposure_info)
    xdr_data = select_xdr_data(overflow_tested)
    return normalize_xdr_data(xdr_data)
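
For orientation, a hedged end-to-end sketch (not part of the commit); the input frame mirrors the shape of the test fixture below:

    import pandas as pd
    from conda_helpers import normalize

    raw_data = pd.DataFrame(
        {
            "Analysis.Name": ["A"] * 3,
            "Well.Name": ["a"] * 3,
            "Pos.Id": [1] * 3,
            "Exposure.Id": [1, 2, 3],
            "Spot.Mean": [100, 200, 30],
            "Spot.Saturation": [0, 0, 0],
        }
    )

    result = normalize(raw_data)
    # no spot is saturated, so the long (150) Cy5 exposure is kept, and
    # "Spot.Mean.Normalized" is scaled by Exposure.Time.Normalized / Exposure.Time,
    # e.g. 200 * 25 / 150 for the long Cy5 row
    print(result[["Exposure.Channel", "Spot.Mean", "Spot.Mean.Normalized"]])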

tests/test_conda_helpers.py (29 lines changed)

@@ -21,25 +21,12 @@ all three test strategies will run "make lint" before to catch easily made
 mistakes.
 """

-import pytest
-
-
-def test_example_unittest():
-    """example unittest - try importing the project
-
-    will be run by 'make test' and 'make testall' but not 'make coverage'
-    """
-    import conda_helpers  # noqa: F401
-
-    assert True
-
-
-@pytest.mark.functional()
-def test_example_functional_test():
-    """example unittest
-
-    will be run by 'make coverage' and 'make testall' but not 'make test'
-    """
-    import conda_helpers  # noqa: F401
-
-    assert True
+
+def test_api():
+    from conda_helpers import (
+        add_exposure_info,  # noqa: F401
+        iter_uniques,  # noqa: F401
+        linear_regression,  # noqa: F401
+        normalize,  # noqa: F401
+        select,  # noqa: F401
+    )

tests/test_iter_uniques.py (48 lines changed, new file)

@@ -0,0 +1,48 @@
import pandas as pd
import pytest


@pytest.fixture()
def example_data():
    return pd.DataFrame({"A": [1, 2, 2], "B": [3, 4, 3], "C": ["x", "y", "z"]})


def test_iter_uniques_one_column(example_data):
    from conda_helpers import iter_uniques

    result = list(iter_uniques(example_data, "A"))

    assert len(result) == 2
    assert isinstance(result[0], tuple)

    a_value, data = result[0]
    assert a_value == 1
    assert list(data["C"]) == ["x"]

    a_value, data = result[1]
    assert a_value == 2
    assert list(data["C"]) == ["y", "z"]


def test_iter_uniques_multiple_columns(example_data):
    from conda_helpers import iter_uniques

    result = list(iter_uniques(example_data, "B", "A"))

    assert len(result) == 3
    assert isinstance(result[0], tuple)

    b_value, a_value, data = result[0]
    assert b_value == 3
    assert a_value == 1
    assert list(data["C"]) == ["x"]

    b_value, a_value, data = result[1]
    assert b_value == 3
    assert a_value == 2
    assert list(data["C"]) == ["z"]

    b_value, a_value, data = result[2]
    assert b_value == 4
    assert a_value == 2
    assert list(data["C"]) == ["y"]

tests/test_linear_regression.py (62 lines changed, new file)

@@ -0,0 +1,62 @@
import pandas as pd
import pytest


@pytest.fixture()
def example_data() -> pd.DataFrame:
    x = list(range(1, 6))
    y = [4.1, 6.9, 10.1, 12.9, 15.9]
    return pd.DataFrame({"A": x, "B": y})


def test_linear_regression(example_data):
    from conda_helpers import linear_regression
    from conda_helpers.linear_regression import Regression

    result = linear_regression(example_data, x="A", y="B")

    assert isinstance(result, Regression)
    assert pytest.approx(2.96) == result.coefficient
    assert pytest.approx(2.96) == result.coeff
    assert pytest.approx(1.1) == result.intercept
    assert pytest.approx(0.9996349) == result.score
    assert pytest.approx(0.9996349) == result.r2


def test_regression_predict(example_data):
    from conda_helpers import linear_regression

    regression = linear_regression(example_data, x="A", y="B")

    prediction = regression.predict(x=10)
    assert pytest.approx(30.7) == prediction
    assert pytest.approx(10) == regression.predict(y=prediction)


def test_regression_predict_exceptions(example_data):
    from conda_helpers import linear_regression

    regression = linear_regression(example_data, x="A", y="B")

    with pytest.raises(TypeError, match="expects a keyword"):
        regression.predict()

    with pytest.raises(TypeError, match="expects one keyword"):
        regression.predict(x=1, y=2)

    with pytest.raises(TypeError, match="takes 1 positional argument but"):
        regression.predict(1)


def test_regression_to_dict(example_data):
    from conda_helpers import linear_regression

    regression = linear_regression(example_data, x="A", y="B")

    result = regression.to_dict()

    assert sorted(result.keys()) == ["coefficient", "intercept", "score"]
    assert pytest.approx(2.96) == result["coefficient"]
    assert pytest.approx(1.1) == result["intercept"]
    assert pytest.approx(0.9996349) == result["score"]

tests/test_mbp.py (101 lines changed, new file)

@@ -0,0 +1,101 @@
import pytest


@pytest.fixture()
def example_data():
    import pandas as pd

    data = [
        ("A", "a", 1, 1, 100, 5),
        ("A", "a", 1, 2, 100, 2),
        ("A", "a", 1, 3, 100, 0),
        ("A", "b", 1, 1, 200, 0),
        ("A", "b", 1, 2, 200, 3),
        ("A", "b", 1, 3, 200, 0),
        ("B", "a", 1, 1, 300, 0),
        ("B", "a", 1, 2, 300, 2),
        ("B", "a", 1, 3, 300, 0),
    ]
    columns = [
        "Analysis.Name",
        "Well.Name",
        "Pos.Id",
        "Exposure.Id",
        "Spot.Mean",
        "Spot.Saturation",
    ]
    return pd.DataFrame(data, columns=columns)


@pytest.mark.parametrize(
    ("analysis", "expected_cy3"), [("dry1", 100), ("hyb", 200)]
)
def test_add_exposure_info(example_data, analysis, expected_cy3):
    from conda_helpers.mbp import add_exposure_info

    result = add_exposure_info(example_data, analysis=analysis)

    assert "Exposure.Channel" in result.columns
    assert "Exposure.Time" in result.columns
    assert "Exposure.Time.Normalized" in result.columns
    for i, channel, time in [
        (1, "Cy3", expected_cy3),
        (2, "Cy5", 150),
        (3, "Cy5", 15),
    ]:
        selection = result["Exposure.Id"] == i
        selected = result[selection].copy()
        assert list(selected["Exposure.Channel"].unique()) == [channel]
        assert list(selected["Exposure.Time"].unique()) == [time]


def test_test_overflow(example_data):
    from conda_helpers.mbp import TEST_OVERFLOW_COLUMN, test_overflow

    result = test_overflow(example_data)

    assert list(result[TEST_OVERFLOW_COLUMN]) == [
        True,
        False,
        False,
        False,
        True,
        False,
        False,
        False,
        False,
    ]


def test_select_xdr_data(example_data):
    from conda_helpers.mbp import (
        add_exposure_info,
        select_xdr_data,
        test_overflow,
    )

    tmp = add_exposure_info(example_data)
    tmp = test_overflow(tmp)

    result = select_xdr_data(tmp)

    assert list(result["Exposure.Channel"]) == ["Cy3"] * 3 + ["Cy5"] * 3
    assert list(result["Exposure.Time"]) == [200] * 3 + [150, 150, 15]
    assert list(result["Analysis.Name"]) == list("AABABA")
    assert list(result["Well.Name"]) == list("abaaab")


def test_normalize(example_data):
    from conda_helpers.mbp import normalize

    result = normalize(example_data)

    assert "Spot.Mean.Normalized" in result.columns
    assert list(result["Spot.Mean.Normalized"]) == [
        100,
        200,
        300,
        100 * 25 / 150,
        300 * 25 / 150,
        200 * 25 / 15,
    ]