Browse Source

now relying on sensospot_tools for functionality

main
Holger Frey 1 year ago
parent
commit
f259f6c209
  1. 1
      pyproject.toml
  2. 5
      src/conda_helpers/__init__.py
  3. 60
      src/conda_helpers/iter_uniques.py
  4. 123
      src/conda_helpers/mbp.py
  5. 2
      tests/test_conda_helpers.py
  6. 48
      tests/test_iter_uniques.py
  7. 21
      tests/test_mbp.py

1
pyproject.toml

@ -29,6 +29,7 @@ classifiers = [
dependencies = [ dependencies = [
"pandas", "pandas",
"scikit-learn", "scikit-learn",
"sensospot_tools >= 0.2",
] ]
[project.urls] [project.urls]

5
src/conda_helpers/__init__.py

@ -3,9 +3,10 @@
Helpers for working with data frames in a conda environment Helpers for working with data frames in a conda environment
""" """
__version__ = "0.0.1" __version__ = "0.0.2"
from .iter_uniques import iter_uniques, select # noqa: F401 from sensospot_tools import select, split # noqa: F401
from .linear_regression import linear_regression # noqa: F401 from .linear_regression import linear_regression # noqa: F401
from .mbp import add_exposure_info, normalize # noqa: F401 from .mbp import add_exposure_info, normalize # noqa: F401

60
src/conda_helpers/iter_uniques.py

@ -1,60 +0,0 @@
from __future__ import annotations

from collections.abc import Iterator
from typing import Any

import pandas as pd
def _iter_uniques(
    data: pd.DataFrame,
    *on: Any,
    _prev_values: tuple[Any, ...] | None = None,
) -> Iterator[tuple[Any, ...]]:
    """Recursive worker for `iter_uniques`.

    Splits `data` on the first column in `on` and recurses into the
    remaining columns for every partial data frame.

    Args:
        data: the data frame to split
        *on: column labels to split on, outermost column first
        _prev_values: values already selected by the outer recursion levels;
            they are prepended to every yielded tuple

    Yields:
        Tuples of the selected values (one per column in `on`, preceded by
        `_prev_values`) with a copy of the matching rows as _last_ element.

    Raises:
        ValueError: if no column to split on is given
    """
    if not on:
        # fail with an explicit message instead of a cryptic unpacking error
        message = "At least one column to split on is required"
        raise ValueError(message)
    if _prev_values is None:
        _prev_values = ()

    current_column, *rest = on
    column = data[current_column]
    for current_value in column.unique():
        if pd.api.types.is_scalar(current_value) and pd.isna(current_value):
            # a missing value never compares equal to itself, an equality
            # test would select no rows at all for this group
            selection = column.isna()
        else:
            selection = column == current_value
        selected = data.loc[selection].copy()
        values = (*_prev_values, current_value)
        if rest:
            yield from _iter_uniques(selected, *rest, _prev_values=values)
        else:
            yield (*values, selected)


def iter_uniques(data: pd.DataFrame, *on: Any) -> Iterator[tuple[Any, ...]]:
    """Splits a data frame on the unique values in one or more columns

    Returns a generator of tuples with at least two elements.
    The _last_ element is the resulting partial data frame,
    the element(s) before are the values used to split up the original data.

    Rows with a missing value (NaN / None) in a split column are yielded
    together as one group.

    Example:

        for well, pos, partial_data in iter_uniques(full_data, "Well", "Pos"):
            # `well` is one of the unique values in full_data["Well"]
            # `pos` is one of the unique values in full_data["Pos"]
            # partial_data is a data frame, containing values for this
            # well and pos

    Args:
        data: the data frame to split
        *on: column labels to split on, outermost column first

    Raises:
        ValueError: if no column to split on is given; as this is a
            generator, the error is raised when iteration starts
    """
    yield from _iter_uniques(data, *on)
def select(data: pd.DataFrame, column: str, value: Any) -> pd.DataFrame:
    """Returns a copy of the rows where `column` is equal to `value`

    Args:
        data: the data frame to select the rows from
        column: label of the column to compare
        value: the value the column entries are compared to with `==`

    Returns:
        An independent copy of the matching rows, the original index labels
        are kept.
    """
    return data.loc[data[column] == value].copy()

123
src/conda_helpers/mbp.py

@ -1,17 +1,41 @@
import pandas as pd import pandas as pd
from sensospot_tools import (
normalize as normalize_xdr_data,
)
from sensospot_tools import (
select_hdr_data,
split,
)
from .iter_uniques import select EXPOSURE_ID = "Exposure.Id"
EXPOSURE_CHANNEL = "Exposure.Channel"
EXPOSURE_TIME = "Exposure.Time"
EXPOSURE_TIME_NORMALIZED = "Exposure.Time.Normalized"
EXPOSURE_COLUMNS = [ EXPOSURE_COLUMNS = [
"Exposure.Id", EXPOSURE_ID,
"Exposure.Channel", EXPOSURE_CHANNEL,
"Exposure.Time", EXPOSURE_TIME,
"Exposure.Time.Normalized", EXPOSURE_TIME_NORMALIZED,
] ]
SATURATION_LIMIT = 2 SATURATION_LIMIT = 2
SPOT_SATURATION = "Spot.Saturation"
TEST_OVERFLOW = "Test.Spot.Overflow"
TEST_OVERFLOW_COLUMN = "Test.Spot.Overflow" SPOT_ID_COLUMNS = ["Analysis.Name", "Well.Name", "Pos.Id"]
TIME_DEPENDENT_COLUMNS = [
"Bkg.Mean",
"Bkg.Median",
"Bkg.StdDev",
"Bkg.Sum",
"Spot.Mean",
"Spot.Median",
"Spot.StdDev",
"Spot.Sum",
]
def add_exposure_info(data: pd.DataFrame, analysis="hyb") -> pd.DataFrame: def add_exposure_info(data: pd.DataFrame, analysis="hyb") -> pd.DataFrame:
@ -22,79 +46,36 @@ def add_exposure_info(data: pd.DataFrame, analysis="hyb") -> pd.DataFrame:
(3, "Cy5", 15, 25), (3, "Cy5", 15, 25),
] ]
exposure_df = pd.DataFrame(exposure_values, columns=EXPOSURE_COLUMNS) exposure_df = pd.DataFrame(exposure_values, columns=EXPOSURE_COLUMNS)
return data.merge(exposure_df, on="Exposure.Id") return data.merge(exposure_df, on=EXPOSURE_ID)
def test_overflow( def test_overflow(data: pd.DataFrame, result_column: str = TEST_OVERFLOW):
data: pd.DataFrame, result_column: str = TEST_OVERFLOW_COLUMN data[result_column] = data[SPOT_SATURATION] > SATURATION_LIMIT
):
data[result_column] = data["Spot.Saturation"] > SATURATION_LIMIT
return data return data
def select_xdr_data(data: pd.DataFrame) -> pd.DataFrame: def normalize(raw_data: pd.DataFrame, analysis="hyb"):
xdr_columns = [*EXPOSURE_COLUMNS, TEST_OVERFLOW_COLUMN] with_exposure_info = add_exposure_info(raw_data, analysis=analysis)
missing = [c for c in xdr_columns if c not in data.columns] overflow_tested = test_overflow(with_exposure_info)
if missing: normalized_data = (
message = f"Columns {missing} are missing in the data frame" _normalize_channel(data)
raise KeyError(message) for _, data in split(overflow_tested, EXPOSURE_CHANNEL)
)
cy3_data = select(data, "Exposure.Channel", "Cy3") return pd.concat(normalized_data).reset_index()
cy5_data = select(data, "Exposure.Channel", "Cy5")
id_columns = ["Analysis.Name", "Well.Name", "Pos.Id"]
cy5_long = select(cy5_data, "Exposure.Time", 150).set_index(id_columns)
cy5_short = select(cy5_data, "Exposure.Time", 15).set_index(id_columns)
in_overflow = cy5_long[TEST_OVERFLOW_COLUMN]
cy5_long_selected = cy5_long.loc[~in_overflow].reset_index()
cy5_short_selected = cy5_short.loc[in_overflow].reset_index()
return pd.concat(
[cy3_data, cy5_long_selected, cy5_short_selected]
).reset_index()
def normalize_xdr_data( def _normalize_channel(data: pd.DataFrame) -> pd.DataFrame:
data: pd.DataFrame, template="{}.Normalized" normalized_time = data[EXPOSURE_TIME_NORMALIZED].unique()[0]
) -> pd.DataFrame: hdr_data = select_hdr_data(
cy5_data = select(data, "Exposure.Channel", "Cy5") data, SPOT_ID_COLUMNS, EXPOSURE_TIME, TEST_OVERFLOW
cy5_long_data = select(cy5_data, "Exposure.Time", 150)
if True in list(cy5_long_data[TEST_OVERFLOW_COLUMN].unique()):
message = (
"Some spots for long Cy5 exposure time are still in overflow. "
"Did you forget to select the appropriate data (select_xdr_data) ?"
) )
raise ValueError(message)
time_dependend_columns = [
"Bkg.Mean",
"Bkg.Median",
"Bkg.StdDev",
"Bkg.Sum",
"Spot.Mean",
"Spot.Median",
"Spot.StdDev",
"Spot.Sum",
]
available_columns = [ available_columns = [
c for c in time_dependend_columns if c in data.columns c for c in hdr_data.columns if c in TIME_DEPENDENT_COLUMNS
] ]
return normalize_xdr_data(
for column in available_columns: hdr_data,
adjusted_column = template.format(column) normalized_time,
data[adjusted_column] = ( EXPOSURE_TIME,
data[column] available_columns,
* data["Exposure.Time.Normalized"] "{}.Normalized",
/ data["Exposure.Time"]
) )
return data
def normalize(raw_data: pd.DataFrame, analysis="hyb"):
with_exposure_info = add_exposure_info(raw_data, analysis=analysis)
overflow_tested = test_overflow(with_exposure_info)
xdr_data = select_xdr_data(overflow_tested)
return normalize_xdr_data(xdr_data)

2
tests/test_conda_helpers.py

@ -25,8 +25,8 @@ mistakes.
def test_api(): def test_api():
from conda_helpers import ( from conda_helpers import (
add_exposure_info, # noqa: F401 add_exposure_info, # noqa: F401
iter_uniques, # noqa: F401
linear_regression, # noqa: F401 linear_regression, # noqa: F401
normalize, # noqa: F401 normalize, # noqa: F401
select, # noqa: F401 select, # noqa: F401
split, # noqa: F401
) )

48
tests/test_iter_uniques.py

@ -1,48 +0,0 @@
import pandas as pd
import pytest
@pytest.fixture()
def example_data():
    """Small three-row data frame with columns "A", "B" and "C"."""
    columns = {
        "A": [1, 2, 2],
        "B": [3, 4, 3],
        "C": ["x", "y", "z"],
    }
    return pd.DataFrame(columns)
def test_split_uniques_one_column(example_data):
    """Splitting on column "A" yields one (value, frame) tuple per value."""
    from conda_helpers import iter_uniques

    result = list(iter_uniques(example_data, "A"))

    assert len(result) == 2
    assert isinstance(result[0], tuple)

    expected = [(1, ["x"]), (2, ["y", "z"])]
    for (a_value, data), (expected_a, expected_c) in zip(result, expected):
        assert a_value == expected_a
        assert list(data["C"]) == expected_c
