From 4f46f8dc080db6638debea49e7db9b2b30cf77dc Mon Sep 17 00:00:00 2001 From: kienle <kienle@passcal.nmt.edu> Date: Wed, 27 Sep 2023 17:26:09 -0600 Subject: [PATCH] Documentation and type hint --- .../reftek_reader/log_file_reader.py | 100 ++++++++++++++---- 1 file changed, 80 insertions(+), 20 deletions(-) diff --git a/sohstationviewer/model/reftek_data/reftek_reader/log_file_reader.py b/sohstationviewer/model/reftek_data/reftek_reader/log_file_reader.py index 5da9ae02c..47d7f56e3 100644 --- a/sohstationviewer/model/reftek_data/reftek_reader/log_file_reader.py +++ b/sohstationviewer/model/reftek_data/reftek_reader/log_file_reader.py @@ -2,7 +2,9 @@ from pathlib import Path from typing import List, Literal, Optional, Dict, Callable, Tuple LogFileFormat = Literal['rt2ms', 'logpeek', 'sohstationviewer'] -PACKETS = ['SH', 'SC', 'OM', 'DS', 'AD', 'CD', 'FD', 'EH', 'ET'] +# These packets can be found in section 4 of the RT130 record documentation. +RT130_PACKETS = ['SH', 'SC', 'OM', 'DS', 'AD', 'CD', 'FD', 'EH', 'ET'] +SeparatedPacketLines = Tuple[List[str], List[str], List[str]] def detect_log_file_packet_format(packet: List[str]) -> LogFileFormat: @@ -28,7 +30,7 @@ def detect_log_file_packet_format(packet: List[str]) -> LogFileFormat: # The first line of a packet in a log file generated by rt2ms starts with # a 2-letter packet type. That is then followed by an empty space and the # string 'exp'. - if packet_start[:2] in PACKETS and packet_start[3:6] == 'exp': + if packet_start[:2] in RT130_PACKETS and packet_start[3:6] == 'exp': return 'rt2ms' # SOHStationViewer stores all events' info at the end of its log file, and @@ -43,7 +45,8 @@ def detect_log_file_packet_format(packet: List[str]) -> LogFileFormat: if packet_start.startswith('LPMP'): return 'sohstationviewer' - # Logpeek write its events' info right after an SH packet. + # Logpeek writes its events' info and mass-position data right after an SH + # packet. 
packet_end = packet[-1] packet_end_with_event_info = (packet_end.startswith('DAS') or packet_end.startswith('WARNING')) @@ -53,14 +56,29 @@ def detect_log_file_packet_format(packet: List[str]) -> LogFileFormat: return 'logpeek' -def parse_soh_packet_no_type(packet: List[str]): +def parse_log_packet_unknown_format(packet: List[str]) -> SeparatedPacketLines: + """ + Parse a log packet assuming that the format of the log file is unknown. In + this case, all the lines in the packet are SOH lines. + :param packet: list of lines in the packet + :return: the lists of event lines, SOH lines, and mass-position lines in + packet + """ eh_et_lines = [] soh_lines = packet masspos_lines = [] return eh_et_lines, soh_lines, masspos_lines -def parse_soh_packet_logpeek(packet: List[str]): +def parse_log_packet_logpeek(packet: List[str]) -> SeparatedPacketLines: + """ + Parse a log packet assuming that the log file comes from logpeek. In this + case, a packet can be composed of SOH lines, event info lines, and + mass-position lines. + :param packet: list of lines in the packet + :return: the lists of event lines, SOH lines, and mass-position lines in + packet + """ eh_et_lines = [] soh_lines = [] masspos_lines = [] @@ -75,7 +93,15 @@ def parse_soh_packet_logpeek(packet: List[str]): return eh_et_lines, soh_lines, masspos_lines -def parse_soh_packet_rt2ms(packet: List[str]): +def parse_log_packet_rt2ms(packet: List[str]) -> SeparatedPacketLines: + """ + Parse a log packet assuming that the log file comes from rt2ms. In this + case, SOH data and event info are stored in separate packets. The first + line of a packet is a header that contains some metadata. 
+ :param packet: list of lines in the packet + :return: the lists of event lines, SOH lines, and mass-position lines in + packet + """ eh_et_lines = [] soh_lines = [] masspos_lines = [] @@ -83,12 +109,22 @@ def parse_soh_packet_rt2ms(packet: List[str]): # The event info is summarized in the last line of an event info packet eh_et_lines = [packet[-1]] else: + # The header is not counted as an SOH line. soh_lines = packet[1:] return eh_et_lines, soh_lines, masspos_lines -def parse_soh_packet_sohstationviewer(packet: List[str]): +def parse_log_packet_sohstationviewer(packet: List[str] + ) -> SeparatedPacketLines: + """ + Parse a log packet assuming that the log file comes from sohstationviewer. + In this case, the file is composed mainly of SOH packets, with the event + info lines being written at the end of the file. + :param packet: list of lines in the packet + :return: the lists of event lines, SOH lines, and mass-position lines in + packet + """ eh_et_lines = [] soh_lines = [] masspos_lines = [] @@ -99,8 +135,10 @@ def parse_soh_packet_sohstationviewer(packet: List[str]): return eh_et_lines, soh_lines, masspos_lines - class LogFile: + """ + Iterator over a log file. + """ def __init__(self, file_path: Path): self.file_path = file_path self.file = open(file_path) @@ -113,6 +151,8 @@ class LogFile: if line == '': self.file.close() raise StopIteration + # The log packets are separated by empty lines, so we know we have + # reached the next packet when we find a non-empty line. while line == '\n': line = self.file.readline() packet = [] @@ -131,25 +171,38 @@ class LogFile: return packet def __del__(self): + """ + Close the file handle when this iterator is garbage collected just to + be absolutely sure that the file handle is not leaked. + """ self.file.close() -Parser = Callable[[List[str]], Tuple[List[str], List[str], List[str]]] +Parser = Callable[[List[str]], SeparatedPacketLines] +# Mapping from each log file format to its corresponding packet parser. 
PACKET_PARSERS: Dict[Optional[LogFileFormat], Parser] = { - None: parse_soh_packet_no_type, - 'sohstationviewer': parse_soh_packet_sohstationviewer, - 'rt2ms': parse_soh_packet_rt2ms, - 'logpeek': parse_soh_packet_logpeek + None: parse_log_packet_unknown_format, + 'sohstationviewer': parse_log_packet_sohstationviewer, + 'rt2ms': parse_log_packet_rt2ms, + 'logpeek': parse_log_packet_logpeek } def get_experiment_number(soh_lines: List[str]): + """ + Get the experiment number from the list of SOH lines in a packet. + + :param soh_lines: the list of SOH lines from a packet + :return: the experiment number if the packet has the correct format, None + otherwise + """ + # The experiment number only exists in SC packets. if not soh_lines[0].startswith('Station Channel Definition'): return None - # The experiment number can be in either the first or second line after - # the header. These lines are indented, so we have to strip them of - # whitespace. + # The experiment number can be in the first (rt2ms) or second (logpeek, + # SOHStationViewer) line after the header. These lines are indented, so we + # have to strip them of whitespace. if soh_lines[1].strip().startswith('Experiment Number ='): experiment_number_line = soh_lines[1].split() elif soh_lines[2].strip().startswith('Experiment Number ='): @@ -173,13 +226,16 @@ class LogFileReader: def __init__(self, file_path: Path): self.file_path = file_path self.log_file_type: Optional[LogFileFormat] = None - self.eh_et_lines = [] - self.soh_lines = [] - self.masspos_lines = [] + self.eh_et_lines: List[str] = [] + self.soh_lines: List[str] = [] + self.masspos_lines: List[str] = [] self.station_code: Optional[str] = None self.experiment_number: Optional[str] = None - def read(self): + def read(self) -> None: + """ + Read the log file. 
+ """ log_file = LogFile(self.file_path) for packet in log_file: if self.log_file_type is None: @@ -188,12 +244,16 @@ class LogFileReader: parser = PACKET_PARSERS[self.log_file_type] eh_et_lines, soh_lines, masspos_lines = parser(packet) if self.station_code is None and soh_lines: + # All header lines contain the station code at the end. self.station_code = soh_lines[0].split(' ')[-1].strip() if self.experiment_number is None and soh_lines: found_experiment_number = get_experiment_number(soh_lines) self.experiment_number = found_experiment_number self.eh_et_lines.extend(eh_et_lines) self.soh_lines.extend(soh_lines) + # We need to add a new line between two blocks of SOH lines to + # separate them. This makes it so that we don't have to manually + # separate the SOH lines blocks during processing. self.soh_lines.append('\n') self.masspos_lines.extend(masspos_lines) -- GitLab