
Read log files generated by users

Merged Kien Le requested to merge feature-#94-read_log_files into master
from pathlib import Path
from typing import List, Literal, Optional, Dict, Callable, Tuple
import numpy as np
from obspy import UTCDateTime
# The possible formats for a log file.
LogFileFormat = Literal['rt2ms', 'logpeek', 'sohstationviewer']
# These packets can be found in section 4 of the RT130 record documentation.
RT130_PACKETS = ['SH', 'SC', 'OM', 'DS', 'AD', 'CD', 'FD', 'EH', 'ET']
# The lists of event lines, SOH lines, and mass-position lines in a packet.
SeparatedPacketLines = Tuple[List[str], List[str], List[str]]
    • Maintainer

      Add a comment describing the format of this variable, matching what you described for it in the function's docstring: "the lists of event lines, SOH lines, and mass-position lines in packet".

      Edited by Lan Dam

def detect_log_packet_format(packet: List[str]) -> Optional[LogFileFormat]:
    """
    Detect the format of a log file packet. The format can be either rt2ms's,
    logpeek's, or SOHStationViewer's.
    :param packet: a packet extracted from a log file.
    :return: the format of packet. Can be either 'rt2ms', 'logpeek', or
        'sohstationviewer'. None if the format cannot be determined.
    """
    # We want to take advantage of the metadata written by the various
    # programs as much as possible.
    if packet[0].startswith('logpeek'):
        return 'logpeek'
    elif packet[0].startswith('rt2ms'):
        return 'rt2ms'
    elif packet[0].startswith('sohstationviewer'):
        return 'sohstationviewer'
    packet_start = packet[0]
    # The first line of a packet in a log file generated by rt2ms starts with
    # a 2-letter packet type. That is then followed by a space and the string
    # 'exp'.
    if packet_start[:2] in RT130_PACKETS and packet_start[3:6] == 'exp':
        return 'rt2ms'
    # SOHStationViewer stores all events' info at the end of its log file, so
    # if we see the line
    #     Events:
    # at the start of a packet, we know the log file is from SOHStationViewer.
    if packet_start.startswith('Events:'):
        return 'sohstationviewer'
    # Unlike logpeek, we write the mass-position data in its own packet. So, a
    # packet that starts with mass-position data would be an SOHStationViewer
    # packet.
    if packet_start.startswith('LPMP'):
        return 'sohstationviewer'
    # Logpeek writes its events' info and mass-position data right after an SH
    # packet.
    packet_end = packet[-1]
    packet_end_with_event_info = (packet_end.startswith('DAS') or
                                  packet_end.startswith('WARNING'))
    packet_end_with_mass_pos = packet_end.startswith('LPMP')
    packet_end_special = packet_end_with_event_info or packet_end_with_mass_pos
    if packet_start.startswith('State of Health') and packet_end_special:
        return 'logpeek'
    # None of the heuristics matched, so the format is unknown.
    return None
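
# Illustrative sketch (not part of the change under review): how the detector
# behaves on two made-up packets. The sample lines below are hypothetical and
# are not taken from a real log file.
def _example_detect_log_packet_format() -> None:
    rt2ms_like = ['SH exp 38 bytes of data']
    logpeek_like = ['State of Health  2019:117:00:00:00',
                    'DAS: 9BCC  EVT: 21']
    assert detect_log_packet_format(rt2ms_like) == 'rt2ms'
    assert detect_log_packet_format(logpeek_like) == 'logpeek'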

def parse_log_packet_unknown_format(packet: List[str]) -> SeparatedPacketLines:
    """
    Parse a log packet assuming that the format of the log file is unknown. In
    this case, all the lines in the packet are SOH lines.
    :param packet: list of lines in the packet
    :return: the lists of event lines, SOH lines, and mass-position lines in
        packet
    """
    eh_et_lines = []
    soh_lines = packet
    masspos_lines = []
    return eh_et_lines, soh_lines, masspos_lines

def parse_log_packet_logpeek(packet: List[str]) -> SeparatedPacketLines:
    """
    Parse a log packet assuming that the log file comes from logpeek. In this
    case, a packet can be composed of SOH lines, event info lines, and
    mass-position lines.
    :param packet: list of lines in the packet
    :return: the lists of event lines, SOH lines, and mass-position lines in
        packet
    """
    eh_et_lines = []
    soh_lines = []
    masspos_lines = []
    for line in packet:
        if line.startswith('DAS: '):
            eh_et_lines.append(line)
        elif line.startswith('LPMP'):
            masspos_lines.append(line)
        else:
            soh_lines.append(line)
    return eh_et_lines, soh_lines, masspos_lines
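
# Illustrative sketch (not part of the change under review): splitting a
# hypothetical logpeek packet into its three kinds of lines. The line contents
# are made up.
def _example_parse_log_packet_logpeek() -> None:
    packet = ['State of Health  2019:117:00:00:00',
              'DAS: 9BCC  EVT: 21',
              'LPMP 2019:117:00:00:00.000 1 1.2']
    eh_et, soh, masspos = parse_log_packet_logpeek(packet)
    assert eh_et == ['DAS: 9BCC  EVT: 21']
    assert soh == ['State of Health  2019:117:00:00:00']
    assert masspos == ['LPMP 2019:117:00:00:00.000 1 1.2']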

def parse_log_packet_rt2ms(packet: List[str]) -> SeparatedPacketLines:
    """
    Parse a log packet assuming that the log file comes from rt2ms. In this
    case, SOH data and event info are stored in separate packets. The first
    line of a packet is a header that contains some metadata.
    :param packet: list of lines in the packet
    :return: the lists of event lines, SOH lines, and mass-position lines in
        packet
    """
    eh_et_lines = []
    soh_lines = []
    masspos_lines = []
    if packet[0].startswith('EH') or packet[0].startswith('ET'):
        # The event info is summarized in the last line of an event info
        # packet.
        eh_et_lines = [packet[-1]]
    else:
        # The header is not counted as an SOH line.
        soh_lines = packet[1:]
    return eh_et_lines, soh_lines, masspos_lines
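
# Illustrative sketch (not part of the change under review): how hypothetical
# rt2ms packets are parsed. An EH/ET packet keeps only its last (summary)
# line; an SOH packet drops its header line. All sample lines are made up.
def _example_parse_log_packet_rt2ms() -> None:
    event_packet = ['EH exp 38 bytes of data',
                    '  Trigger Time = 2019:117:00:00:00.000',
                    'DAS: 9BCC  EVT: 21']
    eh_et, soh, masspos = parse_log_packet_rt2ms(event_packet)
    assert eh_et == ['DAS: 9BCC  EVT: 21'] and soh == [] and masspos == []

    soh_packet = ['SH exp 38 bytes of data',
                  'State of Health  2019:117:00:00:00']
    eh_et, soh, masspos = parse_log_packet_rt2ms(soh_packet)
    assert soh == ['State of Health  2019:117:00:00:00'] and eh_et == []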

def parse_log_packet_sohstationviewer(packet: List[str]
                                      ) -> SeparatedPacketLines:
    """
    Parse a log packet assuming that the log file comes from sohstationviewer.
    In this case, the file is composed mainly of SOH packets, with the event
    info lines being written at the end of the file.
    :param packet: list of lines in the packet
    :return: the lists of event lines, SOH lines, and mass-position lines in
        packet
    """
    eh_et_lines = []
    soh_lines = []
    masspos_lines = []
    if packet[0].startswith('Events:'):
        eh_et_lines = packet[1:]
    else:
        soh_lines = packet
    return eh_et_lines, soh_lines, masspos_lines
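
# Illustrative sketch (not part of the change under review): a hypothetical
# SOHStationViewer events packet keeps everything after the 'Events:' header
# as event lines. The sample lines are made up.
def _example_parse_log_packet_sohstationviewer() -> None:
    packet = ['Events:', 'DAS: 9BCC  EVT: 21', 'DAS: 9BCC  EVT: 22']
    eh_et, soh, masspos = parse_log_packet_sohstationviewer(packet)
    assert eh_et == ['DAS: 9BCC  EVT: 21', 'DAS: 9BCC  EVT: 22']
    assert soh == [] and masspos == []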

class LogFile:
    """
    Iterator over a log file.
    """
    def __init__(self, file_path: Path):
        self.file_path = file_path
        self.file = open(file_path)

    def __iter__(self):
        return self

    def __next__(self) -> List[str]:
        line = self.file.readline()
        if line == '':
            self.file.close()
            raise StopIteration
        # The log packets are separated by empty lines, so we know we have
        # reached the next packet when we find a non-empty line.
        while line == '\n':
            line = self.file.readline()
        packet = []
        # We have to check that we are not at the end of the file as well.
        while line != '\n' and line != '':
            packet.append(line)
            line = self.file.readline()
            if line == '':
                break
        # If there is more than one blank line at the end of a log file, the
        # last packet will be empty. This causes problems if the log file came
        # from rt2ms or SOHStationViewer.
        if not packet:
            self.file.close()
            raise StopIteration
        return packet

    def __del__(self):
        """
        Close the file handle when this iterator is garbage collected just to
        be absolutely sure that the file handle is not leaked.
        """
        self.file.close()
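
# Illustrative sketch (not part of the change under review): iterating over
# the packets of a log file. The log_path argument is a hypothetical path to
# any log file.
def _example_iterate_log_file(log_path: Path) -> None:
    for packet in LogFile(log_path):
        # Each packet is a list of raw lines, newline characters included.
        print(len(packet), 'lines, starting with:', packet[0].rstrip())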

# A function that takes in a log file packet and separates it into event info
# lines, SOH lines, and mass-position lines.
Parser = Callable[[List[str]], SeparatedPacketLines]

# Mapping from each log file format to its corresponding parser.
PACKET_PARSERS: Dict[Optional[LogFileFormat], Parser] = {
    None: parse_log_packet_unknown_format,
    'sohstationviewer': parse_log_packet_sohstationviewer,
    'rt2ms': parse_log_packet_rt2ms,
    'logpeek': parse_log_packet_logpeek
}
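
# Illustrative sketch (not part of the change under review): dispatching a
# packet to its parser through PACKET_PARSERS. The packet content is made up
# and deliberately matches none of the known formats.
def _example_dispatch_packet() -> None:
    packet = ['Some unrecognized line', 'Another line']
    detected_format = detect_log_packet_format(packet)  # None in this case
    parser = PACKET_PARSERS[detected_format]
    eh_et, soh, masspos = parser(packet)
    # With an unknown format, every line is treated as an SOH line.
    assert eh_et == [] and masspos == [] and soh == packet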

def get_experiment_number(soh_lines: List[str]) -> Optional[str]:
    """
    Get the experiment number from the list of SOH lines in a packet.
    :param soh_lines: the list of SOH lines from a packet
    :return: the experiment number if the packet has the correct format, None
        otherwise
    """
    # The experiment number only exists in SC packets.
    if not soh_lines[0].startswith('Station Channel Definition'):
        return None
    # The experiment number can be in the first (rt2ms) or second (logpeek,
    # SOHStationViewer) line after the header. These lines are indented, so we
    # have to strip them of whitespace.
    if soh_lines[1].strip().startswith('Experiment Number ='):
        experiment_number_line = soh_lines[1].split()
    elif soh_lines[2].strip().startswith('Experiment Number ='):
        experiment_number_line = soh_lines[2].split()
    else:
        return None
    # If the experiment number is not recorded, we know that it will be 0. In
    # order to not have too many return statements, we append '0' to the
    # experiment number line instead of returning it immediately.
    if len(experiment_number_line) < 4:
        experiment_number_line.append('0')
    return experiment_number_line[-1]
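
# Illustrative sketch (not part of the change under review): pulling the
# experiment number out of a hypothetical SC (Station Channel Definition)
# packet's SOH lines. The sample lines are made up.
def _example_get_experiment_number() -> None:
    soh_lines = ['Station Channel Definition  2019:117:00:00:00  ST: 9BCC',
                 '  Experiment Number = 38']
    assert get_experiment_number(soh_lines) == '38'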

class LogFileReader:
    """
    Class that reads a log file.
    """
    def __init__(self, file_path: Path):
        self.file_path = file_path
        self.log_file_type: Optional[LogFileFormat] = None
        self.eh_et_lines: List[str] = []
        self.soh_lines: List[str] = []
        self.masspos_lines: List[str] = []
        self.station_code: Optional[str] = None
        self.experiment_number: Optional[str] = None

    def read(self) -> None:
        """
        Read the log file.
        """
        log_file = LogFile(self.file_path)
        for packet in log_file:
            if self.log_file_type is None:
                self.log_file_type = detect_log_packet_format(packet)
            parser = PACKET_PARSERS[self.log_file_type]
            eh_et_lines, soh_lines, masspos_lines = parser(packet)
            if self.station_code is None and soh_lines:
                # All header lines contain the station code at the end.
                self.station_code = soh_lines[0].split(' ')[-1].strip()
            if self.experiment_number is None and soh_lines:
                found_experiment_number = get_experiment_number(soh_lines)
                self.experiment_number = found_experiment_number
            self.eh_et_lines.extend(eh_et_lines)
            self.soh_lines.extend(soh_lines)
            # We need to add a newline between two blocks of SOH lines to
            # separate them. This makes it so that we don't have to separate
            # the SOH line blocks manually during processing.
            self.soh_lines.append('\n')
            self.masspos_lines.extend(masspos_lines)
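
# Illustrative sketch (not part of the change under review): reading a whole
# log file and inspecting the result. The file name '9BCC.log' is
# hypothetical.
def _example_read_log_file() -> None:
    reader = LogFileReader(Path('9BCC.log'))
    reader.read()
    print(reader.log_file_type, reader.station_code, reader.experiment_number)
    print(len(reader.eh_et_lines), 'event lines,',
          len(reader.soh_lines), 'SOH lines,',
          len(reader.masspos_lines), 'mass-position lines')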

def process_mass_poss_line(masspos_lines: List[str]
                           ) -> List[Tuple[np.ndarray, np.ndarray]]:
    """
    Process a list of mass-position lines into a list of mass-position data,
    sorted by the channel suffix.
    :param masspos_lines: a list of mass-position log lines
    :return: a list of mass-position data, sorted by the channel suffix
    """
    # There can be 6 mass-position channels.
    mass_pos_data = []
    for masspos_num in range(1, 7):
        current_lines = [line.split()
                         for line in masspos_lines
                         if int(line.split()[2]) == masspos_num]
        data = np.asarray([line[3] for line in current_lines], dtype=float)
        time_format = '%Y:%j:%H:%M:%S.%f'
        times = np.array(
            # strptime requires the microsecond component to have 6 digits,
            # but mass-position log lines only have 3 digits for microseconds.
            # So, we have to pad the time with 0s.
            [UTCDateTime.strptime(line[1] + '000', time_format).timestamp
             for line in current_lines]
        )
        mass_pos_data.append((times, data))
    return mass_pos_data
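
# Illustrative sketch (not part of the change under review): converting
# made-up mass-position lines into (times, data) arrays. The line layout
# mirrors what the function above expects: a label, a time stamp, a channel
# number, and a value; the values themselves are hypothetical.
def _example_process_masspos_lines() -> None:
    masspos_lines = ['LPMP 2019:117:00:00:00.000 1 1.2',
                     'LPMP 2019:117:00:10:00.000 1 1.3',
                     'LPMP 2019:117:00:00:00.000 2 0.5']
    mass_pos_data = process_mass_poss_line(masspos_lines)
    times_1, values_1 = mass_pos_data[0]
    assert values_1.tolist() == [1.2, 1.3]
    assert len(times_1) == 2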