def test_split_uniques_multiple_columns(example_data):
    """Splitting on "B" then "A" yields (b, a, frame) tuples, "B" first."""
    from conda_helpers import iter_uniques

    result = list(iter_uniques(example_data, "B", "A"))

    assert len(result) == 3
    assert isinstance(result[0], tuple)

    expected = [(3, 1, ["x"]), (3, 2, ["z"]), (4, 2, ["y"])]
    for found, wanted in zip(result, expected):
        b_value, a_value, data = found
        expected_b, expected_a, expected_c = wanted
        assert b_value == expected_b
        assert a_value == expected_a
        assert list(data["C"]) == expected_c

21
tests/test_mbp.py

@ -51,11 +51,11 @@ def test_add_exposure_info(example_data, analysis, expected_cy3):
def test_test_overflow(example_data): def test_test_overflow(example_data):
from conda_helpers.mbp import TEST_OVERFLOW_COLUMN, test_overflow from conda_helpers.mbp import TEST_OVERFLOW, test_overflow
result = test_overflow(example_data) result = test_overflow(example_data)
assert list(result[TEST_OVERFLOW_COLUMN]) == [ assert list(result[TEST_OVERFLOW]) == [
True, True,
False, False,
False, False,
@ -68,23 +68,6 @@ def test_test_overflow(example_data):
] ]
def test_select_xdr_data(example_data):
from conda_helpers.mbp import (
add_exposure_info,
select_xdr_data,
test_overflow,
)
tmp = add_exposure_info(example_data)
tmp = test_overflow(tmp)
result = select_xdr_data(tmp)
assert list(result["Exposure.Channel"]) == ["Cy3"] * 3 + ["Cy5"] * 3
assert list(result["Exposure.Time"]) == [200] * 3 + [150, 150, 15]
assert list(result["Analysis.Name"]) == list("AABABA")
assert list(result["Well.Name"]) == list("abaaab")
def test_normalize(example_data): def test_normalize(example_data):
from conda_helpers.mbp import normalize from conda_helpers.mbp import normalize

Loading…
Cancel
Save