From 98bfb906148f37088a02c177bf593d565b727bfe Mon Sep 17 00:00:00 2001 From: kienle <kienle@passcal.nmt.edu> Date: Mon, 25 Sep 2023 22:31:51 -0600 Subject: [PATCH] Detect log file format --- .../reftek_reader/log_file_reader.py | 52 +++++++++++++++++-- 1 file changed, 48 insertions(+), 4 deletions(-) diff --git a/sohstationviewer/model/reftek_data/reftek_reader/log_file_reader.py b/sohstationviewer/model/reftek_data/reftek_reader/log_file_reader.py index 83aaf4ab7..d5ea03a83 100644 --- a/sohstationviewer/model/reftek_data/reftek_reader/log_file_reader.py +++ b/sohstationviewer/model/reftek_data/reftek_reader/log_file_reader.py @@ -1,9 +1,51 @@ from pathlib import Path -from typing import List +from typing import List, Literal, Optional +LogFileFormat = Literal['rt2ms', 'logpeek', 'sohstationviewer'] +PACKETS = ['SH', 'SC', 'OM', 'DS', 'AD', 'CD', 'FD', 'EH', 'ET'] + + +def detect_log_file_packet_format(packet: List[str]) -> LogFileFormat: + """ + Detect the format of a log file packet. The format can be either rt2ms', + logpeek's, or SOHStationViewer's. + + :param packet: a packet extracted from a log file. + :return: the format of packet. Can be either 'rt2ms', 'logpeek', or + 'sohstationviewer'. + """ + # We want to take advantage of the metadata written by the various programs + # as much as possible. + if packet[0].startswith('logpeek'): + return 'logpeek' + elif packet[0].startswith('rt2ms'): + return 'rt2ms' + elif packet[0].startswith('sohstationviewer'): + return 'sohstationviewer' + + packet_start = packet[0] + + # The first line of a packet in a log file generated by rt2ms starts with + # a 2-letter packet type. That is then followed by an empty space and the + # string 'exp'. + if packet_start[:2] in PACKETS and packet_start[3:6] == 'exp': + return 'rt2ms' + + # SOHStationViewer stores all events' info at the end of its log file, and + # so if we see the line + # Events: + # at the start of a packet, we know the log file is from SOHStationViewer. + if packet_start.startswith('Events:'): + return 'sohstationviewer' + + # Logpeek write its events' info right after an SH packet. + packet_end = packet[-1] + packet_end_with_event_info = (packet_end.startswith('DAS') or + packet_end.startswith('WARNING')) + if (packet_start.startswith('State of Health') and + packet_end_with_event_info): + return 'logpeek' -def detect_log_file_packet_format(packet: List[str]): - pass def read_soh_packet_base(packet: List[str]): pass @@ -51,11 +93,13 @@ class LogFileReader: def __init__(self, file_path: Path): self.file_path = file_path self.packet_reader = None - self.log_file_type: LogFileFormat + self.log_file_type: Optional[LogFileFormat] = None def read(self): log_file = LogFile(self.file_path) for packet in log_file: + if self.log_file_type is None: + self.log_file_type = detect_log_file_packet_format(packet) pass -- GitLab