Parsing the numerical output from Sensovation SensoSpot image analysis.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

110 lines
3.8 KiB

from pandas.api.types import is_numeric_dtype
from .utils import split
from .columns import (
RAW_DATA_POS_ID,
RAW_DATA_SPOT_SAT,
CALC_SPOT_OVERFLOW,
META_DATA_WELL_ROW,
META_DATA_WELL_COLUMN,
SETTINGS_EXPOSURE_TIME,
SETTINGS_EXPOSURE_CHANNEL,
RAW_DATA_NORMALIZATION_MAP,
SETTINGS_NORMALIZED_EXPOSURE_TIME,
)
# Columns used together as the row index (well row, well column, position id)
# when frames of different exposure times are aligned in _reduce_overflow.
# Kept as a list so that DataFrame.set_index() builds a multi-level index.
PROBE_MULTI_INDEX = [
META_DATA_WELL_ROW,
META_DATA_WELL_COLUMN,
RAW_DATA_POS_ID,
]
def _check_if_xdr_ready(data_frame):
    """validate that a data frame can be used for xdr creation

    The frame must contain the exposure channel and exposure time columns,
    hold exactly one distinct exposure channel and have a numeric,
    NaN-free exposure time column.

    Raises a ValueError with an "XDR: ..." message for the first constraint
    that is violated. A frame without rows has zero distinct channels and
    is therefore rejected with the "Mixed Exposure Channels" message.
    """
    missing_columns = {
        SETTINGS_EXPOSURE_CHANNEL,
        SETTINGS_EXPOSURE_TIME,
    }.difference(data_frame.columns)
    if missing_columns:
        raise ValueError("XDR: Apply an exposure map first")

    distinct_channels = data_frame[SETTINGS_EXPOSURE_CHANNEL].unique()
    if len(distinct_channels) != 1:
        raise ValueError("XDR: Mixed Exposure Channels")

    exposure_times = data_frame[SETTINGS_EXPOSURE_TIME]
    if not is_numeric_dtype(exposure_times):
        raise ValueError("XDR: Exposure time is not numerical")
    if exposure_times.hasnans:
        raise ValueError("XDR: Exposure time contains NaNs")
def _calc_overflow_info(data_frame, column=RAW_DATA_SPOT_SAT, limit=2):
    """flag every row whose value in `column` is greater than `limit`

    The boolean result is written to the CALC_SPOT_OVERFLOW column of the
    passed data frame; the same data frame object is returned.
    """
    exceeds_limit = data_frame[column] > limit
    data_frame.loc[:, CALC_SPOT_OVERFLOW] = exceeds_limit
    return data_frame
def _reduce_overflow(data_frame):
    """replace overflowing rows with rows from shorter exposure times

    The data frame is split by exposure time. Starting with the frame of
    the longest exposure time, every row that is currently flagged in
    CALC_SPOT_OVERFLOW is replaced by the row with the same
    PROBE_MULTI_INDEX entry from the next shorter exposure time.

    Raises a ValueError if a shorter exposure does not cover exactly the
    same index entries as the longest exposure.

    Returns the merged data frame with the index reset to columns.
    """
    # presumably a mapping of exposure time -> partial data frame;
    # only .keys() and item access are used here
    frames_by_time = split(data_frame, SETTINGS_EXPOSURE_TIME)

    # longest exposure first, the remaining ones in descending order
    longest, *shorter_times = sorted(frames_by_time.keys(), reverse=True)
    merged = frames_by_time[longest].set_index(PROBE_MULTI_INDEX)

    for exposure_time in shorter_times:
        # evaluated in every round: rows replaced in an earlier round are
        # judged by the values that were copied into them
        overflowing = merged[CALC_SPOT_OVERFLOW].eq(True)
        candidate = frames_by_time[exposure_time].set_index(PROBE_MULTI_INDEX)

        # entries that are present in only one of the two frames
        mismatch = set(merged.index) ^ set(candidate.index)
        if mismatch:
            num = len(mismatch)
            raise ValueError(
                f"XDR: Scan Data is incomplete, differs on {num} probes"
            )

        merged.loc[overflowing] = candidate.loc[overflowing]

    return merged.reset_index()
def blend(data_frame, column=RAW_DATA_SPOT_SAT, limit=2):
    """build an extended dynamic range without overflowing spots

    The data frame is validated first. If it has no CALC_SPOT_OVERFLOW
    column yet, that column is calculated from `column` and `limit` and
    added to the passed data frame. Overflowing rows are then replaced by
    rows from shorter exposure times.

    Raises a ValueError if the data frame is not ready for xdr or if the
    scan data of the exposure times does not match.
    """
    _check_if_xdr_ready(data_frame)
    overflow_known = CALC_SPOT_OVERFLOW in data_frame.columns
    flagged_frame = (
        data_frame
        if overflow_known
        else _calc_overflow_info(data_frame, column, limit)
    )
    return _reduce_overflow(flagged_frame)
def normalize_values(data_frame, normalized_time=None):
    """add exposure time normalized values to a data frame

    A truthy `normalized_time` is always written to the
    SETTINGS_NORMALIZED_EXPOSURE_TIME column. Without it (None or 0), an
    already existing column is kept; if the column is missing, it is
    filled with the maximum exposure time.

    For each pair in RAW_DATA_NORMALIZATION_MAP the source column is
    divided by the exposure time, multiplied by the normalized exposure
    time and stored in the target column.

    The passed data frame is modified and returned.
    """
    keep_existing = (
        not normalized_time
        and SETTINGS_NORMALIZED_EXPOSURE_TIME in data_frame.columns
    )
    if not keep_existing:
        # fall back to the longest exposure if no time was requested
        data_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] = (
            normalized_time or data_frame[SETTINGS_EXPOSURE_TIME].max()
        )

    for source_col, target_col in RAW_DATA_NORMALIZATION_MAP.items():
        per_time_unit = data_frame[source_col] / data_frame[SETTINGS_EXPOSURE_TIME]
        data_frame[target_col] = (
            per_time_unit * data_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME]
        )

    return data_frame
def create_xdr(
    data_frame,
    normalized_time=None,
    column=RAW_DATA_SPOT_SAT,
    limit=2,
):
    """blend the exposures of a data frame and normalize the values

    data_frame:
        passed to blend() together with `column` and `limit`
    normalized_time:
        passed to normalize_values(); if it is None, the max exposure
        time is used for normalization, unless the blended data already
        has a normalized exposure time column.
    column, limit:
        a row counts as overflowing if its value in `column` is greater
        than `limit`; only used if the overflow column is not yet present
    """
    return normalize_values(
        blend(data_frame, column=column, limit=limit),
        normalized_time=normalized_time,
    )