"""Sensospot Data Parser

Parsing the numerical output from Sensovation Sensospot image analysis.
"""

import re
from collections import namedtuple
from pathlib import Path

import pandas

from .columns import (
    RAW_DATA_POS_ID,
    META_DATA_WELL_ROW,
    META_DATA_WELL_NAME,
    META_DATA_EXPOSURE_ID,
    META_DATA_WELL_COLUMN,
    PARSED_DATA_COLUMN_SET,
    RAW_DATA_NORMALIZATION_MAP,
    RAW_DATA_COLUMNS_RENAME_MAP,
)
from .parameters import add_optional_measurement_parameters

# Matches a well name like "A01". The original pattern had lost the group
# names ("(?P(...))" is invalid regex syntax and raises re.error at import);
# they are restored here because the code accesses matched["row"] and
# matched["column"] below.
REGEX_WELL = re.compile(
    r"""
    (?P<row>[A-Z]+)     # row name containing one or more letters
    (?P<column>\d+)     # column, one or more decimals
    """,
    re.VERBOSE | re.IGNORECASE,
)

# Metadata extracted from a measurement file name.
FileInfo = namedtuple("FileInfo", ["row", "column", "exposure"])


def _guess_decimal_separator(file_handle):
    """Guesses the decimal separator of an opened data file.

    Looks at the first data line (the line after the header): if it
    contains more commas than dots, "," is assumed to be the decimal
    separator. The handle is rewound to the start afterwards so the
    caller can parse from the beginning.
    """
    file_handle.seek(0)
    headers = next(file_handle)  # noqa: F841
    data = next(file_handle)
    separator = "," if data.count(",") > data.count(".") else "."
    file_handle.seek(0)
    return separator


def _parse_csv(data_file):
    """Parses a tab-separated Sensovation data file into a DataFrame."""
    data_path = Path(data_file)
    with data_path.open("r") as handle:
        decimal_sep = _guess_decimal_separator(handle)
        return pandas.read_csv(handle, sep="\t", decimal=decimal_sep)


def _extract_measurement_info(data_file):
    """Extracts measurement metadata from a file name.

    The file stem is expected to end in "..._<well>_<exposure>", where
    the well part looks like "A01" and the exposure is an integer id.

    Returns:
        FileInfo(row, column, exposure) with row upper-cased and
        column/exposure converted to int.

    Raises:
        ValueError: if the well part does not match REGEX_WELL.
    """
    data_path = Path(data_file)
    *rest, well, exposure = data_path.stem.rsplit("_", 2)  # noqa: F841
    matched = REGEX_WELL.match(well)
    if matched is None:
        raise ValueError(f"not a valid well: '{well}'")
    row = matched["row"].upper()
    column = int(matched["column"])
    exposure = int(exposure)
    return FileInfo(row, column, exposure)


def _cleanup_data_columns(data_frame):
    """Renames some data columns for consistency and drops unused columns."""
    renamed = data_frame.rename(columns=RAW_DATA_COLUMNS_RENAME_MAP)
    surplus_columns = set(renamed.columns) - PARSED_DATA_COLUMN_SET
    return renamed.drop(columns=surplus_columns)


def parse_file(data_file):
    """Parses one data file and adds metadata to the result.

    Raises:
        ValueError: if metadata could not be extracted from the file name.
    """
    measurement_info = _extract_measurement_info(Path(data_file))
    data_frame = _parse_csv(data_file)
    # normalized well name, e.g. "A01"
    data_frame[
        META_DATA_WELL_NAME
    ] = f"{measurement_info.row}{measurement_info.column:02d}"
    data_frame[META_DATA_WELL_ROW] = measurement_info.row
    data_frame[META_DATA_WELL_COLUMN] = measurement_info.column
    data_frame[META_DATA_EXPOSURE_ID] = measurement_info.exposure
    return _cleanup_data_columns(data_frame)


def _silenced_parse_file(data_file):
    """Parses one data file and adds metadata.

    Returns the data frame, or None if parsing raised a ValueError.
    """
    try:
        return parse_file(data_file)
    except ValueError:
        return None


def parse_multiple_files(file_list):
    """Parses a list of file paths to one combined DataFrame.

    Files that fail to parse are silently skipped.

    Raises:
        ValueError: if file_list is empty (note: only detectable for
        sized containers, not for generators), or if no file could be
        parsed successfully.
    """
    if not file_list:
        raise ValueError("Empty file list provided")
    collection = (_silenced_parse_file(path) for path in file_list)
    filtered = [frame for frame in collection if frame is not None]
    # DataFrame.append() was deprecated in pandas 1.4 and removed in 2.0;
    # a single concat also avoids quadratic copying in the loop.
    data_frame = pandas.concat(filtered, ignore_index=True)
    data_frame[META_DATA_WELL_ROW] = data_frame[META_DATA_WELL_ROW].astype(
        "category"
    )
    return data_frame


def list_csv_files(folder):
    """Yields all visible csv files in a folder."""
    folder_path = Path(folder)
    files = (item for item in folder_path.iterdir() if item.is_file())
    visible = (item for item in files if not item.stem.startswith("."))
    return (item for item in visible if item.suffix.lower() == ".csv")


def _sanity_check(data_frame):
    """Checks some basic constraints of a combined data frame.

    A complete measurement must contain one row per combination of well
    row, well column, exposure and spot position.

    Raises:
        ValueError: if the expected and actual row counts differ.
    """
    field_rows = len(data_frame[META_DATA_WELL_ROW].unique())
    field_cols = len(data_frame[META_DATA_WELL_COLUMN].unique())
    exposures = len(data_frame[META_DATA_EXPOSURE_ID].unique())
    spot_positions = len(data_frame[RAW_DATA_POS_ID].unique())
    expected_rows = field_rows * field_cols * exposures * spot_positions
    if expected_rows != len(data_frame):
        raise ValueError(
            f"Measurements are missing: {expected_rows} != {len(data_frame)}"
        )
    # set the right data type for measurement columns
    for raw_column in RAW_DATA_NORMALIZATION_MAP:
        data_frame[raw_column] = pandas.to_numeric(data_frame[raw_column])
    return data_frame


def parse_folder(folder, quiet=False):
    """Parses all csv files in a folder to one large DataFrame.

    With quiet=True the sanity check on the combined frame is skipped.
    """
    file_list = list_csv_files(Path(folder))
    data_frame = parse_multiple_files(file_list)
    data_frame = add_optional_measurement_parameters(data_frame, folder)
    if quiet:
        return data_frame
    return _sanity_check(data_frame)