Skip to content
Snippets Groups Projects

Write log file

Merged Kien Le requested to merge feature-write_log_file into master
All threads resolved!
1 file
+ 6
2
Compare changes
  • Side-by-side
  • Inline
"""
"""
RT130 object to hold and process RefTek data
RT130 object to hold and process RefTek data
"""
"""
 
import datetime
from pathlib import Path
from pathlib import Path
from typing import Union, List, Tuple, Dict
from typing import Union, List, Tuple, Dict
import traceback
import traceback
import numpy as np
import numpy as np
 
from obspy import UTCDateTime
from obspy.core import Stream
from obspy.core import Stream
from sohstationviewer.conf import constants
from sohstationviewer.conf import constants
 
from sohstationviewer.conf.constants import SOFTWARE_VERSION
from sohstationviewer.model.reftek_data.reftek_reader.log_file_reader import (
from sohstationviewer.model.reftek_data.reftek_reader.log_file_reader import (
LogFileReader, process_mass_poss_line,
LogFileReader, process_mass_poss_line,
)
)
@@ -26,6 +29,24 @@ from sohstationviewer.model.reftek_data.reftek_reader import core, soh_packet
@@ -26,6 +29,24 @@ from sohstationviewer.model.reftek_data.reftek_reader import core, soh_packet
from sohstationviewer.model.reftek_data.log_info import LogInfo
from sohstationviewer.model.reftek_data.log_info import LogInfo
 
def format_timestamp_as_mass_pos_time(timestamp: float) -> str:
 
"""
 
Format a UTC timestamp in the format of a mass-position time in SOH
 
messages.
 
:param timestamp: the UTC timestamp to format
 
:return: the formatted time string
 
"""
 
# We trim the last three character because the %f format character has a
 
# millisecond precision (6 characters), while the time format we use only
 
# has a microsecond precision (3 characters).
 
return UTCDateTime(timestamp).strftime('%Y:%j:%H:%M:%S.%f')[:-3]
 
 
 
format_timestamp_as_mass_pos_time = np.vectorize(
 
format_timestamp_as_mass_pos_time, otypes=[str]
 
)
 
 
class RT130(GeneralData):
class RT130(GeneralData):
"""
"""
read and process reftek file into object with properties can be used to
read and process reftek file into object with properties can be used to
@@ -114,6 +135,41 @@ class RT130(GeneralData):
@@ -114,6 +135,41 @@ class RT130(GeneralData):
# this happens when there is text or ascii only in the data
# this happens when there is text or ascii only in the data
self.data_time[key] = [self.read_start, self.read_end]
self.data_time[key] = [self.read_start, self.read_end]
 
for key in self.log_data:
 
if key == 'TEXT':
 
continue
 
soh_header = ''
 
soh_footer = ''
 
soh_messages = self.log_data[key]['SOH']
 
# We want to include a log file header only when there is not
 
# already one.
 
possible_programs = ['logpeek', 'SOHStationViewer', 'rt2ms']
 
message_has_header = any(soh_messages[0].startswith(program)
 
for program
 
in possible_programs)
 
if not message_has_header:
 
current_time = datetime.datetime.now().ctime()
 
soh_header = (f'SOHStationViewer: v{SOFTWARE_VERSION} '
 
f'Run time (UTC): {current_time}\n')
 
if self.include_masspos_in_soh_messages:
 
soh_footer += '\n\nMass-positions:\n'
 
mass_pos_lines = []
 
soh_messages = self.log_data[key]['SOH']
 
for chan in self.mass_pos_data[key]:
 
chan_data = self.mass_pos_data[key][chan]
 
# We combined all data into one trace above
 
trace = chan_data['tracesInfo'][0]
 
times = format_timestamp_as_mass_pos_time(trace['times'])
 
data = trace['data'].astype(times.dtype)
 
formatted_lines = [
 
f'LPMP {time} {chan[-1]} {mass_position}'
 
for (time, mass_position)
 
in zip(times, data)
 
]
 
mass_pos_lines.extend(formatted_lines)
 
soh_footer += '\n'.join(mass_pos_lines)
 
soh_messages[0] = soh_header + soh_messages[0] + soh_footer
 
def read_log_files(self):
def read_log_files(self):
"""
"""
Read data from self.rt130_log_files and store it in self.log_data
Read data from self.rt130_log_files and store it in self.log_data
Loading