Skip to content
Snippets Groups Projects
Commit 47ae466b authored by Kien Le's avatar Kien Le
Browse files

Refactored mass-position log processing

parent 488c49d0
No related branches found
No related tags found
No related merge requests found
This commit is part of merge request !191. Comments created here will be created in the context of that merge request.
...@@ -10,7 +10,9 @@ from obspy.core import Stream ...@@ -10,7 +10,9 @@ from obspy.core import Stream
from sohstationviewer.conf import constants from sohstationviewer.conf import constants
from sohstationviewer.model.reftek_data.reftek_reader.log_file_reader import \ from sohstationviewer.model.reftek_data.reftek_reader.log_file_reader import \
LogFileReader (
LogFileReader, process_mass_poss_line,
)
from sohstationviewer.view.util.enums import LogType from sohstationviewer.view.util.enums import LogType
from sohstationviewer.model.general_data.general_data import \ from sohstationviewer.model.general_data.general_data import \
...@@ -151,20 +153,11 @@ class RT130(GeneralData): ...@@ -151,20 +153,11 @@ class RT130(GeneralData):
:param masspos_lines: the mass-position lines to process :param masspos_lines: the mass-position lines to process
""" """
# Mass-position channels is suffixed by a number from 1 to 6. # Mass-position channels is suffixed by a number from 1 to 6.
for masspos_num in range(1, 7): processed_masspos_data = process_mass_poss_line(masspos_lines)
masspos_chan = f'MassPos{masspos_num}' for i, (times, data) in enumerate(processed_masspos_data):
current_lines = [line.split()
for line
in masspos_lines
if int(line.split()[2]) == masspos_num]
data = np.asarray([line[3] for line in current_lines], dtype=float)
time_format = '%Y:%j:%H:%M:%S.%f'
times = np.array(
[UTCDateTime.strptime(line[1] + '000', time_format).timestamp
for line in current_lines]
)
if len(data) == 0: if len(data) == 0:
continue continue
masspos_chan = f'MassPos{i}'
trace = {'startTmEpoch': times[0], 'endTmEpoch': times[-1], trace = {'startTmEpoch': times[0], 'endTmEpoch': times[-1],
'data': data, 'times': times} 'data': data, 'times': times}
if masspos_chan not in self.mass_pos_data[key]: if masspos_chan not in self.mass_pos_data[key]:
......
from pathlib import Path from pathlib import Path
from typing import List, Literal, Optional, Dict, Callable, Tuple from typing import List, Literal, Optional, Dict, Callable, Tuple
import numpy as np
from obspy import UTCDateTime
LogFileFormat = Literal['rt2ms', 'logpeek', 'sohstationviewer'] LogFileFormat = Literal['rt2ms', 'logpeek', 'sohstationviewer']
# These packets can be found in section 4 of the RT130 record documentation. # These packets can be found in section 4 of the RT130 record documentation.
RT130_PACKETS = ['SH', 'SC', 'OM', 'DS', 'AD', 'CD', 'FD', 'EH', 'ET'] RT130_PACKETS = ['SH', 'SC', 'OM', 'DS', 'AD', 'CD', 'FD', 'EH', 'ET']
...@@ -256,3 +259,31 @@ class LogFileReader: ...@@ -256,3 +259,31 @@ class LogFileReader:
# separate the SOH lines blocks during processing. # separate the SOH lines blocks during processing.
self.soh_lines.append('\n') self.soh_lines.append('\n')
self.masspos_lines.extend(masspos_lines) self.masspos_lines.extend(masspos_lines)
def process_mass_poss_line(masspos_lines: List[str]
) -> List[Tuple[np.ndarray, np.ndarray]]:
"""
Process a list of mass-position lines into a list of mass-position data,
sorted by the channel suffix
:param masspos_lines: a list of mass-position log lines
:return: a list of mass-position data, sorted by the channel suffix
"""
# There can be 6 mass-position channels.
mass_pos_data = []
for masspos_num in range(6):
current_lines = [line.split()
for line
in masspos_lines
if int(line.split()[2]) == masspos_num]
data = np.asarray([line[3] for line in current_lines], dtype=float)
time_format = '%Y:%j:%H:%M:%S.%f'
times = np.array(
# strptime requires the microsecond component to have 6 digits, but
# a mass-position log lines only have 3 digits for microsecond. So,
# we have to pad the time with 0s.
[UTCDateTime.strptime(line[1] + '000', time_format).timestamp
for line in current_lines]
)
mass_pos_data.append((times, data))
return mass_pos_data
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment