Holger Frey
2 years ago
7 changed files with 336 additions and 2 deletions
@@ -0,0 +1,95 @@
from typing import Union

import pandas

from .helpers import ensure_list, check_columns_exist
from .selection import select


def select_hdr_data(
    data: pandas.DataFrame,
    spot_id_columns: Union[list[str], str],
    time_column: str,
    overflow_column: str,
) -> pandas.DataFrame:
    """selects the data for an increased dynamic measurement range

    To increase the dynamic range of a measurement, multiple exposures of one
    microarray might be taken.

    This function selects the data of only one exposure time per spot, based
    on whether the spot is in overflow. It starts with the longest exposure
    time (best suited for weak signals) and falls back to the next lower
    exposure time if the value in the `overflow_column` is `True`.

    This is done for each spot, and therefore a spot needs a way to be
    identified across multiple exposure times. Examples for this are:
    - for a single array:
      the spot id (e.g. "Pos.Id")
    - for multiple arrays:
      the array position and the spot id (e.g. "Well.Name" and "Pos.Id")
    - for multiple runs:
      the name of the run, the array position and the spot id
      (e.g. "File.Name", "Well.Name" and "Pos.Id")

    The function will raise a KeyError if any of the provided column names
    is not present in the data frame.

    spot_id_columns: column names identifying a spot
    time_column: column name for the (nominal) exposure time
    overflow_column: column name holding an overflow test result
    returns: data frame with selected hdr data per spot
    """

    check_columns_exist(data, spot_id_columns, time_column, overflow_column)
    spot_ids = ensure_list(spot_id_columns)

    sorted_times = sorted(data[time_column].unique(), reverse=True)
    data_by_time = (select(data, time_column, t) for t in sorted_times)
    indexed_data_by_time = (dbt.set_index(spot_ids) for dbt in data_by_time)

    # get the first data set (highest exposure time)
    hdr_data = next(indexed_data_by_time)

    # iterate over the data sets of the shorter exposure times
    for next_lower_time in indexed_data_by_time:
        selection = hdr_data[overflow_column]
        not_in_overflow = hdr_data.loc[~selection].copy()
        replacement_for_overflow = next_lower_time.loc[selection].copy()
        hdr_data = pandas.concat((not_in_overflow, replacement_for_overflow))

    return hdr_data.reset_index()


def normalize(
    data: pandas.DataFrame,
    normalized_time: Union[int, float],
    time_column: str,
    value_columns: Union[list[str], str],
    template: str = "Normalized.{}",
) -> pandas.DataFrame:
    """normalizes values to a normalized exposure time

    Will raise a KeyError if any column is not in the data frame;
    raises a ValueError if no template string was provided.

    data: data frame to normalize
    normalized_time: exposure time to normalize to
    time_column: column name of the (nominal) exposure time
    value_columns: which columns to normalize
    template: a Python template string for the normalized column names
    returns: copy of the data with additional normalized values
    """
    check_columns_exist(data, time_column, value_columns)
    if "{}" not in template:
        raise ValueError(f"Not a template string: '{template}'")

    data = data.copy()

    for column in ensure_list(value_columns):
        normalized_name = template.format(column)
        data[normalized_name] = (
            normalized_time * data[column] / data[time_column]
        )

    return data
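
Taken together with the tests below, the two new functions form a small pipeline: pick exactly one exposure per spot, then rescale the selected values to a common exposure time. A minimal usage sketch, assuming the module is importable as `sensospot_tools.hdr` (the import path used in the test file) and a data frame shaped like the test fixtures:

import pandas

from sensospot_tools.hdr import select_hdr_data, normalize

# same layout as the CSV_FULL_DATA fixture: one row per spot and exposure time
data = pandas.DataFrame(
    {
        "spot": [1, 1, 2, 2],
        "time": [100, 10, 100, 10],
        "signal": [100, 200, 400, 500],
        "overflow": [False, False, True, False],
    }
)

# keep the longest exposure that is not in overflow:
# spot 1 stays at time 100, spot 2 falls back to time 10
hdr = select_hdr_data(
    data,
    spot_id_columns="spot",
    time_column="time",
    overflow_column="overflow",
)

# rescale to a nominal exposure time of 200:
# spot 1: 200 * 100 / 100 = 200, spot 2: 200 * 500 / 10 = 10000
normalized = normalize(
    hdr,
    normalized_time=200,
    time_column="time",
    value_columns="signal",
    template="n.{}",
)
print(normalized[["spot", "time", "signal", "n.signal"]])

Because both functions take plain column names and return new data frames, they can be chained on any pandas.DataFrame without mutating the input.
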
@@ -0,0 +1,151 @@
import pytest

CSV_FULL_DATA = """
spot time background signal overflow
1 100 1 100 FALSE
1 10 2 200 FALSE
1 1 3 300 FALSE
2 100 4 400 TRUE
2 10 5 500 FALSE
2 1 6 600 FALSE
3 100 7 700 TRUE
3 10 8 800 TRUE
3 1 9 900 FALSE
4 100 10 1000 TRUE
4 10 11 1100 TRUE
4 1 12 1200 TRUE
"""

CSV_ONE_TIME_DATA = """
spot time background signal overflow
1 100 1 100 TRUE
2 100 2 200 FALSE
3 100 3 300 TRUE
"""

CSV_HDR_DATA = """
spot time background signal overflow
1 100 1 100 FALSE
2 10 5 500 FALSE
3 1 9 900 FALSE
4 1 12 1200 TRUE
"""

CSV_NORMALIZED_HDR_DATA = """
spot time background signal overflow n.background n.signal
1 100 1 100 FALSE 2 200
2 10 5 500 FALSE 100 10000
3 1 9 900 FALSE 1800 180000
4 1 12 1200 TRUE 2400 240000
"""


def csv_to_data_frame(text):
    import io

    import pandas

    buffer = io.StringIO(text.strip())
    return pandas.read_csv(buffer, sep="\t")


@pytest.fixture
def full_source_data():
    yield csv_to_data_frame(CSV_FULL_DATA)


@pytest.fixture
def one_time_source_data():
    yield csv_to_data_frame(CSV_ONE_TIME_DATA)


@pytest.fixture
def hdr_data():
    yield csv_to_data_frame(CSV_HDR_DATA)


@pytest.fixture
def hdr_normalized_data():
    yield csv_to_data_frame(CSV_NORMALIZED_HDR_DATA)


def test_select_hdr_data_full_data(full_source_data, hdr_data):
    """select the hdr data from a data frame with multiple exposure times"""
    from sensospot_tools.hdr import select_hdr_data

    result = select_hdr_data(
        data=full_source_data,
        spot_id_columns="spot",
        time_column="time",
        overflow_column="overflow",
    )

    for column in hdr_data.columns:
        assert list(result[column]) == list(hdr_data[column])


def test_select_hdr_data_one_time(one_time_source_data):
    """select the hdr data from a data frame with only one exposure time"""
    from sensospot_tools.hdr import select_hdr_data

    result = select_hdr_data(
        data=one_time_source_data,
        spot_id_columns="spot",
        time_column="time",
        overflow_column="overflow",
    )

    for column in one_time_source_data.columns:
        assert list(result[column]) == list(one_time_source_data[column])


def test_select_hdr_raises_error_on_wrong_column(one_time_source_data):
    from sensospot_tools.hdr import select_hdr_data

    with pytest.raises(KeyError):
        select_hdr_data(
            data=one_time_source_data,
            spot_id_columns="spot",
            time_column="time",
            overflow_column="UNKNOWN",
        )


def test_normalize(hdr_data, hdr_normalized_data):
    from sensospot_tools.hdr import normalize

    result = normalize(
        hdr_data,
        normalized_time=200,
        time_column="time",
        value_columns=["background", "signal"],
        template="n.{}",
    )

    for column in hdr_normalized_data.columns:
        assert list(result[column]) == list(hdr_normalized_data[column])


def test_normalize_raises_error_on_wrong_column(hdr_data):
    from sensospot_tools.hdr import normalize

    with pytest.raises(KeyError):
        normalize(
            hdr_data,
            normalized_time=200,
            time_column="time",
            value_columns=["UNKNOWN", "signal"],
        )


def test_normalize_raises_error_no_template_string(hdr_data):
    from sensospot_tools.hdr import normalize

    with pytest.raises(ValueError):
        normalize(
            hdr_data,
            normalized_time=200,
            time_column="time",
            value_columns="signal",
            template="NO TEMPLATE",
        )