Parsing the numerical output from Sensovation SensoSpot image analysis.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

117 lines
3.6 KiB

from .columns import (
RAW_DATA_POS_ID,
CALC_SPOT_OVERFLOW,
META_DATA_WELL_ROW,
RAW_DATA_SPOT_MEAN,
META_DATA_WELL_COLUMN,
SETTINGS_EXPOSURE_TIME,
SETTINGS_EXPOSURE_CHANNEL,
RAW_DATA_NORMALIZATION_MAP,
SETTINGS_NORMALIZED_EXPOSURE_TIME,
)
# Columns used together as the row index when frames of different exposure
# times are lined up in _reduce_overflow_in_channel (via set_index).
# NOTE(review): presumably this combination identifies one spot on a plate;
# confirm against the column definitions.
PROBE_MULTI_INDEX = [
META_DATA_WELL_ROW,
META_DATA_WELL_COLUMN,
RAW_DATA_POS_ID,
]
from .utils import split_data_frame, apply_exposure_map
def _check_overflow_limit(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5):
    """Flag every row whose value in `column` is greater than `limit`.

    The boolean result is written to the CALC_SPOT_OVERFLOW column of the
    data frame that was passed in (the frame is modified in place) and the
    very same data frame is handed back to the caller.
    """
    is_overflowing = data_frame[column] > limit
    data_frame[CALC_SPOT_OVERFLOW] = is_overflowing
    return data_frame
def reduce_overflow(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5):
    """Reduce the data set channel by channel, dropping overflowing spots.

    The overflow flag is computed first (from `column` and `limit`), then
    the data is split on SETTINGS_EXPOSURE_CHANNEL and every channel frame
    is reduced separately.

    Returns a dict that maps each channel id to its reduced frame.
    """
    flagged_frame = _check_overflow_limit(data_frame, column, limit)
    frames_by_channel = split_data_frame(
        flagged_frame, SETTINGS_EXPOSURE_CHANNEL
    )

    reduced_by_channel = {}
    for channel_id, channel_frame in frames_by_channel.items():
        reduced_by_channel[channel_id] = _reduce_overflow_in_channel(
            channel_frame
        )
    return reduced_by_channel
def _reduce_overflow_in_channel(channel_frame):
    """Merge the exposures of one channel, replacing overflowing spots.

    The frame is split on SETTINGS_EXPOSURE_TIME.  Starting with the
    longest exposure, every row that is still flagged in
    CALC_SPOT_OVERFLOW is replaced by the row with the same
    PROBE_MULTI_INDEX key taken from the next shorter exposure; this is
    repeated until all exposures have been visited.

    If the split yields fewer than two exposures there is nothing to
    merge and `channel_frame` itself is returned unchanged.  Otherwise the
    merged frame is returned with the PROBE_MULTI_INDEX levels turned back
    into regular columns.
    """
    split_frames = split_data_frame(channel_frame, SETTINGS_EXPOSURE_TIME)

    if len(split_frames) <= 1:
        # Nothing to merge: a single exposure, or none at all.  The empty
        # case used to fall through and fail with a ValueError when
        # unpacking `max_time` below.
        return channel_frame

    # longest exposure first, then progressively shorter ones
    max_time, *rest_times = sorted(split_frames.keys(), reverse=True)

    result_frame = split_frames[max_time].set_index(PROBE_MULTI_INDEX)

    for next_time in rest_times:
        # The explicit comparison is kept from the original code.
        # NOTE(review): presumably it guarantees a strictly boolean mask
        # even if the column dtype changed after an earlier replacement.
        mask = result_frame[CALC_SPOT_OVERFLOW] == True  # noqa: E712
        next_frame = split_frames[next_time].set_index(PROBE_MULTI_INDEX)
        # rows are matched through the shared PROBE_MULTI_INDEX index
        result_frame.loc[mask] = next_frame.loc[mask]

    return result_frame.reset_index()
def _infer_normalization_map(split_data_frames):
    """Build the time normalization map for a dict of split data frames.

    For every key the largest value found in the frame's
    SETTINGS_EXPOSURE_TIME column is recorded.
    """
    normalization_map = {}
    for key, frame in split_data_frames.items():
        exposure_times = frame[SETTINGS_EXPOSURE_TIME]
        normalization_map[key] = exposure_times.max()
    return normalization_map
def normalize_exposure_time(split_data_frames):
    """Add time normalized values to each of the split data frames.

    Each frame is normalized to the maximum exposure time found in that
    frame.  Returns a new dict with the same keys holding the normalized
    frames.
    """
    reference_times = _infer_normalization_map(split_data_frames)

    normalized_frames = {}
    for key, frame in split_data_frames.items():
        normalized_frames[key] = normalize_channel(
            frame, reference_times[key]
        )
    return normalized_frames
def normalize_channel(channel_frame, normalized_time):
    """Return a copy of a channel data frame with time normalized values.

    `normalized_time` is stored in the SETTINGS_NORMALIZED_EXPOSURE_TIME
    column.  For each (original, normalized) column pair listed in
    RAW_DATA_NORMALIZATION_MAP the original value is divided by the row's
    exposure time and multiplied by the normalized exposure time.  The
    frame passed in is left untouched.
    """
    normalized_frame = channel_frame.copy()
    normalized_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] = normalized_time

    for source_col, target_col in RAW_DATA_NORMALIZATION_MAP.items():
        # value per unit of exposure time ...
        per_time_unit = (
            normalized_frame[source_col]
            / normalized_frame[SETTINGS_EXPOSURE_TIME]
        )
        # ... scaled up to the common, normalized exposure time
        normalized_frame[target_col] = (
            per_time_unit * normalized_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME]
        )

    return normalized_frame
def split_channels(
    data_frame,
    exposure_map=None,
    overflow_column=RAW_DATA_SPOT_MEAN,
    overflow_limit=0.5,
):
    """Augment, reduce and normalize the measurement exposures.

    The work is done in three steps: the exposure map is applied to the
    data frame, the result is reduced per channel based on
    `overflow_column` and `overflow_limit`, and finally the reduced
    channel frames are normalized by exposure time.

    exposure map:
        keys: must be the same as the exposure ids,
        values: objects with at least time and channel attributes

    If the exposure map is None, the values from the optionally parsed
    measurement parameters are used.

    The max exposure time per channel is used for normalization.
    """
    frame_with_exposures = apply_exposure_map(data_frame, exposure_map)
    frames_by_channel = reduce_overflow(
        frame_with_exposures, overflow_column, overflow_limit
    )
    normalized_by_channel = normalize_exposure_time(frames_by_channel)
    return normalized_by_channel