"""Sensospot Data Parser

Parsing the numerical output from Sensovations Sensospot image analysis.
"""

import re
from pathlib import Path
from collections import namedtuple

import pandas

from .columns import (
    RAW_DATA_POS_ID,
    META_DATA_WELL_ROW,
    RAW_DATA_COLUMN_SET,
    META_DATA_EXPOSURE_ID,
    META_DATA_WELL_COLUMN,
    RAW_DATA_COLUMNS_RENAME_MAP,
)
from .parameters import add_optional_measurement_parameters

# Matches a well identifier such as "A1" or "h12" at the start of a string.
# The named groups are read back as matched["row"] / matched["column"].
REGEX_WELL = re.compile(
    r"""
    (?P<row>([A-Z]+))       # row name containing one or more letters
    (?P<column>(\d+))       # column, one or more decimals
    """,
    re.VERBOSE | re.IGNORECASE,
)

# Meta data encoded in a data file name: well row, well column, exposure id.
FileInfo = namedtuple("FileInfo", ["row", "column", "exposure"])


def _guess_decimal_separator(file_handle):
    """Guess the decimal separator of an opened data file.

    The first line is skipped as the header line; in the second line the
    occurrences of "," and "." are counted and the more frequent character
    wins ("." on a tie). The handle is rewound to the start before and after
    the inspection, so it can be passed on to a parser afterwards.

    A file with fewer than two lines yields "." instead of leaking a
    StopIteration to the caller.
    """
    file_handle.seek(0)
    next(file_handle, "")  # skip the header line
    first_data_line = next(file_handle, "")
    comma_count = first_data_line.count(",")
    point_count = first_data_line.count(".")
    file_handle.seek(0)
    return "," if comma_count > point_count else "."


def _parse_csv(data_file):
    """Parse a tab separated Sensovation data file into a data frame.

    The decimal separator is guessed from the file content before the file
    is handed to pandas.
    """
    data_path = Path(data_file)
    with data_path.open("r") as handle:
        decimal_sep = _guess_decimal_separator(handle)
        return pandas.read_csv(handle, sep="\t", decimal=decimal_sep)


def _extract_measurement_info(data_file):
    """Extract measurement meta data from a file name.

    The file stem is split from the right at underscores; the last part is
    the exposure id, the part before it is the well (e.g. "A1").

    Returns a FileInfo tuple with upper-cased row, integer column and
    integer exposure.

    Raises ValueError if the stem has fewer than two underscore separated
    parts, the well does not match REGEX_WELL or the exposure is not an
    integer.
    """
    data_path = Path(data_file)
    *_, well, exposure = data_path.stem.rsplit("_", 2)
    matched = REGEX_WELL.match(well)
    if matched is None:
        raise ValueError(f"not a valid well: '{well}'")
    row = matched["row"].upper()
    column = int(matched["column"])
    exposure = int(exposure)
    return FileInfo(row, column, exposure)


def _cleanup_data_columns(data_frame):
    """Rename some data columns for consistency and drop unused columns.

    Columns are renamed according to RAW_DATA_COLUMNS_RENAME_MAP; afterwards
    every column not listed in RAW_DATA_COLUMN_SET is removed.
    """
    renamed = data_frame.rename(columns=RAW_DATA_COLUMNS_RENAME_MAP)
    surplus_columns = set(renamed.columns) - RAW_DATA_COLUMN_SET
    return renamed.drop(columns=list(surplus_columns))


def parse_file(data_file):
    """Parse one data file and add the file name meta data to the result.

    Will raise a ValueError if the meta data could not be extracted from
    the file name.
    """
    measurement_info = _extract_measurement_info(Path(data_file))
    data_frame = _parse_csv(data_file)
    data_frame[META_DATA_WELL_ROW] = measurement_info.row
    data_frame[META_DATA_WELL_COLUMN] = measurement_info.column
    data_frame[META_DATA_EXPOSURE_ID] = measurement_info.exposure
    return _cleanup_data_columns(data_frame)


def _silenced_parse_file(data_file):
    """Parse one data file and add meta data.

    Returns the data frame, or None if parsing raised a ValueError.
    """
    try:
        return parse_file(data_file)
    except ValueError:
        return None


def parse_multiple_files(file_list):
    """Parse an iterable of file paths into one combined data frame.

    Files that cannot be parsed (ValueError) are skipped silently. The well
    row column of the combined frame is converted to a categorical.

    Raises ValueError if file_list is empty or if none of the files could
    be parsed.
    """
    if not file_list:
        raise ValueError("Empty file list provided")
    parsed = (_silenced_parse_file(path) for path in file_list)
    frames = [frame for frame in parsed if frame is not None]
    if not frames:
        # A generator argument is always truthy, so the check above cannot
        # detect "nothing to combine"; do it on the collected frames.
        raise ValueError("No parsable data files provided")
    # One concat instead of repeated DataFrame.append (removed in pandas 2).
    data_frame = pandas.concat(frames, ignore_index=True)
    data_frame[META_DATA_WELL_ROW] = data_frame[META_DATA_WELL_ROW].astype(
        "category"
    )
    return data_frame


def list_csv_files(folder):
    """Return a generator of all visible csv files directly in a folder.

    Only regular files are considered; files whose name starts with a dot
    are skipped and the ".csv" suffix is compared case-insensitively.
    """
    folder_path = Path(folder)
    files = (item for item in folder_path.iterdir() if item.is_file())
    visible = (item for item in files if not item.stem.startswith("."))
    return (item for item in visible if item.suffix.lower() == ".csv")


def _sanity_check(data_frame):
    """Check some basic constraints of a combined data frame.

    The number of rows must equal the product of the numbers of distinct
    well rows, well columns, exposure ids and spot positions.

    Returns the unchanged data frame; raises ValueError otherwise.
    """
    field_rows = len(data_frame[META_DATA_WELL_ROW].unique())
    field_cols = len(data_frame[META_DATA_WELL_COLUMN].unique())
    exposures = len(data_frame[META_DATA_EXPOSURE_ID].unique())
    spot_positions = len(data_frame[RAW_DATA_POS_ID].unique())
    expected_rows = field_rows * field_cols * exposures * spot_positions
    if expected_rows != len(data_frame):
        raise ValueError("Measurements are missing")
    return data_frame


def parse_folder(folder):
    """Parse all csv files in a folder into one large data frame.

    The combined frame is passed through
    add_optional_measurement_parameters together with the folder and is
    sanity checked before it is returned.

    Raises ValueError if no file could be parsed or if the sanity check
    fails.
    """
    file_list = list_csv_files(Path(folder))
    data_frame = parse_multiple_files(file_list)
    data_frame = add_optional_measurement_parameters(data_frame, folder)
    return _sanity_check(data_frame)