Skip to content
Snippets Groups Projects
Commit 4f46f8dc authored by Kien Le
Browse files

Documentation and type hint

parent 60c896df
No related branches found
No related tags found
No related merge requests found
...@@ -2,7 +2,9 @@ from pathlib import Path ...@@ -2,7 +2,9 @@ from pathlib import Path
from typing import List, Literal, Optional, Dict, Callable, Tuple from typing import List, Literal, Optional, Dict, Callable, Tuple
LogFileFormat = Literal['rt2ms', 'logpeek', 'sohstationviewer'] LogFileFormat = Literal['rt2ms', 'logpeek', 'sohstationviewer']
PACKETS = ['SH', 'SC', 'OM', 'DS', 'AD', 'CD', 'FD', 'EH', 'ET'] # These packets can be found in section 4 of the RT130 record documentation.
RT130_PACKETS = ['SH', 'SC', 'OM', 'DS', 'AD', 'CD', 'FD', 'EH', 'ET']
SeparatedPacketLines = Tuple[List[str], List[str], List[str]]
def detect_log_file_packet_format(packet: List[str]) -> LogFileFormat: def detect_log_file_packet_format(packet: List[str]) -> LogFileFormat:
...@@ -28,7 +30,7 @@ def detect_log_file_packet_format(packet: List[str]) -> LogFileFormat: ...@@ -28,7 +30,7 @@ def detect_log_file_packet_format(packet: List[str]) -> LogFileFormat:
# The first line of a packet in a log file generated by rt2ms starts with # The first line of a packet in a log file generated by rt2ms starts with
# a 2-letter packet type. That is then followed by an empty space and the # a 2-letter packet type. That is then followed by an empty space and the
# string 'exp'. # string 'exp'.
if packet_start[:2] in PACKETS and packet_start[3:6] == 'exp': if packet_start[:2] in RT130_PACKETS and packet_start[3:6] == 'exp':
return 'rt2ms' return 'rt2ms'
# SOHStationViewer stores all events' info at the end of its log file, and # SOHStationViewer stores all events' info at the end of its log file, and
...@@ -43,7 +45,8 @@ def detect_log_file_packet_format(packet: List[str]) -> LogFileFormat: ...@@ -43,7 +45,8 @@ def detect_log_file_packet_format(packet: List[str]) -> LogFileFormat:
if packet_start.startswith('LPMP'): if packet_start.startswith('LPMP'):
return 'sohstationviewer' return 'sohstationviewer'
# Logpeek writes its events' info right after an SH packet. # Logpeek writes its events' info and mass-position data right after an SH
# packet.
packet_end = packet[-1] packet_end = packet[-1]
packet_end_with_event_info = (packet_end.startswith('DAS') or packet_end_with_event_info = (packet_end.startswith('DAS') or
packet_end.startswith('WARNING')) packet_end.startswith('WARNING'))
...@@ -53,14 +56,29 @@ def detect_log_file_packet_format(packet: List[str]) -> LogFileFormat: ...@@ -53,14 +56,29 @@ def detect_log_file_packet_format(packet: List[str]) -> LogFileFormat:
return 'logpeek' return 'logpeek'
def parse_soh_packet_no_type(packet: List[str]): def parse_log_packet_unknown_format(packet: List[str]) -> SeparatedPacketLines:
"""
Parse a log packet assuming that the format of the log file is unknown. In
this case, all the lines in the packet are SOH lines.
:param packet: list of lines in the packet
:return: the lists of event lines, SOH lines, and mass-position lines in
packet
"""
eh_et_lines = [] eh_et_lines = []
soh_lines = packet soh_lines = packet
masspos_lines = [] masspos_lines = []
return eh_et_lines, soh_lines, masspos_lines return eh_et_lines, soh_lines, masspos_lines
def parse_soh_packet_logpeek(packet: List[str]): def parse_log_packet_logpeek(packet: List[str]) -> SeparatedPacketLines:
"""
Parse a log packet assuming that the log file comes from logpeek. In this
case, a packet can be composed of SOH lines, event info lines, and
mass-position lines.
:param packet: list of lines in the packet
:return: the lists of event lines, SOH lines, and mass-position lines in
packet
"""
eh_et_lines = [] eh_et_lines = []
soh_lines = [] soh_lines = []
masspos_lines = [] masspos_lines = []
...@@ -75,7 +93,15 @@ def parse_soh_packet_logpeek(packet: List[str]): ...@@ -75,7 +93,15 @@ def parse_soh_packet_logpeek(packet: List[str]):
return eh_et_lines, soh_lines, masspos_lines return eh_et_lines, soh_lines, masspos_lines
def parse_soh_packet_rt2ms(packet: List[str]): def parse_log_packet_rt2ms(packet: List[str]) -> SeparatedPacketLines:
"""
Parse a log packet assuming that the log file comes from rt2ms. In this
case, SOH data and event info are stored in separate packets. The first
line of a packet is a header that contains some metadata.
:param packet: list of lines in the packet
:return: the lists of event lines, SOH lines, and mass-position lines in
packet
"""
eh_et_lines = [] eh_et_lines = []
soh_lines = [] soh_lines = []
masspos_lines = [] masspos_lines = []
...@@ -83,12 +109,22 @@ def parse_soh_packet_rt2ms(packet: List[str]): ...@@ -83,12 +109,22 @@ def parse_soh_packet_rt2ms(packet: List[str]):
# The event info is summarized in the last line of an event info packet # The event info is summarized in the last line of an event info packet
eh_et_lines = [packet[-1]] eh_et_lines = [packet[-1]]
else: else:
# The header is not counted as an SOH line.
soh_lines = packet[1:] soh_lines = packet[1:]
return eh_et_lines, soh_lines, masspos_lines return eh_et_lines, soh_lines, masspos_lines
def parse_soh_packet_sohstationviewer(packet: List[str]): def parse_log_packet_sohstationviewer(packet: List[str]
) -> SeparatedPacketLines:
"""
Parse a log packet assuming that the log file comes from sohstationviewer.
In this case, the file is composed mainly of SOH packets, with the event
info lines being written at the end of the file.
:param packet: list of lines in the packet
:return: the lists of event lines, SOH lines, and mass-position lines in
packet
"""
eh_et_lines = [] eh_et_lines = []
soh_lines = [] soh_lines = []
masspos_lines = [] masspos_lines = []
...@@ -99,8 +135,10 @@ def parse_soh_packet_sohstationviewer(packet: List[str]): ...@@ -99,8 +135,10 @@ def parse_soh_packet_sohstationviewer(packet: List[str]):
return eh_et_lines, soh_lines, masspos_lines return eh_et_lines, soh_lines, masspos_lines
class LogFile: class LogFile:
"""
Iterator over a log file.
"""
def __init__(self, file_path: Path): def __init__(self, file_path: Path):
self.file_path = file_path self.file_path = file_path
self.file = open(file_path) self.file = open(file_path)
...@@ -113,6 +151,8 @@ class LogFile: ...@@ -113,6 +151,8 @@ class LogFile:
if line == '': if line == '':
self.file.close() self.file.close()
raise StopIteration raise StopIteration
# The log packets are separated by empty lines, so we know we have
# reached the next packet when we find a non-empty line.
while line == '\n': while line == '\n':
line = self.file.readline() line = self.file.readline()
packet = [] packet = []
...@@ -131,25 +171,38 @@ class LogFile: ...@@ -131,25 +171,38 @@ class LogFile:
return packet return packet
def __del__(self): def __del__(self):
"""
Close the file handle when this iterator is garbage collected, just to
be absolutely sure that the file handle is not leaked.
"""
self.file.close() self.file.close()
Parser = Callable[[List[str]], Tuple[List[str], List[str], List[str]]] Parser = Callable[[List[str]], SeparatedPacketLines]
# Maps each log file format (None when unknown) to its packet parser.
PACKET_PARSERS: Dict[Optional[LogFileFormat], Parser] = { PACKET_PARSERS: Dict[Optional[LogFileFormat], Parser] = {
None: parse_soh_packet_no_type, None: parse_log_packet_unknown_format,
'sohstationviewer': parse_soh_packet_sohstationviewer, 'sohstationviewer': parse_log_packet_sohstationviewer,
'rt2ms': parse_soh_packet_rt2ms, 'rt2ms': parse_log_packet_rt2ms,
'logpeek': parse_soh_packet_logpeek 'logpeek': parse_log_packet_logpeek
} }
def get_experiment_number(soh_lines: List[str]): def get_experiment_number(soh_lines: List[str]):
"""
Get the experiment number from the list of SOH lines in a packet.
:param soh_lines: the list of SOH lines from a packet
:return: the experiment number if the packet has the correct format, None
otherwise
"""
# The experiment number only exists in SC packets.
if not soh_lines[0].startswith('Station Channel Definition'): if not soh_lines[0].startswith('Station Channel Definition'):
return None return None
# The experiment number can be in either the first or second line after # The experiment number can be in the first (rt2ms) or second (logpeek,
# the header. These lines are indented, so we have to strip them of # SOHStationViewer) line after the header. These lines are indented, so we
# whitespace. # have to strip them of whitespace.
if soh_lines[1].strip().startswith('Experiment Number ='): if soh_lines[1].strip().startswith('Experiment Number ='):
experiment_number_line = soh_lines[1].split() experiment_number_line = soh_lines[1].split()
elif soh_lines[2].strip().startswith('Experiment Number ='): elif soh_lines[2].strip().startswith('Experiment Number ='):
...@@ -173,13 +226,16 @@ class LogFileReader: ...@@ -173,13 +226,16 @@ class LogFileReader:
def __init__(self, file_path: Path): def __init__(self, file_path: Path):
self.file_path = file_path self.file_path = file_path
self.log_file_type: Optional[LogFileFormat] = None self.log_file_type: Optional[LogFileFormat] = None
self.eh_et_lines = [] self.eh_et_lines: List[str] = []
self.soh_lines = [] self.soh_lines: List[str] = []
self.masspos_lines = [] self.masspos_lines: List[str] = []
self.station_code: Optional[str] = None self.station_code: Optional[str] = None
self.experiment_number: Optional[str] = None self.experiment_number: Optional[str] = None
def read(self): def read(self) -> None:
"""
Read the log file.
"""
log_file = LogFile(self.file_path) log_file = LogFile(self.file_path)
for packet in log_file: for packet in log_file:
if self.log_file_type is None: if self.log_file_type is None:
...@@ -188,12 +244,16 @@ class LogFileReader: ...@@ -188,12 +244,16 @@ class LogFileReader:
parser = PACKET_PARSERS[self.log_file_type] parser = PACKET_PARSERS[self.log_file_type]
eh_et_lines, soh_lines, masspos_lines = parser(packet) eh_et_lines, soh_lines, masspos_lines = parser(packet)
if self.station_code is None and soh_lines: if self.station_code is None and soh_lines:
# All header lines contain the station code at the end.
self.station_code = soh_lines[0].split(' ')[-1].strip() self.station_code = soh_lines[0].split(' ')[-1].strip()
if self.experiment_number is None and soh_lines: if self.experiment_number is None and soh_lines:
found_experiment_number = get_experiment_number(soh_lines) found_experiment_number = get_experiment_number(soh_lines)
self.experiment_number = found_experiment_number self.experiment_number = found_experiment_number
self.eh_et_lines.extend(eh_et_lines) self.eh_et_lines.extend(eh_et_lines)
self.soh_lines.extend(soh_lines) self.soh_lines.extend(soh_lines)
# We need to add a newline between two blocks of SOH lines to
# separate them. This way, we don't have to manually separate the
# blocks of SOH lines during processing.
self.soh_lines.append('\n') self.soh_lines.append('\n')
self.masspos_lines.extend(masspos_lines) self.masspos_lines.extend(masspos_lines)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment