""" Sensovation Data Parser
Parsing the numerical output from Sensovation image analysis.
"""

import re
from pathlib import Path
from collections import namedtuple

import pandas
from defusedxml import ElementTree
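
# A well name consists of one or more row letters followed by the column
# number, e.g. a (hypothetical) "C03" is matched as row "C", column 3.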
REGEX_WELL = re.compile(
    r"""
    (?P<row>([A-Z]+))  # row name containing one or more letters
    (?P<column>(\d+))  # column, one or more decimals
    """,
    re.VERBOSE | re.IGNORECASE,
)

COLUMNS_TO_DROP = ["Rect.", "Contour"]
COLUMNS_RENAME_MAP = {
    " ID ": "Pos.Id",
    "Found": "Spot.Found",
    "Dia.": "Spot.Diameter",
}
CACHE_FILE_NAME = "raw_data.h5"
FileInfo = namedtuple("FileInfo", ["row", "column", "exposure"])
ExposureInfo = namedtuple("ExposureInfo", ["channel", "time"])


def _get_cache_table_name():
    """ automatic hdf5 table name, avoids a circular import """
    from . import __version__

    return f"v{__version__}"
def _guess_decimal_separator(file_handle):
    """ guesses the decimal separator of an opened data file """
    file_handle.seek(0)
    headers = next(file_handle)  # noqa: F841
    data = next(file_handle)
    separator = "," if data.count(",") > data.count(".") else "."
    file_handle.seek(0)
    return separator


def _parse_csv(data_file):
    """ parse a csv sensovation data file """
    data_path = Path(data_file)
    with data_path.open("r") as handle:
        decimal_sep = _guess_decimal_separator(handle)
        return pandas.read_csv(handle, sep="\t", decimal=decimal_sep)
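

# Data file names are expected to end in "<well>_<exposure>", e.g. a
# (hypothetical) "some_scan_A01_2.csv" would yield
# FileInfo(row="A", column=1, exposure=2).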
def _extract_measurement_info(data_file):
    """ extract measurement meta data from a file name """
    data_path = Path(data_file)
    *rest, well, exposure = data_path.stem.rsplit("_", 2)  # noqa: F841
    matched = REGEX_WELL.match(well)
    if matched is None:
        raise ValueError(f"not a valid well: '{well}'")
    row = matched["row"].upper()
    column = int(matched["column"])
    exposure = int(exposure)
    return FileInfo(row, column, exposure)


def _cleanup_data_columns(data_frame):
    """ renames some data columns for consistency and drops unused columns """
    renamed = data_frame.rename(columns=COLUMNS_RENAME_MAP)
    return renamed.drop(columns=COLUMNS_TO_DROP)


def parse_file(data_file):
    """ parses one data file and adds metadata to result """
    measurement_info = _extract_measurement_info(data_file)
    data_frame = _parse_csv(data_file)
    data_frame["Field.Row"] = measurement_info.row
    data_frame["Field.Column"] = measurement_info.column
    data_frame["Exposure.Id"] = measurement_info.exposure
    return _cleanup_data_columns(data_frame)


def parse_multiple_files(file_list):
    """ parses a list of file paths to one combined dataframe """
    # materialize the iterable so the emptiness check also works for generators
    file_list = list(file_list)
    if not file_list:
        raise ValueError("Empty file list provided")
    collection = (parse_file(path) for path in file_list)
    return pandas.concat(collection, ignore_index=True)


def _list_csv_files(folder):
    """ returns all csv files in a folder """
    folder_path = Path(folder)
    files = (item for item in folder_path.iterdir() if item.is_file())
    visible = (item for item in files if not item.stem.startswith("."))
    return (item for item in visible if item.suffix.lower() == ".csv")
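

# A complete measurement should contain one data row per combination of
# field row, field column, exposure and spot position. For example
# (hypothetical numbers), 2 field rows x 3 field columns x 2 exposures
# x 100 spots should yield 1200 rows in the combined dataframe.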
def _sanity_check(data_frame):
    """ checks some basic constraints of a combined data frame """
    field_rows = len(data_frame["Field.Row"].unique())
    field_cols = len(data_frame["Field.Column"].unique())
    exposures = len(data_frame["Exposure.Id"].unique())
    spot_positions = len(data_frame["Pos.Id"].unique())
    expected_rows = field_rows * field_cols * exposures * spot_positions
    if expected_rows != len(data_frame):
        raise ValueError("Measurements are missing")
    return data_frame


def parse_folder(folder):
    """ parses all csv files in a folder to one large dataframe """
    file_list = _list_csv_files(folder)
    data_frame = parse_multiple_files(file_list)
    return data_frame


def _search_channel_info_file(folder):
    """ searches for an exposure settings file in a folder """
    folder_path = Path(folder)
    params_folder = folder_path / "Parameters"
    if not params_folder.is_dir():
        return None
    param_files = list(params_folder.glob("**/*.svexp"))
    if len(param_files) == 1:
        return param_files[0]
    else:
        return None
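

# The *.svexp settings file is XML. The parser below expects a <Channels>
# element directly below the document root, with one <ChannelConfigN>
# child per exposure, roughly like this sketch (attribute values are
# made-up examples):
#
#   <Channels>
#       <ChannelConfig1 Description="Cy3/Cy5 Green" ExposureTimeMs="100" />
#       <ChannelConfig2 Description="Cy3/Cy5 Red" ExposureTimeMs="150" />
#   </Channels>
#
# The trailing digit of the tag is used as the exposure id and the last
# word of the description as the channel name.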
def _parse_channel_info(channel_file):
    """ parses the channel information from a settings file """
    file_path = Path(channel_file)
    with file_path.open("r") as file_handle:
        tree = ElementTree.parse(file_handle)
    result = {}
    for child in tree.find("Channels"):
        # child.tag == "ChannelConfig1"
        exposure = int(child.tag[-1])
        channel_description = child.attrib["Description"]
        # channel_description == "Cy3/Cy5 Green"
        channel = channel_description.rsplit(" ", 1)[-1]
        time = int(child.attrib["ExposureTimeMs"])
        result[exposure] = ExposureInfo(channel.lower(), time)
    return result
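

# Exposure information is resolved in this order: an explicitly provided
# map, then a map parsed from the settings file found in the folder; if
# neither matches the exposure ids present in the data, every exposure
# falls back to ExposureInfo(None, None).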
def _get_valid_exposure_map(folder, data_frame, exposure_map=None):
    """ returns valid exposure information """
    available_exposures = set(data_frame["Exposure.Id"].unique())
    if exposure_map is None:
        params_file = _search_channel_info_file(folder)
        if params_file is not None:
            exposure_map = _parse_channel_info(params_file)
    if exposure_map is not None:
        if available_exposures == set(exposure_map.keys()):
            return exposure_map
    return {c: ExposureInfo(None, None) for c in available_exposures}


def _augment_exposure_map(data_frame, exposure_map):
    """ adds exposure channel and time columns to the data frame """
    data_frame["Exposure.Channel"] = ""
    data_frame["Exposure.Time"] = 0
    for exposure_id, info in exposure_map.items():
        channel, time = info
        mask = data_frame["Exposure.Id"] == exposure_id
        data_frame.loc[mask, "Exposure.Channel"] = channel
        data_frame.loc[mask, "Exposure.Time"] = time
    return data_frame


def _process_folder(folder, exposures=None):
    """ parses all csv files in a folder, adds some checks and more data """
    data_frame = parse_folder(folder)
    exposures = _get_valid_exposure_map(folder, data_frame, exposures)
    data_frame = _augment_exposure_map(data_frame, exposures)
    data_frame["Field.Row"] = data_frame["Field.Row"].astype("category")
    data_frame["Exposure.Channel"] = data_frame["Exposure.Channel"].astype(
        "category"
    )
    return data_frame
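

# Results are cached as an HDF5 file inside the scanned folder; the table
# name includes the package version (see _get_cache_table_name), presumably
# so a cache written by an older version of the parser is not reused.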
def process_folder(folder, exposures=None, use_cache=True):
    """ parses all csv files in a folder, adds some checks and more data """
    hdf5_path = Path(folder) / CACHE_FILE_NAME
    if use_cache:
        try:
            return pandas.read_hdf(hdf5_path, _get_cache_table_name())
        except (FileNotFoundError, KeyError):
            # either file or table doesn't exist
            pass
    data_frame = _process_folder(folder, exposures)
    if use_cache:
        try:
            data_frame.to_hdf(
                hdf5_path, _get_cache_table_name(), format="table"
            )
        except OSError:
            # capturing high level OSError
            # read only filesystems don't throw a more specific exception
            pass
    return data_frame
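

if __name__ == "__main__":
    # Minimal usage sketch with a hypothetical folder path; point it at a
    # real Sensovation export folder before running. use_cache=False avoids
    # writing a cache file during the demo.
    example_folder = Path("./example_measurement")
    if example_folder.is_dir():
        frame = process_folder(example_folder, use_cache=False)
        print(frame.head())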