Parsing the numerical output from Sensovation SensoSpot image analysis.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

206 lines
7.0 KiB

""" Sensovation Data Parser
Parsing the numerical output from Sensovation image analysis.
"""
__version__ = "0.0.1"
import re
from pathlib import Path
from collections import namedtuple
import pandas
from defusedxml import ElementTree
# Pattern for a well name such as "A1": letters for the row followed by
# digits for the column.  Compiled with re.VERBOSE, so whitespace and the
# "#" comments inside the pattern are ignored, and with re.IGNORECASE, so
# lower-case row letters are accepted as well.
REGEX_WELL = re.compile(
r"""
(?P<row>([A-Z]+)) # row name containing one or more letters
(?P<column>(\d+)) # column, one or more decimals
""",
re.VERBOSE | re.IGNORECASE,
)
# Columns removed from every parsed data file by _cleanup_data_columns().
COLUMNS_TO_DROP = ["Rect.", "Contour"]
# Column renames applied by _cleanup_data_columns(); keys are the original
# headers (note that " ID " includes the surrounding spaces), values are the
# names used in the resulting data frame.
COLUMNS_RENAME_MAP = {
" ID ": "Pos.Id",
"Found": "Spot.Found",
"Dia.": "Spot.Diameter",
}
# File name of the HDF5 cache that process_folder() reads from and writes to
# inside the data folder.
CACHE_FILE_NAME = "cached_data.h5"
# Key of the cached table inside the HDF5 file; it embeds __version__, so a
# different module version looks up a different key.
CACHE_TABLE_NAME = f"raw_data_v{__version__}"
# Metadata extracted from a data file name by _extract_measurement_info().
FileInfo = namedtuple("FileInfo", ["row", "column", "exposure"])
# Channel name and exposure time belonging to one exposure id.
ExposureInfo = namedtuple("ExposureInfo", ["channel", "time"])
def _guess_decimal_separator(file_handle):
    """Guess the decimal separator of an opened data file.

    The first line is treated as the header and skipped; only the second
    line (the first data row) is inspected.  The handle is rewound to the
    start of the file before returning, so it can be passed on to a parser.

    Args:
        file_handle: a seekable, line-iterable text file object.

    Returns:
        "," if the first data row contains more commas than dots, otherwise
        ".".  Files without a data row (empty or header only) yield ".".
    """
    file_handle.seek(0)
    # skip the header line; a default avoids StopIteration on an empty file
    next(file_handle, None)
    # a missing data row is treated as an empty line -> falls back to "."
    first_data_row = next(file_handle, "")
    file_handle.seek(0)
    if first_data_row.count(",") > first_data_row.count("."):
        return ","
    return "."
def _parse_csv(data_file):
    """Read one tab separated Sensovation data file into a data frame.

    The decimal separator is detected from the file content before the
    file is handed over to pandas.
    """
    with Path(data_file).open("r") as csv_handle:
        separator = _guess_decimal_separator(csv_handle)
        table = pandas.read_csv(csv_handle, sep="\t", decimal=separator)
    return table
def _extract_measurement_info(data_file):
    """Extract measurement meta data from a file name.

    The file stem is expected to end in "_<well>_<exposure>", e.g.
    "some_assay_A1_2" for well A1 and exposure id 2.

    Args:
        data_file: path (or path string) of the data file.

    Returns:
        FileInfo with the upper-cased row letters, the column as int and
        the exposure id as int.

    Raises:
        ValueError: if the stem has no "_<well>_<exposure>" ending, the
            well is not letters followed by digits, or the exposure id is
            not an integer.
    """
    data_path = Path(data_file)
    parts = data_path.stem.rsplit("_", 2)
    if len(parts) < 2:
        raise ValueError(f"not a valid data file name: '{data_path.name}'")
    well, exposure = parts[-2:]
    # fullmatch: the whole token must be a well name, so trailing garbage
    # such as "A1x" is rejected instead of being read as "A1"
    matched = REGEX_WELL.fullmatch(well)
    if matched is None:
        raise ValueError(f"not a valid well: '{well}'")
    row = matched["row"].upper()
    column = int(matched["column"])
    try:
        exposure_id = int(exposure)
    except ValueError as error:
        raise ValueError(f"not a valid exposure id: '{exposure}'") from error
    return FileInfo(row, column, exposure_id)
def _cleanup_data_columns(data_frame):
    """Return a copy with consistent column names and without unused columns.

    Columns are first renamed according to COLUMNS_RENAME_MAP, afterwards
    the columns listed in COLUMNS_TO_DROP are removed.
    """
    return data_frame.rename(columns=COLUMNS_RENAME_MAP).drop(
        columns=COLUMNS_TO_DROP
    )
def parse_file(data_file):
    """Parse a single data file and attach the meta data of its file name.

    The well row, well column and exposure id encoded in the file name are
    added as the columns "Field.Row", "Field.Column" and "Exposure.Id".
    """
    # the file name is checked first, so an invalid name fails before reading
    row, column, exposure = _extract_measurement_info(data_file)
    frame = _parse_csv(data_file)
    frame["Field.Row"] = row
    frame["Field.Column"] = column
    frame["Exposure.Id"] = exposure
    return _cleanup_data_columns(frame)
def parse_multiple_files(file_list):
    """Parse several data files into one combined data frame.

    Args:
        file_list: iterable of file paths; lists as well as one-shot
            iterators such as generators are accepted.

    Returns:
        A single data frame with the rows of all files and a fresh,
        continuous index.

    Raises:
        ValueError: if no file path is provided.
    """
    # Materialize first: a generator is always truthy, so testing the
    # argument itself would never detect an exhausted or empty iterator.
    paths = list(file_list) if file_list else []
    if not paths:
        raise ValueError("Empty file list provided")
    frames = [parse_file(path) for path in paths]
    # one concat instead of repeated DataFrame.append (removed in pandas 2.0)
    return pandas.concat(frames, ignore_index=True)
def _list_csv_files(folder):
    """Yield the visible csv files located directly in a folder.

    Only regular files are considered; entries whose name starts with a
    dot are skipped and the ".csv" suffix is compared case-insensitively.
    """
    return (
        entry
        for entry in Path(folder).iterdir()
        if entry.is_file()
        and not entry.stem.startswith(".")
        and entry.suffix.lower() == ".csv"
    )
def _sanity_check(data_frame):
    """Check that a combined data frame contains every expected measurement.

    The number of rows must equal the product of the numbers of distinct
    field rows, field columns, exposure ids and spot positions.  The data
    frame is returned unchanged if this holds, otherwise a ValueError is
    raised.
    """
    expected_rows = 1
    for column in ("Field.Row", "Field.Column", "Exposure.Id", "Pos.Id"):
        expected_rows *= len(data_frame[column].unique())
    if len(data_frame) == expected_rows:
        return data_frame
    raise ValueError("Measurements are missing")
def parse_folder(folder):
    """Combine all csv data files of a folder into one large data frame."""
    return parse_multiple_files(_list_csv_files(folder))
def _search_channel_info_file(folder):
    """Look for the exposure settings file belonging to a data folder.

    The "Parameters" sub folder is searched recursively for "*.svexp"
    files.  The path is returned only if exactly one such file exists; in
    every other case (no sub folder, no file, several files) the result is
    None.
    """
    params_dir = Path(folder) / "Parameters"
    if params_dir.is_dir():
        candidates = list(params_dir.glob("**/*.svexp"))
        if len(candidates) == 1:
            return candidates[0]
    return None
