Skip to content
Snippets Groups Projects

Read log files generated by users

Merged Kien Le requested to merge feature-#94-read_log_files into master
1 file
+ 1
3
Compare changes
  • Side-by-side
  • Inline
@@ -8,6 +8,9 @@ import numpy as np
from obspy.core import Stream
from sohstationviewer.conf import constants
from sohstationviewer.model.reftek_data.reftek_reader.log_file_reader import (
LogFileReader, process_mass_poss_line,
)
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.model.general_data.general_data import (
@@ -66,7 +69,12 @@ class RT130(GeneralData):
def processing_data(self):
if self.creator_thread.isInterruptionRequested():
raise ThreadStopped()
self.read_folders()
# We separate the reading of log files and real data sets because their
# formats are very different.
if self.rt130_log_files:
self.read_log_files()
else:
self.read_folders()
self.selected_key = self.select_key()
if self.creator_thread.isInterruptionRequested():
@@ -106,6 +114,64 @@ class RT130(GeneralData):
# this happens when there is text or ascii only in the data
self.data_time[key] = [self.read_start, self.read_end]
def read_log_files(self):
    """
    Read data from self.rt130_log_files and store it in self.log_data
    """
    for log_path in self.rt130_log_files:
        log_reader = LogFileReader(log_path)
        log_reader.read()
        data_set_key = (log_reader.station_code,
                        log_reader.experiment_number)
        self.populate_cur_key_for_all_data(data_set_key)
        key_data = self.log_data[data_set_key]
        # Both 'SOH' and 'EHET' are filled in lockstep here (unlike how
        # RT130 data is usually read), so a single membership check on
        # 'EHET' covers both keys.
        if 'EHET' in key_data:
            # Multiple files may share the same key. We assume the log
            # files are read in order, which keeps the bookkeeping simple;
            # the processed SOH data can always be sorted later if this
            # assumption turns out to be wrong.
            next_file_count = key_data['EHET'][-1][0] + 1
            key_data['EHET'].append((next_file_count,
                                     log_reader.eh_et_lines))
            key_data['SOH'].append((next_file_count, log_reader.soh_lines))
        else:
            key_data['EHET'] = [(1, log_reader.eh_et_lines)]
            key_data['SOH'] = [(1, log_reader.soh_lines)]
        self.process_mass_pos_log_lines(data_set_key,
                                        log_reader.masspos_lines)
def process_mass_pos_log_lines(self, key: Tuple[str, str],
                               masspos_lines: List[str]) -> None:
    """
    Process mass-position log lines and store the result in
    self.mass_pos_data.

    :param key: the current data set key
    :param masspos_lines: the mass-position lines to process
    """
    # Mass-position channels are suffixed by a number from 1 to 6.
    processed_masspos_data = process_mass_poss_line(masspos_lines)
    for i, (times, data) in enumerate(processed_masspos_data, start=1):
        if len(data) == 0:
            # Skip channels with no data so we don't create empty
            # channel entries.
            continue
        masspos_chan = f'MassPos{i}'
        trace = {'startTmEpoch': times[0], 'endTmEpoch': times[-1],
                 'data': data, 'times': times}
        if masspos_chan not in self.mass_pos_data[key]:
            # First trace for this channel: create its entry. Log-derived
            # mass-position data carries no sample rate, so use 0.
            self.mass_pos_data[key][masspos_chan] = (
                {'tracesInfo': []}
            )
            self.mass_pos_data[key][masspos_chan]['samplerate'] = 0
        # NOTE: the redundant re-assignment of trace['startTmEpoch'] and
        # trace['endTmEpoch'] was removed — both values are already set
        # identically when the trace dict is built above.
        traces = self.mass_pos_data[key][masspos_chan]['tracesInfo']
        traces.append(trace)
def read_folders(self) -> None:
"""
Read data from list_of_dir or list_of_rt130_paths for soh,
Loading