""" Sensospot Data Parser Parsing the numerical output from Sensovations Sensospot image analysis. """ import re from pathlib import Path from collections import namedtuple import pandas from .columns import ( COL_NAME_POS_ID, COL_NAME_WELL_ROW, COL_NAME_SPOT_FOUND, COL_NAME_EXPOSURE_ID, COL_NAME_WELL_COLUMN, COL_NAME_SPOT_DIAMETER, ) from .parameters import add_optional_measurement_parameters REGEX_WELL = re.compile( r""" (?P([A-Z]+)) # row name containing one or more letters (?P(\d+)) # column, one or more decimals """, re.VERBOSE | re.IGNORECASE, ) COLUMNS_TO_DROP = ["Rect.", "Contour"] COLUMNS_RENAME_MAP = { " ID ": COL_NAME_POS_ID, "Found": COL_NAME_SPOT_FOUND, "Dia.": COL_NAME_SPOT_DIAMETER, } CACHE_FILE_NAME = "raw_data.h5" FileInfo = namedtuple("FileInfo", ["row", "column", "exposure"]) def _get_cache_table_name(): """ automatic hdf5 table name, avoids a circular import """ from . import VERSION_TABLE_NAME return VERSION_TABLE_NAME def _guess_decimal_separator(file_handle): """ guesses the decimal spearator of a opened data file """ file_handle.seek(0) headers = next(file_handle) # noqa: F841 data = next(file_handle) separator = "," if data.count(",") > data.count(".") else "." file_handle.seek(0) return separator def _parse_csv(data_file): """ parse a csv sensovation data file """ data_path = Path(data_file) with data_path.open("r") as handle: decimal_sep = _guess_decimal_separator(handle) return pandas.read_csv(handle, sep="\t", decimal=decimal_sep) def _extract_measurement_info(data_file): """ extract measurement meta data from a file name """ data_path = Path(data_file) *rest, well, exposure = data_path.stem.rsplit("_", 2) # noqa: F841 matched = REGEX_WELL.match(well) if matched is None: raise ValueError(f"not a valid well: '{well}'") row = matched["row"].upper() column = int(matched["column"]) exposure = int(exposure) return FileInfo(row, column, exposure) def _cleanup_data_columns(data_frame): """ renames some data columns for consistency and drops unused columns """ renamed = data_frame.rename(columns=COLUMNS_RENAME_MAP) return renamed.drop(columns=COLUMNS_TO_DROP) def parse_file(data_file): """ parses one data file and adds metadata to result """ measurement_info = _extract_measurement_info(Path(data_file)) data_frame = _parse_csv(data_file) data_frame[COL_NAME_WELL_ROW] = measurement_info.row data_frame[COL_NAME_WELL_COLUMN] = measurement_info.column data_frame[COL_NAME_EXPOSURE_ID] = measurement_info.exposure return _cleanup_data_columns(data_frame) def parse_multiple_files(file_list): """ parses a list of file paths to one combined dataframe """ if not file_list: raise ValueError("Empty file list provided") collection = (parse_file(path) for path in file_list) data_frame = next(collection) for next_frame in collection: data_frame = data_frame.append(next_frame, ignore_index=True) data_frame[COL_NAME_WELL_ROW] = data_frame[COL_NAME_WELL_ROW].astype( "category" ) return data_frame def _list_csv_files(folder): """ returns all csv files in a folder """ folder_path = Path(folder) files = (item for item in folder_path.iterdir() if item.is_file()) visible = (item for item in files if not item.stem.startswith(".")) return (item for item in visible if item.suffix.lower() == ".csv") def _sanity_check(data_frame): """ checks some basic constrains of a combined data frame """ field_rows = len(data_frame[COL_NAME_WELL_ROW].unique()) field_cols = len(data_frame[COL_NAME_WELL_COLUMN].unique()) exposures = len(data_frame[COL_NAME_EXPOSURE_ID].unique()) spot_positions = len(data_frame[COL_NAME_POS_ID].unique()) expected_rows = field_rows * field_cols * exposures * spot_positions if expected_rows != len(data_frame): raise ValueError("Measurements are missing") return data_frame def parse_folder(folder): """ parses all csv files in a folder to one large dataframe """ file_list = _list_csv_files(Path(folder)) data_frame = parse_multiple_files(file_list) data_frame = add_optional_measurement_parameters(data_frame, folder) return _sanity_check(data_frame) def process_folder(folder, use_cache=True): """ parses all csv files in a folder, adds some checks and more data """ hdf5_path = Path(folder) / CACHE_FILE_NAME if use_cache: try: return pandas.read_hdf(hdf5_path, _get_cache_table_name()) except (FileNotFoundError, KeyError): # either file or table doesn't exist pass data_frame = parse_folder(folder) if use_cache: try: data_frame.to_hdf( hdf5_path, _get_cache_table_name(), format="table" ) except OSError: # capturing high level OSError # read only filesystems don't throw a more specific exception pass return data_frame