Parsing the numerical output from Sensovation SensoSpot image analysis.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

164 lines
5.3 KiB

""" Sensospot Data Parser
Parsing the numerical output from Sensovations Sensospot image analysis.
"""
import re
from pathlib import Path
from collections import namedtuple
import pandas
from .columns import (
COL_NAME_POS_ID,
COL_NAME_WELL_ROW,
COL_NAME_SPOT_FOUND,
COL_NAME_EXPOSURE_ID,
COL_NAME_WELL_COLUMN,
COL_NAME_SPOT_DIAMETER,
RAW_DATA_COLUMN_SET
)
from .parameters import add_optional_measurement_parameters
REGEX_WELL = re.compile(
r"""
(?P<row>([A-Z]+)) # row name containing one or more letters
(?P<column>(\d+)) # column, one or more decimals
""",
re.VERBOSE | re.IGNORECASE,
)
COLUMNS_TO_DROP = ["Rect.", "Contour", "Id", "Name", "Foo"]
COLUMNS_RENAME_MAP = {
" ID ": COL_NAME_POS_ID,
"Found": COL_NAME_SPOT_FOUND,
"Dia.": COL_NAME_SPOT_DIAMETER,
}
CACHE_FILE_NAME = "raw_data.h5"
FileInfo = namedtuple("FileInfo", ["row", "column", "exposure"])
def _get_cache_table_name():
""" automatic hdf5 table name, avoids a circular import """
from . import VERSION_TABLE_NAME
return VERSION_TABLE_NAME
def _guess_decimal_separator(file_handle):
""" guesses the decimal spearator of a opened data file """
file_handle.seek(0)
headers = next(file_handle) # noqa: F841
data = next(file_handle)
separator = "," if data.count(",") > data.count(".") else "."
file_handle.seek(0)
return separator
def _parse_csv(data_file):
""" parse a csv sensovation data file """
data_path = Path(data_file)
with data_path.open("r") as handle:
decimal_sep = _guess_decimal_separator(handle)
return pandas.read_csv(handle, sep="\t", decimal=decimal_sep)
def _extract_measurement_info(data_file):
""" extract measurement meta data from a file name """
data_path = Path(data_file)
*rest, well, exposure = data_path.stem.rsplit("_", 2) # noqa: F841
matched = REGEX_WELL.match(well)
if matched is None:
raise ValueError(f"not a valid well: '{well}'")
row = matched["row"].upper()
column = int(matched["column"])
exposure = int(exposure)
return FileInfo(row, column, exposure)
def _cleanup_data_columns(data_frame):
""" renames some data columns for consistency and drops unused columns """
renamed = data_frame.rename(columns=COLUMNS_RENAME_MAP)
surplus_columns = set(renamed.columns) - RAW_DATA_COLUMN_SET
return renamed.drop(columns=surplus_columns)
def parse_file(data_file):
""" parses one data file and adds metadata to result """
try:
measurement_info = _extract_measurement_info(Path(data_file))
except ValueError as e:
return None
data_frame = _parse_csv(data_file)
data_frame[COL_NAME_WELL_ROW] = measurement_info.row
data_frame[COL_NAME_WELL_COLUMN] = measurement_info.column
data_frame[COL_NAME_EXPOSURE_ID] = measurement_info.exposure
return _cleanup_data_columns(data_frame)
def parse_multiple_files(file_list):
""" parses a list of file paths to one combined dataframe """
if not file_list:
raise ValueError("Empty file list provided")
collection = (parse_file(path) for path in file_list)
filtered = (frame for frame in collection if frame is not None)
data_frame = next(filtered)
for next_frame in filtered:
data_frame = data_frame.append(next_frame, ignore_index=True)
data_frame[COL_NAME_WELL_ROW] = data_frame[COL_NAME_WELL_ROW].astype(
"category"
)
return data_frame
def _list_csv_files(folder):
""" returns all csv files in a folder """
folder_path = Path(folder)
files = (item for item in folder_path.iterdir() if item.is_file())
visible = (item for item in files if not item.stem.startswith("."))
return (item for item in visible if item.suffix.lower() == ".csv")
def _sanity_check(data_frame):
""" checks some basic constrains of a combined data frame """
field_rows = len(data_frame[COL_NAME_WELL_ROW].unique())
field_cols = len(data_frame[COL_NAME_WELL_COLUMN].unique())
exposures = len(data_frame[COL_NAME_EXPOSURE_ID].unique())
spot_positions = len(data_frame[COL_NAME_POS_ID].unique())
expected_rows = field_rows * field_cols * exposures * spot_positions
if expected_rows != len(data_frame):
raise ValueError("Measurements are missing")
return data_frame
def parse_folder(folder):
""" parses all csv files in a folder to one large dataframe """
file_list = _list_csv_files(Path(folder))
data_frame = parse_multiple_files(file_list)
data_frame = add_optional_measurement_parameters(data_frame, folder)
return _sanity_check(data_frame)
def process_folder(folder, use_cache=True):
""" parses all csv files in a folder, adds some checks and more data """
hdf5_path = Path(folder) / CACHE_FILE_NAME
if use_cache:
try:
return pandas.read_hdf(hdf5_path, _get_cache_table_name())
except (FileNotFoundError, KeyError):
# either file or table doesn't exist
pass
data_frame = parse_folder(folder)
if use_cache:
try:
data_frame.to_hdf(
hdf5_path, _get_cache_table_name(), format="table"
)
except OSError:
# capturing high level OSError
# read only filesystems don't throw a more specific exception
pass
return data_frame