def _parse_channel_info(channel_file):
    """Parse the channel information from an exposure settings file.

    Every child element of the "Channels" node describes one exposure.  The
    exposure id is the number at the end of the element's tag, the channel
    is the last word of its "Description" attribute (lower-cased) and the
    time is taken from its "ExposureTimeMs" attribute.

    Args:
        channel_file: path (or path string) of the xml settings file.

    Returns:
        dict mapping exposure id (int) to ExposureInfo(channel, time).

    Raises:
        ValueError: if a channel tag does not end in a number or the
            exposure time is not an integer.
    """
    file_path = Path(channel_file)
    with file_path.open("r") as file_handle:
        tree = ElementTree.parse(file_handle)
    result = {}
    for child in tree.find("Channels"):
        # child.tag == "ChannelConfig1"
        # use all trailing digits: reading only the last character would
        # turn "ChannelConfig10" into exposure id 0
        matched = re.search(r"\d+$", child.tag)
        if matched is None:
            raise ValueError(f"no exposure id in channel tag: '{child.tag}'")
        exposure = int(matched.group())
        channel_description = child.attrib["Description"]
        # channel_description == "Cy3/Cy5 Green"
        channel = channel_description.rsplit(" ", 1)[-1]
        time = int(child.attrib["ExposureTimeMs"])
        result[exposure] = ExposureInfo(channel.lower(), time)
    return result
def _get_valid_exposure_info(folder, data_frame, exposure_info=None):
    """Return exposure information matching the exposure ids of the data.

    If no exposure information is passed in, a settings file is searched
    in the folder and parsed.  The information is used only if its keys
    are exactly the exposure ids present in the data frame; otherwise
    every present exposure id is mapped to ExposureInfo(None, None).
    """
    present_ids = set(data_frame["Exposure.Id"].unique())
    candidate = exposure_info
    if candidate is None:
        settings_file = _search_channel_info_file(folder)
        if settings_file is not None:
            candidate = _parse_channel_info(settings_file)
    if candidate is not None and present_ids == set(candidate.keys()):
        return candidate
    return {
        exposure_id: ExposureInfo(None, None) for exposure_id in present_ids
    }
def _augment_exposure_info(data_frame, exposure_info):
    """Add the channel name and exposure time of each exposure to the data.

    The columns "Exposure.Channel" and "Exposure.Time" are created on the
    passed data frame (it is modified in place) and filled row-wise based
    on the "Exposure.Id" column.  Rows whose exposure id is not a key of
    exposure_info keep the initial values "" and 0.
    """
    data_frame["Exposure.Channel"] = ""
    data_frame["Exposure.Time"] = 0
    for key, settings in exposure_info.items():
        selected_rows = data_frame["Exposure.Id"] == key
        data_frame.loc[selected_rows, "Exposure.Channel"] = settings.channel
        data_frame.loc[selected_rows, "Exposure.Time"] = settings.time
    return data_frame
def _process_folder(folder, exposures=None):
    """Parse all csv files of a folder and enrich the combined data.

    Exposure channel and time are added to the parsed data; afterwards
    the columns "Field.Row" and "Exposure.Channel" are converted to the
    pandas "category" dtype.
    """
    frame = parse_folder(folder)
    exposure_info = _get_valid_exposure_info(folder, frame, exposures)
    frame = _augment_exposure_info(frame, exposure_info)
    for column in ("Field.Row", "Exposure.Channel"):
        frame[column] = frame[column].astype("category")
    return frame
def process_folder(folder, exposures=None, use_cache=True):
    """Parse all csv files in a folder, with an optional HDF5 cache.

    Args:
        folder: path (or path string) of the folder holding the data files.
        exposures: optional dict mapping exposure ids to ExposureInfo; if
            omitted, the information is looked up in the folder.
        use_cache: if True, a previously cached result stored inside the
            folder is returned when available, and a freshly parsed result
            is written back to the cache on a best-effort basis.

    Returns:
        The combined and augmented data frame of the folder.
    """
    # wrap in Path so plain string folders work like in the other functions
    hdf5_path = Path(folder) / CACHE_FILE_NAME
    if use_cache:
        try:
            return pandas.read_hdf(hdf5_path, key=CACHE_TABLE_NAME)
        except (FileNotFoundError, KeyError):
            # either file or table doesn't exist
            pass
    data_frame = _process_folder(folder, exposures)
    if use_cache:
        try:
            data_frame.to_hdf(hdf5_path, key=CACHE_TABLE_NAME, format="table")
        except OSError:
            # capturing high level OSError
            # read only filesystems don't throw a more specific exception
            pass
    return data_frame