
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: software_public/passoft/sohstationviewer
Showing 1987 additions and 82 deletions
......@@ -5,7 +5,7 @@ from sohstationviewer.database.process_db import execute_db_dict, execute_db
from sohstationviewer.conf.dbSettings import dbConf
def get_chan_plot_info(org_chan_id: str, chan_info: Dict, data_type: str,
def get_chan_plot_info(org_chan_id: str, data_type: str,
color_mode: ColorMode = 'B') -> Dict:
"""
Given chanID read from raw data file and detected dataType
......@@ -24,10 +24,10 @@ def get_chan_plot_info(org_chan_id: str, chan_info: Dict, data_type: str,
chan = 'VM?'
if org_chan_id.startswith('MassPos'):
chan = 'MassPos?'
if org_chan_id.startswith('DS'):
chan = 'SEISMIC'
if org_chan_id.startswith('Event DS'):
chan = 'Event DS?'
if org_chan_id.startswith('DS'):
chan = 'DS?'
if org_chan_id.startswith('Disk Usage'):
chan = 'Disk Usage?'
if dbConf['seisRE'].match(chan):
......@@ -46,17 +46,13 @@ def get_chan_plot_info(org_chan_id: str, chan_info: Dict, data_type: str,
sql = (f"{o_sql} WHERE channel='{chan}' and C.param=P.param"
f" and dataType='{data_type}'")
chan_db_info = execute_db_dict(sql)
seismic_label = None
if len(chan_db_info) == 0:
chan_db_info = execute_db_dict(
f"{o_sql} WHERE channel='DEFAULT' and C.param=P.param")
else:
if chan_db_info[0]['channel'] == 'SEISMIC':
try:
chan_db_info[0]['label'] = dbConf['seisLabel'][org_chan_id[-1]]
except KeyError:
chan_db_info[0]['label'] = str(chan_info['samplerate'])
seismic_label = get_seismic_chan_label(org_chan_id)
chan_db_info[0]['channel'] = org_chan_id
chan_db_info[0]['label'] = (
......@@ -68,6 +64,8 @@ def get_chan_plot_info(org_chan_id: str, chan_info: Dict, data_type: str,
else chan_db_info[0]['fixPoint'])
if chan_db_info[0]['label'].strip() == '':
chan_db_info[0]['label'] = chan_db_info[0]['channel']
elif seismic_label is not None:
chan_db_info[0]['label'] = seismic_label
else:
chan_db_info[0]['label'] = '-'.join([chan_db_info[0]['channel'],
chan_db_info[0]['label']])
......@@ -76,30 +74,23 @@ def get_chan_plot_info(org_chan_id: str, chan_info: Dict, data_type: str,
return chan_db_info[0]
def get_wf_plot_info(org_chan_id: str, *args, **kwargs) -> Dict:
"""
:param org_chan_id: channel name read from data source
:param chan_info: to be compliant with get_chan_plot_info()
:param data_type: to be compliant with get_chan_plot_info()
:param color_mode: to be compliant with get_chan_plot_info()
:return: info of channel read from DB which is used for plotting
"""
# Waveform plot's color is fixed to NULL in the database, so we do not need
# to get the valueColors columns from the database.
chan_info = execute_db_dict(
"SELECT param, plotType, height "
"FROM Parameters WHERE param='Seismic data'")
# The plotting API still requires that the key 'valueColors' is mapped to
# something, so we are setting it to None.
chan_info[0]['valueColors'] = None
chan_info[0]['label'] = get_chan_label(org_chan_id)
chan_info[0]['unit'] = ''
chan_info[0]['channel'] = 'SEISMIC'
chan_info[0]['convertFactor'] = 1
return chan_info[0]
def get_convert_factor(chan_id, data_type):
sql = f"SELECT convertFactor FROM Channels WHERE channel='{chan_id}' " \
f"AND dataType='{data_type}'"
ret = execute_db(sql)
if ret:
return ret[0][0]
else:
return None
def get_chan_label(chan_id):
def get_seismic_chan_label(chan_id):
"""
Get label for chan_id: a data stream can use chan_id as its label, while
other seismic channels need the coordinate added to chan_id for the label
:param chan_id: name of channel
:return label: label to put in front of the plot of the channel
"""
if chan_id.startswith("DS"):
label = chan_id
else:
......
No preview for this file type
......@@ -9,8 +9,8 @@ from PySide2 import QtCore, QtWidgets
from sohstationviewer.conf import constants
from sohstationviewer.controller.util import display_tracking_info
from sohstationviewer.model.data_type_model import (
DataTypeModel, ThreadStopped, ProcessingDataError)
from sohstationviewer.model.general_data.general_data import (
GeneralData, ThreadStopped, ProcessingDataError)
from sohstationviewer.view.util.enums import LogType
......@@ -18,7 +18,7 @@ class DataLoaderWorker(QtCore.QObject):
"""
The worker class that executes the code to load the data.
"""
finished = QtCore.Signal(DataTypeModel)
finished = QtCore.Signal(GeneralData)
failed = QtCore.Signal()
stopped = QtCore.Signal()
notification = QtCore.Signal(QtWidgets.QTextBrowser, str, str)
......@@ -26,23 +26,28 @@ class DataLoaderWorker(QtCore.QObject):
button_chosen = QtCore.Signal(int)
def __init__(self, data_type: str, tracking_box: QtWidgets.QTextBrowser,
is_multiplex: Optional[bool],
folder: str, list_of_rt130_paths: List[Path],
req_wf_chans: Union[List[str], List[int]] = [],
req_soh_chans: List[str] = [], read_start: float = 0,
gap_minimum: Optional[float] = None,
read_end: float = constants.HIGHEST_INT,
include_mp123: bool = False, include_mp456: bool = False,
parent_thread=None):
rt130_waveform_data_req: bool = False, parent_thread=None):
super().__init__()
self.data_type = data_type
self.tracking_box = tracking_box
self.is_multiplex = is_multiplex
self.folder = folder
self.list_of_rt130_paths = list_of_rt130_paths
self.req_wf_chans = req_wf_chans
self.req_soh_chans = req_soh_chans
self.gap_minimum = gap_minimum
self.read_start = read_start
self.read_end = read_end
self.include_mp123 = include_mp123
self.include_mp456 = include_mp456
self.rt130_waveform_data_req = rt130_waveform_data_req
self.parent_thread = parent_thread
# display_tracking_info updates a QtWidget, which can only be done in
# the main thread. Since self.run runs in a background thread, we need to use
......@@ -57,7 +62,7 @@ class DataLoaderWorker(QtCore.QObject):
from sohstationviewer.model.reftek.reftek import RT130
object_type = RT130
else:
from sohstationviewer.model.mseed.mseed import MSeed
from sohstationviewer.model.mseed_data.mseed import MSeed
object_type = MSeed
# Create data object without loading any data in order to connect
# its unpause slot to the loader's unpause signal
......@@ -65,12 +70,14 @@ class DataLoaderWorker(QtCore.QObject):
self.button_chosen.connect(data_object.receive_pause_response,
type=QtCore.Qt.DirectConnection)
data_object.__init__(
self.data_type, self.tracking_box, self.folder,
self.data_type, self.tracking_box,
self.is_multiplex, self.folder,
self.list_of_rt130_paths, req_wf_chans=self.req_wf_chans,
req_soh_chans=self.req_soh_chans,
req_soh_chans=self.req_soh_chans, gap_minimum=self.gap_minimum,
read_start=self.read_start, read_end=self.read_end,
include_mp123zne=self.include_mp123,
include_mp456uvw=self.include_mp456,
rt130_waveform_data_req=self.rt130_waveform_data_req,
creator_thread=self.parent_thread,
notification_signal=self.notification,
pause_signal=self.button_dialog
......@@ -107,14 +114,19 @@ class DataLoader(QtCore.QObject):
self.thread: Optional[QtCore.QThread] = None
self.worker: Optional[DataLoaderWorker] = None
def init_loader(self, data_type: str, tracking_box: QtWidgets.QTextBrowser,
def init_loader(self, data_type: str,
tracking_box: QtWidgets.QTextBrowser,
is_multiplex: bool,
list_of_dir: List[Union[str, Path]],
list_of_rt130_paths: List[Union[str, Path]],
req_wf_chans: Union[List[str], List[int]] = [],
req_soh_chans: List[str] = [], read_start: float = 0,
req_soh_chans: List[str] = [],
gap_minimum: Optional[float] = None,
read_start: float = 0,
read_end: float = constants.HIGHEST_INT,
include_mp123: bool = False,
include_mp456: bool = False):
include_mp456: bool = False,
rt130_waveform_data_req: bool = False):
"""
Initialize the data loader. Construct the thread and worker and connect
them together. Separated from the actual loading of the data to allow
......@@ -142,14 +154,17 @@ class DataLoader(QtCore.QObject):
self.worker = DataLoaderWorker(
data_type,
tracking_box,
is_multiplex,
list_of_dir[0], # Only work on one directory for now.
list_of_rt130_paths,
req_wf_chans=req_wf_chans,
req_soh_chans=req_soh_chans,
gap_minimum=gap_minimum,
read_start=read_start,
read_end=read_end,
include_mp123=include_mp123,
include_mp456=include_mp456,
rt130_waveform_data_req=rt130_waveform_data_req,
parent_thread=self.thread
)
......
......@@ -43,6 +43,7 @@ class DataTypeModel():
read_end: Optional[float] = UTCDateTime().timestamp,
include_mp123zne: bool = False,
include_mp456uvw: bool = False,
rt130_waveform_data_req: bool = False,
creator_thread: Optional[QtCore.QThread] = None,
notification_signal: Optional[QtCore.Signal] = None,
pause_signal: Optional[QtCore.Signal] = None,
......@@ -60,6 +61,7 @@ class DataTypeModel():
:param read_end: requested end time to read
:param include_mp123zne: if mass position channels 1,2,3 are requested
:param include_mp456uvw: if mass position channels 4,5,6 are requested
:param rt130_waveform_data_req: flag for RT130 to read waveform data
:param creator_thread: the thread the current DataTypeModel instance is
being created in. If None, the DataTypeModel instance is being
created in the main thread
......@@ -78,6 +80,7 @@ class DataTypeModel():
self.read_end = read_end
self.include_mp123zne = include_mp123zne
self.include_mp456uvw = include_mp456uvw
self.rt130_waveform_data_req = rt130_waveform_data_req
if creator_thread is None:
err_msg = (
'A signal is not None while running in main thread'
......@@ -357,7 +360,8 @@ class DataTypeModel():
list_of_rt130_paths,
req_wf_chans=[], req_soh_chans=[],
read_start=0, read_end=constants.HIGHEST_INT,
include_mp123=False, include_mp456=False):
include_mp123=False, include_mp456=False,
rt130_waveform_data_req=False):
"""
Create a DataTypeModel object, with the concrete class being based on
data_type. Run on the same thread as its caller, and so will block the
......@@ -383,7 +387,9 @@ class DataTypeModel():
data_type, tracking_box, folder, list_of_rt130_paths,
reqWFChans=req_wf_chans, reqSOHChans=req_soh_chans,
readStart=read_start, readEnd=read_end,
include_mp123=include_mp123, include_mp456=include_mp456)
include_mp123=include_mp123, include_mp456=include_mp456,
rt130_waveform_data_req=rt130_waveform_data_req
)
else:
from sohstationviewer.model.mseed.mseed import MSeed
data_object = MSeed(
......
## Log data:
info from log channels, soh messages, and text files in a dict:
{'TEXT': [str,], key:{chan_id: [str,],},}
In which 'TEXT' is the chan_id given by sohview for text-only files which have
no station or channel associated with them.
Note: log_data for an RT130 dataset has only one channel: SOH
## data_dict:
{set_key: {
chan_id (str): {
'file_path' (str): path of file to keep track of file changes in MSeedReader
'chanID' (str): name of channel
'samplerate' (float): Sampling rate of the data
'startTmEpoch' (float): start epoch time of channel
'endTmEpoch' (float): end epoch time of channel
'size' (int): size of channel data
'tracesInfo': [{
'startTmEpoch': Start epoch time of the trace - float
'endTmEpoch': End epoch time of the trace - float
'times': time of channel's trace: List[float] in mseed_reader but changed to ndarray in combine_data()
'data': data of channel's trace: List[float] in mseed_reader but changed to ndarray in combine_data()
}]
'tps_data': list of lists of mean of square of every 5m of data in each day
'times' (np.array): times that has been trimmed and down-sampled for plotting
'data' (np.array): data that has been trimmed and down-sampled for plotting
'chan_db_info' (dict): the plotting parameters retrieved from the database
for this channel
ax: axes to draw the channel in PlottingWidget
ax_wf (matplotlib.axes.Axes): axes to draw the channel in WaveformWidget
}
}
Use both ax and ax_wf because mass position channels are plotted in both widgets, while
soh channels are plotted in PlottingWidget and waveform channels are plotted in WaveformWidget.
tps_data is created in TimePowerSquaredWidget only and applies to waveform_data only.
## tps_data: data that isn't separated into traces
{set_key - str or (str, str): {
chan_id - str: {
times: np.array,
data: np.array,
}
}
}
\ No newline at end of file
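For reference, here is a minimal sketch of one data_dict entry that follows the structure described above; the station key 'STA1', the channel 'LHZ', and all values are made up for illustration.

import numpy as np

# Hypothetical data_dict entry following the structure documented above
# ('STA1', 'LHZ', and all values are placeholders).
data_dict = {
    'STA1': {
        'LHZ': {
            'file_path': '/path/to/data.mseed',
            'chanID': 'LHZ',
            'samplerate': 1.0,
            'startTmEpoch': 1577836800.0,
            'endTmEpoch': 1577840399.0,
            'size': 3600,
            'tracesInfo': [{
                'startTmEpoch': 1577836800.0,
                'endTmEpoch': 1577840399.0,
                'times': np.arange(1577836800.0, 1577840400.0, 1.0),
                'data': np.zeros(3600),
            }],
        }
    }
}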
from __future__ import annotations
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional, Union, List, Tuple, Dict
from obspy import UTCDateTime
from PySide2 import QtCore
from PySide2 import QtWidgets
from sohstationviewer.controller.util import display_tracking_info
from sohstationviewer.view.plotting.gps_plot.gps_point import GPSPoint
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.database.process_db import execute_db
from sohstationviewer.model.general_data.general_data_helper import \
retrieve_data_time_from_data_dict, retrieve_gaps_from_data_dict, \
combine_data, sort_data, squash_gaps, apply_convert_factor_to_data_dict, \
reset_data
class ProcessingDataError(Exception):
def __init__(self, msg):
self.message = msg
class ThreadStopped(Exception):
"""
An exception that is raised when the user requests for the data loader
thread to be stopped.
"""
def __init__(self, *args, **kwargs):
self.args = (args, kwargs)
class GeneralData():
def __init__(self, data_type,
tracking_box: Optional[QtWidgets.QTextBrowser] = None,
is_multiplex: bool = False, folder: str = '.',
list_of_rt130_paths: List[Path] = [],
req_wf_chans: Union[List[str], List[int]] = [],
req_soh_chans: List[str] = [],
gap_minimum: float = None,
read_start: Optional[float] = UTCDateTime(0).timestamp,
read_end: Optional[float] = UTCDateTime().timestamp,
include_mp123zne: bool = False,
include_mp456uvw: bool = False,
rt130_waveform_data_req: bool = False,
creator_thread: Optional[QtCore.QThread] = None,
notification_signal: Optional[QtCore.Signal] = None,
pause_signal: Optional[QtCore.Signal] = None,
on_unittest: bool = False,
*args, **kwargs):
"""
CHANGED FROM data_type_model.DataTypeModel.__init__:
+ add self.is_multiplex, self.on_unittest, self.gap_minimum,
self.keys
+ remove docstring for self.log_data, self.soh_data,
self.mass_pos_data,
self.waveform_data, self.gaps_by_key_chan,
self.stream_header_by_key_chan
Super class for different data type to process data from data files
:param data_type: type of the object
:param tracking_box: widget to display tracking info
:param folder: path to the folder of data
:param list_of_rt130_paths: path to the folders of RT130 data
:param req_wf_chans: requested waveform channel list
:param req_soh_chans: requested SOH channel list
:param read_start: requested start time to read
:param read_end: requested end time to read
:param include_mp123zne: if mass position channels 1,2,3 are requested
:param include_mp456uvw: if mass position channels 4,5,6 are requested
:param rt130_waveform_data_req: flag for RT130 to read waveform data
:param creator_thread: the thread the current DataTypeModel instance is
being created in. If None, the DataTypeModel instance is being
created in the main thread
:param notification_signal: signal used to send notifications to the
main thread. Only not None when creator_thread is not None
:param pause_signal: signal used to notify the main thread that the
data loader is paused.
"""
self.data_type = data_type
self.is_multiplex = is_multiplex
self.tracking_box = tracking_box
self.dir = folder
self.list_of_rt130_paths = list_of_rt130_paths
self.req_soh_chans = req_soh_chans
self.req_wf_chans = req_wf_chans
self.gap_minimum = gap_minimum
self.read_start = read_start
self.read_end = read_end
self.include_mp123zne = include_mp123zne
self.include_mp456uvw = include_mp456uvw
self.rt130_waveform_data_req = rt130_waveform_data_req
self.on_unittest = on_unittest
if creator_thread is None:
err_msg = (
'A signal is not None while running in main thread'
)
assert notification_signal is None, err_msg
assert pause_signal is None, err_msg
self.creator_thread = QtCore.QThread()
else:
self.creator_thread = creator_thread
self.notification_signal = notification_signal
self.pause_signal = pause_signal
"""
processing_log: record the progress of processing
"""
self.processing_log: List[Tuple[str, LogType]] = []
"""
keys: list of all keys
"""
self.keys = []
DataKey = Union[Tuple[str, str], str]
"""
log_texts: dictionary of content of text files by filenames
"""
self.log_texts: Dict[str, str] = {}
# Look for description in data_structures.MD
self.log_data = {'TEXT': []} # noqa
self.waveform_data = {}
self.soh_data = {}
self.mass_pos_data = {}
"""
data_time: time range of data sets:
"""
self.data_time: Dict[DataKey, List[float]] = {}
"""
The given data may include more than one data set which is station_id
in mseed or (unit_id, exp_no) in reftek. Users are allowed to choose which
data set to be displayed
selected_key: str - key of the data set to be displayed
"""
self.selected_key: Optional[str] = None
"""
gaps: gaps info in dict:
"""
self.gaps: Dict[DataKey, List[List[float]]] = {}
"""
tmp_dir: dir to keep memmap files. Deleted when object is deleted
"""
self.tmp_dir_obj: TemporaryDirectory = TemporaryDirectory()
self.tmp_dir = self.tmp_dir_obj.name
if not on_unittest:
self.save_temp_data_folder_to_database()
self._pauser = QtCore.QSemaphore()
self.pause_response = None
self.gps_points: List[GPSPoint] = []
def read_folder(self, folder: str) -> Tuple[Dict]:
"""
FROM data_type_model.DataTypeModel.read_folder
Read data from given folder
:param folder: path to folder to read data
:return: Tuple of different data dicts
"""
pass
def select_key(self) -> Union[str, Tuple[str, str]]:
"""
FROM data_type_model.DataTypeModel.select_key
Get the key for the data set to process.
:return: key of the selected data set
"""
pass
def processing_data(self):
"""
CHANGED FROM data_type_model.Data_Type_Model.processing_data
"""
if self.creator_thread.isInterruptionRequested():
raise ThreadStopped()
self.read_folder(self.dir)
self.selected_key = self.select_key()
self.fill_empty_data()
if self.creator_thread.isInterruptionRequested():
raise ThreadStopped()
self.finalize_data()
def finalize_data(self):
"""
CHANGED FROM data_type_model.Data_Type_Model.finalize_data
This function should be called after all folders finish reading to
+ Fill an empty dict into stations with no data added to data_dicts
+ Sort all data traces in time order
+ Combine traces in data and split at gaps > gap_minimum
+ Apply convert_factor so flags aren't needed to prevent the convert
factor from being applied twice when plotting
+ Check for channels that were not found
+ Retrieve gaps from data_dicts
+ Retrieve data_time from data_dicts
+ Replace data_time default values that are invalid for plotting
with read_start, read_end.
"""
if self.selected_key is None:
return
self.track_info("Finalizing...", LogType.INFO)
self.sort_all_data()
self.combine_all_data()
self.apply_convert_factor_to_data_dicts()
self.retrieve_gaps_from_data_dicts()
self.retrieve_data_time_from_data_dicts()
if self.selected_key not in self.data_time.keys():
self.data_time[self.selected_key] = \
[self.read_start, self.read_end]
def __del__(self):
# FROM data_type_model.Data_Type_Model.__del__
print("delete dataType Object")
try:
del self.tmp_dir_obj
except OSError as e:
self.track_info(
"Error deleting %s : %s" % (self.tmp_dir, e.strerror),
LogType.ERROR)
print("Error deleting %s : %s" % (self.tmp_dir, e.strerror))
print("finish deleting")
def track_info(self, text: str, type: LogType) -> None:
"""
CHANGED FROM data_type_model.Data_Type_Model.track_info:
Display tracking info in tracking_box.
Add all errors/warnings to processing_log.
:param text: str - message to display
:param type: str - type of message (error/warning/info)
"""
# display_tracking_info updates a QtWidget, which can only be done in
# the main thread. So, if we are running in a background thread
# (i.e. self.creator_thread is not None), we need to use signal slot
# mechanism to ensure that display_tracking_info is run in the main
# thread.
if self.notification_signal is None:
display_tracking_info(self.tracking_box, text, type)
else:
self.notification_signal.emit(self.tracking_box, text, type)
if type != LogType.INFO:
self.processing_log.append((text, type))
def pause(self) -> None:
"""
FROM data_type_model.Data_Type_Model.pause
Pause the thread this DataTypeModel instance is in. Works by trying
to acquire a semaphore that is not available, which causes the thread
to block.
Note: due to how this is implemented, each call to pause will require
a corresponding call to unpause. Thus, it is inadvisable to call this
method more than once.
Caution: not safe to call in the main thread. Unless a background
thread releases the semaphore, the whole program will freeze.
"""
self._pauser.acquire()
@QtCore.Slot()
def unpause(self):
"""
FROM data_type_model.Data_Type_Model.unpause
Unpause the thread this DataTypeModel instance is in. Works by trying
to acquire a semaphore that is not available, which causes the thread
to block.
Caution: due to how this is implemented, if unpause is called before
pause, the thread will not be paused until another call to pause is
made. Also, like pause, each call to unpause must be matched by another
call to pause for everything to work.
"""
self._pauser.release()
@QtCore.Slot()
def receive_pause_response(self, response: object):
"""
FROM data_type_model.Data_Type_Model.receive_pause_response
Receive a response to a request made to another thread and unpause the
calling thread.
:param response: the response to the request made
:type response: object
"""
self.pause_response = response
self.unpause()
@classmethod
def get_empty_instance(cls) -> GeneralData:
"""
# FROM data_type_model.Data_Type_Model.get_empty_instance
Create an empty data object. Useful if a DataTypeModel instance is
needed, but it is undesirable to load a data set. Basically wraps
__new__().
:return: an empty data object
:rtype: DataTypeModel
"""
return cls.__new__(cls)
def save_temp_data_folder_to_database(self):
# FROM
# data_type_model.Data_Type_Model.save_temp_data_folder_to_database
execute_db(f'UPDATE PersistentData SET FieldValue="{self.tmp_dir}" '
f'WHERE FieldName="tempDataDirectory"')
def check_not_found_soh_channels(self):
# FROM data_type_model.Data_Type_Model.check_not_found_soh_channels
all_chans_meet_req = (
list(self.soh_data[self.selected_key].keys()) +
list(self.mass_pos_data[self.selected_key].keys()) +
list(self.log_data[self.selected_key].keys()))
not_found_chans = [c for c in self.req_soh_chans
if c not in all_chans_meet_req]
if not_found_chans != []:
msg = (f"No data found for the following channels: "
f"{', '.join( not_found_chans)}")
self.processing_log.append((msg, LogType.WARNING))
def sort_all_data(self):
"""
FROM data_type_model.Data_Type_Model.sort_all_data
Sort traces by startTmEpoch on all data: waveform_data, mass_pos_data,
soh_data.
Reftek's soh_data won't be sorted here. It has been sorted by time
because it is created from log data which is sorted in
prepare_soh_data_from_log_data()
"""
sort_data(self.waveform_data[self.selected_key])
sort_data(self.mass_pos_data[self.selected_key])
try:
sort_data(self.soh_data[self.selected_key])
except KeyError:
# Reftek's SOH trace doesn't have startTmEpoch and
# actually soh_data consists of only one trace
pass
def combine_all_data(self):
combine_data(self.selected_key, self.waveform_data, self.gap_minimum)
combine_data(self.selected_key, self.mass_pos_data, self.gap_minimum)
try:
combine_data(self.selected_key, self.soh_data, self.gap_minimum)
except KeyError:
# Reftek's SOH trace doesn't have startTmEpoch and
# actually soh_data consists of only one trace
pass
def retrieve_gaps_from_data_dicts(self):
"""
Get gaps from each data_dict, then squash all related gaps
"""
self.gaps[self.selected_key] = []
retrieve_gaps_from_data_dict(
self.selected_key, self.soh_data, self.gaps)
retrieve_gaps_from_data_dict(
self.selected_key, self.mass_pos_data, self.gaps)
retrieve_gaps_from_data_dict(
self.selected_key, self.waveform_data, self.gaps)
self.gaps[self.selected_key] = squash_gaps(
self.gaps[self.selected_key])
def retrieve_data_time_from_data_dicts(self):
"""
Go through each data_dict to update data_time to be
[min of startTmEpoch, max of endTmEpoch] for each station.
"""
retrieve_data_time_from_data_dict(
self.selected_key, self.soh_data, self.data_time)
retrieve_data_time_from_data_dict(
self.selected_key, self.mass_pos_data, self.data_time)
retrieve_data_time_from_data_dict(
self.selected_key, self.waveform_data, self.data_time)
def fill_empty_data(self):
"""
Fill an empty dict into stations with no data added to data_dicts
"""
for key in self.keys:
if key not in self.soh_data:
self.soh_data[key] = {}
if key not in self.waveform_data:
self.waveform_data[key] = {}
if key not in self.mass_pos_data:
self.mass_pos_data[key] = {}
if key not in self.log_data:
self.log_data[key] = {}
def apply_convert_factor_to_data_dicts(self):
"""
Apply convert_factor so flags aren't needed to prevent the convert
factor from being applied twice when plotting
"""
apply_convert_factor_to_data_dict(
self.selected_key, self.soh_data, self.data_type)
apply_convert_factor_to_data_dict(
self.selected_key, self.mass_pos_data, self.data_type)
apply_convert_factor_to_data_dict(
self.selected_key, self.waveform_data, self.data_type)
def reset_all_selected_data(self):
"""
FROM data_type_model.reset_all_selected_data()
Remove all keys created in the plotting process, and change fullData
to False. This function is to replace deepcopy which uses more memory.
"""
reset_data(self.selected_key, self.soh_data)
reset_data(self.selected_key, self.waveform_data)
reset_data(self.selected_key, self.mass_pos_data)
from typing import List, Dict, Optional, Union, Tuple
import numpy as np
from sohstationviewer.database.extract_data import get_convert_factor
def _check_related_gaps(min1: float, max1: float,
min2: float, max2: float,
index: int, checked_indexes: List[int]):
"""
FROM handling_data.check_related_gaps
Check if the passed ranges overlap each other and, if so, add the index to
checked_indexes.
:param min1: start of range 1
:param max1: end of range 1
:param min2: start of range 2
:param max2: end of range 2
:param index: index of gap being checked
:param checked_indexes: list of gaps that have been checked
:return: True if the two ranges overlap each other, False otherwise
"""
if ((min1 <= min2 <= max1) or (min1 <= max2 <= max1)
or (min2 <= min1 <= max2) or (min2 <= max1 <= max2)):
# range [min1, max1] and [min2, max2] have some part overlap each other
checked_indexes.append(index)
return True
else:
return False
def squash_gaps(gaps: List[List[float]]) -> List[List[float]]:
"""
FROM handling_data.squash_gaps
Compress gaps from different channels whose time ranges are related to
each other into ones with the outside boundary (min start, max end),
or (max start, min end) in case of overlap.
:param gaps: [[float, float],], [[float, float],] -
list of gaps of multiple channels: [[start, end],], [[start, end],]
:return: squashed_gaps: [[float, float],] - all related gaps are squashed
extending to the outside start and end
[[min start, max end], [max start, min end]]
"""
gaps = sorted(gaps, key=lambda x: x[0])
squashed_gaps = []
checked_indexes = []
for idx, g in enumerate(gaps):
if idx in checked_indexes:
continue
squashed_gaps.append(g)
checked_indexes.append(idx)
overlap = g[0] >= g[1]
for idx_, g_ in enumerate(gaps):
if idx_ in checked_indexes:
continue
if not overlap:
if g_[0] >= g_[1]:
continue
if _check_related_gaps(g[0], g[1], g_[0], g_[1],
idx_, checked_indexes):
squashed_gaps[-1][0] = min(g[0], g_[0])
squashed_gaps[-1][1] = max(g[1], g_[1])
else:
break
else:
if g_[0] < g_[1]:
continue
if _check_related_gaps(g[1], g[0], g_[1], g_[0],
idx_, checked_indexes):
squashed_gaps[-1][0] = max(g[0], g_[0])
squashed_gaps[-1][1] = min(g[1], g_[1])
return squashed_gaps
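A small usage sketch with made-up gap values: the overlapping gaps [5, 10] and [8, 15] are squashed to their outer boundary, while [20, 25] is unrelated and kept as is.

# Usage sketch (made-up values): overlapping gaps are merged to their outer
# boundary; unrelated gaps are kept separate.
gaps = [[5.0, 10.0], [8.0, 15.0], [20.0, 25.0]]
print(squash_gaps(gaps))   # [[5.0, 15.0], [20.0, 25.0]]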
def sort_data(sta_data_dict: Dict) -> None:
"""
FROM handling_data.sort_data
Sort data in 'tracesInfo' of each channel by 'startTmEpoch' order
:param sta_data_dict: data of a station
"""
for chan_id in sta_data_dict:
traces_info = sta_data_dict[chan_id]['tracesInfo']
sta_data_dict[chan_id]['tracesInfo'] = sorted(
traces_info, key=lambda i: i['startTmEpoch'])
def retrieve_data_time_from_data_dict(
selected_key: Union[str, Tuple[str, str]],
data_dict: Dict, data_time: Dict[str, List[float]]) -> None:
"""
Go through each channel in each station to get data_time for each
station, which is [min of startTmEpoch, max of endTmEpoch] among
the station's channels.
:param selected_key: the key of the selected data set
:param data_dict: the given data_dict
:param data_time: data by sta_id
"""
selected_data_dict = data_dict[selected_key]
for c in selected_data_dict:
dtime = [selected_data_dict[c]['startTmEpoch'],
selected_data_dict[c]['endTmEpoch']]
if selected_key in data_time.keys():
data_time[selected_key][0] = min(data_time[selected_key][0],
dtime[0])
data_time[selected_key][1] = max(data_time[selected_key][1],
dtime[1])
else:
data_time[selected_key] = dtime
def retrieve_gaps_from_data_dict(selected_key: Union[str, Tuple[str, str]],
data_dict: Dict,
gaps: Dict[str, List[List[float]]]) -> None:
"""
Create each station's gaps by adding all gaps from all channels
:param selected_key: the key of the selected data set
:param data_dict: the given data_dict
:param gaps: gaps list by key
"""
selected_data_dict = data_dict[selected_key]
for c in selected_data_dict.keys():
cgaps = selected_data_dict[c]['gaps']
if cgaps != []:
gaps[selected_key] += cgaps
def combine_data(selected_key: Union[str, Tuple[str, str]],
data_dict: Dict, gap_minimum: Optional[float]) -> None:
"""
Traverse through traces in each channel and add to the gap list if
delta >= gap_minimum, where delta is the distance between
contiguous traces.
Combine sorted data using concatenate, which also changes data to ndarray
and updates startTmEpoch and endTmEpoch.
:param selected_key: the key of the selected data set
:param data_dict: dict of data of a station
:param gap_minimum: minimum length of gaps to be detected
"""
selected_data_dict = data_dict[selected_key]
for chan_id in selected_data_dict:
channel = selected_data_dict[chan_id]
traces_info = channel['tracesInfo']
if 'gaps' in channel:
# gaps key is for mseed data only
for idx in range(len(traces_info) - 1):
curr_end_tm = traces_info[idx]['endTmEpoch']
next_start_tm = traces_info[idx + 1]['startTmEpoch']
delta = abs(curr_end_tm - next_start_tm)
if gap_minimum is not None and delta >= gap_minimum:
# add gap
gap = [curr_end_tm, next_start_tm]
selected_data_dict[chan_id]['gaps'].append(gap)
channel['startTmEpoch'] = min([tr['startTmEpoch']
for tr in traces_info])
channel['endTmEpoch'] = max([tr['endTmEpoch'] for tr in traces_info])
data_list = [tr['data'] for tr in traces_info]
times_list = [tr['times'] for tr in traces_info]
channel['tracesInfo'] = [{
'startTmEpoch': channel['startTmEpoch'],
'endTmEpoch': channel['endTmEpoch'],
'data': np.concatenate(data_list),
'times': np.concatenate(times_list)
}]
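A minimal sketch of combine_data on a hypothetical data_dict with one channel holding two sorted traces more than gap_minimum apart; the traces are concatenated into one and the gap between them is recorded. The key names 'STA1' and 'VKI' and all values are placeholders.

# Hypothetical data_dict: the two traces are concatenated into a single trace
# and the gap [100.0, 200.0] is appended to the channel's 'gaps' list.
data_dict = {
    'STA1': {
        'VKI': {
            'gaps': [],
            'tracesInfo': [
                {'startTmEpoch': 0.0, 'endTmEpoch': 100.0,
                 'times': [0.0, 50.0, 100.0], 'data': [1, 2, 3]},
                {'startTmEpoch': 200.0, 'endTmEpoch': 300.0,
                 'times': [200.0, 250.0, 300.0], 'data': [4, 5, 6]},
            ],
        }
    }
}
combine_data('STA1', data_dict, gap_minimum=60.0)
# data_dict['STA1']['VKI']['gaps']       -> [[100.0, 200.0]]
# data_dict['STA1']['VKI']['tracesInfo'] -> one trace with six samples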
def apply_convert_factor_to_data_dict(
selected_key: Union[str, Tuple[str, str]],
data_dict: Dict, data_type: str) -> None:
"""
Traverse through traces in each channel to convert data according to
the convert_factor retrieved from the DB
:param selected_key: the key of the selected data set
:param data_dict: dict of data
:param data_type: type of data
"""
selected_data_dict = data_dict[selected_key]
for chan_id in selected_data_dict:
channel = selected_data_dict[chan_id]
convert_factor = get_convert_factor(chan_id, data_type)
if convert_factor is not None and convert_factor != 1:
for tr in channel['tracesInfo']:
tr['data'] = convert_factor * tr['data']
def reset_data(selected_key: Union[str, Tuple[str, str]], data_dict: Dict):
"""
FROM data_type_model.reset_data()
Remove all keys created in the plotting process for the given data dict
:param selected_key: the key of the selected data set
:param data_dict: data of the selected key
"""
selected_data_dict = data_dict[selected_key]
for chan_id in selected_data_dict:
selected_data_dict[chan_id]['fullData'] = False
del_keys = ['chan_db_info', 'times', 'data', 'ax', 'ax_wf']
for k in del_keys:
try:
del selected_data_dict[chan_id][k]
except KeyError:
pass
......@@ -47,7 +47,10 @@ def check_reftek_header(
if chan_id not in cur_data_dict:
cur_data_dict[chan_id] = {'tracesInfo': [],
'samplerate': samplerate}
if trace.stats.npts == 0:
# Skip this trace to prevent a bug when creating a memmap
# with no data
continue
if (starttime <= trace.stats['starttime'] <= endtime or
starttime <= trace.stats['endtime'] <= endtime):
avail_trace_indexes.append(index)
......
def decode_int16(buffer, unpacker):
requested_bytes = buffer[:2]
return unpacker.unpack('h', requested_bytes)[0]
def decode_int24(buffer, unpacker):
requested_bytes = buffer[:3]
byte_order = 'big' if unpacker.byte_order_char == '>' else 'little'
# We delegate to int.from_bytes() because it takes a lot of work to make
# struct.unpack() handle signed 24-bits integers.
# See: https://stackoverflow.com/questions/3783677/how-to-read-integers-from-a-file-that-are-24bit-and-little-endian-using-python # noqa
return int.from_bytes(requested_bytes, byte_order, signed=True)
def decode_int32(buffer, unpacker):
requested_bytes = buffer[:4]
return unpacker.unpack('i', requested_bytes)[0]
def decode_ieee_float(buffer, unpacker):
requested_bytes = buffer[:4]
return unpacker.unpack('f', requested_bytes)[0]
def decode_ieee_double(buffer, unpacker):
requested_bytes = buffer[:8]
return unpacker.unpack('d', requested_bytes)[0]
def decode_steim(buffer, unpacker):
# The first 4 bytes in a Steim frame are metadata of the record. Since we
# aren't decompressing the data, we skip them. The next 4 bytes contain
# the first data point of the MSEED data record, which is what we need.
requested_bytes = buffer[4:8]
return unpacker.unpack('i', requested_bytes)[0]
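As a standalone illustration of the decoding ideas above (without the project's Unpacker helper, which only supplies the byte-order character), a signed 16-bit sample can be read with struct and a signed 24-bit sample with int.from_bytes. The byte strings are made up and big-endian here.

import struct

# Big-endian examples with made-up bytes.
sample_int16 = struct.unpack('>h', b'\xff\xfe')[0]                  # -2
sample_int24 = int.from_bytes(b'\xff\x00\x01', 'big', signed=True)  # -65535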
"""
MSeed object to hold and process MSeed data
"""
import os
import re
import traceback
from pathlib import Path
from typing import Dict, Tuple, List
from sohstationviewer.controller.util import validate_file, validate_dir
from sohstationviewer.model.mseed_data.mseed_reader import MSeedReader
from sohstationviewer.model.general_data.general_data import \
GeneralData, ThreadStopped, ProcessingDataError
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.model.mseed_data.mseed_helper import \
retrieve_nets_from_data_dict, read_text
from sohstationviewer.model.mseed_data.record_reader_helper import \
MSeedReadError
class MSeed(GeneralData):
"""
Read and process mseed files into an object with properties that can be
used to plot SOH data, mass position data, waveform data and gaps
"""
def __init__(self, *args, **kwargs):
# FROM mseed.mseed.MSEED.__init__
super().__init__(*args, **kwargs)
self.nets_by_sta: Dict[str, List[str]] = {}
self.processing_data()
def finalize_data(self):
"""
CHANGED FROM mseed.mseed.MSEED.finalize_data
This function should be called after all folders finish reading to
+ get nets_by_sta from stream_header_by_key_chan
+ other tasks in super().finalize_data()
"""
self.distribute_log_text_to_station()
self.retrieve_nets_from_data_dicts()
super().finalize_data()
def read_folder(self, folder: str) -> Tuple[Dict]:
"""
CHANGED FROM mseed.mseed.MSEED.read_folder
Read data streams for soh, mass position and waveform.
:param folder: absolute path to data set folder
:return waveform_data: waveform data by station
:return soh_data: soh data by station
:return mass_pos_data: mass position data by station
:return gaps: gap list by station
:return nets_by_sta: netcodes list by station
"""
if not os.path.isdir(folder):
raise ProcessingDataError(f"Path '{folder}' does not exist")
count = 0
total = sum([len(files) for _, _, files in os.walk(folder)])
invalid_blockettes = False
not_mseed_files = []
for path, sub_dirs, files in os.walk(folder):
try:
validate_dir(path)
except Exception as e:
# skip Information folder
self.track_info(str(e), LogType.WARNING)
continue
for file_name in files:
if self.creator_thread.isInterruptionRequested():
raise ThreadStopped()
path2file = Path(path).joinpath(file_name)
if not validate_file(path2file, file_name):
continue
count += 1
if count % 10 == 0:
self.track_info(
f'Read {count} files/{total}', LogType.INFO)
log_text = read_text(path2file)
if log_text is not None:
self.log_texts[path2file] = log_text
continue
reader = MSeedReader(
path2file,
read_start=self.read_start,
read_end=self.read_end,
is_multiplex=self.is_multiplex,
req_soh_chans=self.req_soh_chans,
req_wf_chans=self.req_wf_chans,
include_mp123zne=self.include_mp123zne,
include_mp456uvw=self.include_mp456uvw,
soh_data=self.soh_data,
mass_pos_data=self.mass_pos_data,
waveform_data=self.waveform_data,
log_data=self.log_data,
gap_minimum=self.gap_minimum)
try:
reader.read()
invalid_blockettes = (invalid_blockettes
or reader.invalid_blockettes)
except MSeedReadError:
not_mseed_files.append(file_name)
except Exception:
fmt = traceback.format_exc()
self.track_info(f"Skip file {path2file} can't be read "
f"due to error: {str(fmt)}",
LogType.WARNING)
if not_mseed_files:
self.track_info(
f"Not MSeed files: {not_mseed_files}", LogType.WARNING)
if invalid_blockettes:
# This check ensures the message is only printed once
print("We currently only handle blockettes 500, 1000,"
" and 1001.")
self.track_info(
f'Skipped {total - count} invalid files.', LogType.INFO)
def retrieve_nets_from_data_dicts(self):
"""
Go through the stations of each data_dict to get all network codes found
in all channels of a station and add them to nets_by_sta.
"""
retrieve_nets_from_data_dict(self.soh_data, self.nets_by_sta)
retrieve_nets_from_data_dict(self.mass_pos_data, self.nets_by_sta)
retrieve_nets_from_data_dict(self.waveform_data, self.nets_by_sta)
def select_key(self) -> str:
"""
CHANGED FROM mseed.mseed.MSEED:
+ get sta_ids from self.keys
+ add condition if not on_unittest to create unittest for mseed
:return selected_sta_id: the selected station id from available
key of stream header.
+ If there is only one station id, return it.
+ If there is more than one, show all ids, let user choose one to
return.
"""
self.keys = sorted(list(set(
list(self.soh_data.keys()) +
list(self.mass_pos_data.keys()) +
list(self.waveform_data.keys()) +
[k for k in list(self.log_data.keys()) if k != 'TEXT']
)))
sta_ids = self.keys
if len(sta_ids) == 0:
return
selected_sta_id = sta_ids[0]
if not self.on_unittest and len(sta_ids) > 1:
msg = ("There is more than one station in the given data.\n"
"Please select one to display")
self.pause_signal.emit(msg, sta_ids)
self.pause()
selected_sta_id = sta_ids[self.pause_response]
self.track_info(f'Select Station {selected_sta_id}', LogType.INFO)
return selected_sta_id
def distribute_log_text_to_station(self):
"""
Loop through the paths to text files to look for a station id in each path.
+ If there is a station id in the path, add the content to that
station id under channel 'TXT'.
+ If there is no station id in the path, add the content to the key
'TEXT', which means the station for these texts is unknown.
"""
for path2file in self.log_texts:
try:
file_parts = re.split(rf"{os.sep}|\.", path2file.as_posix())
sta = [s for s in self.keys if s in file_parts][0]
except IndexError:
self.log_data['TEXT'].append(self.log_texts[path2file])
continue
if 'TXT' not in self.log_data[sta]:
self.log_data[sta]['TXT'] = []
self.log_data[sta]['TXT'].append(self.log_texts[path2file])
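A quick sketch of how the path split above behaves; the path and station keys are made up, and because of .as_posix() the split shown here is for POSIX systems.

import os
import re
from pathlib import Path

# Hypothetical path and station keys: the path is split on separators and
# dots, then each known key is looked up in the resulting parts.
path2file = Path('/data/KSPA1/2020.001.soh.log')
file_parts = re.split(rf"{os.sep}|\.", path2file.as_posix())
# ['', 'data', 'KSPA1', '2020', '001', 'soh', 'log']
keys = ['KSPA1', 'KSPA2']
sta = [s for s in keys if s in file_parts][0]   # 'KSPA1'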
# Functions that change from handling_data's functions
import os
from pathlib import Path
from typing import List, Dict, Optional
def retrieve_nets_from_data_dict(data_dict: Dict,
nets_by_sta: Dict[str, List[str]]) -> None:
"""
Retrieve nets by sta_id from the given data_dict.
:param data_dict: dict of data by station
:param nets_by_sta: nets list by sta_id
"""
for sta_id in data_dict.keys():
if sta_id not in nets_by_sta:
nets_by_sta[sta_id] = set()
for c in data_dict[sta_id]:
nets_by_sta[sta_id].update(
data_dict[sta_id][c]['nets'])
def read_text(path2file: Path) -> Optional[str]:
"""
CHANGED FROM handling_data.read_text:
+ No need to check for binary files because a caught UnicodeDecodeError
means the file is binary
Read a text file and format its content to be added to log_data under
channel TEXT.
+ Return None if the file isn't a text file
+ Remove empty lines in content
:param path2file: absolute path to text file
:return: formatted content of the text file, or None if the file is
not a text file
"""
try:
with open(path2file, 'r') as file:
content = file.read().strip()
except UnicodeDecodeError:
return
if content != '':
# skip empty lines
no_empty_line_list = [
line for line in content.splitlines() if line]
no_empty_line_content = os.linesep.join(no_empty_line_list)
log_text = "\n\n** STATE OF HEALTH: %s\n" % path2file.name
log_text += no_empty_line_content
else:
log_text = ''
return log_text
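A usage sketch of read_text with a hypothetical file path: a binary file yields None, while a text file yields its content with empty lines removed and a "** STATE OF HEALTH:" header prepended.

from pathlib import Path

# Hypothetical path; print the formatted log text only if the file was
# readable as text.
log_text = read_text(Path('/data/STA1/ops.log'))
if log_text is not None:
    print(log_text)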
from numbers import Real
from typing import BinaryIO, Optional, Dict, Union, List
from pathlib import Path
from obspy import UTCDateTime
from sohstationviewer.model.mseed_data.record_reader import RecordReader
from sohstationviewer.model.mseed_data.record_reader_helper import \
RecordMetadata
from sohstationviewer.model.mseed_data.mseed_reader_helper import check_chan
def move_to_next_record(file, current_record_start: int,
record: RecordReader):
"""
Move the current position of file to the next record
:param file: the file object being read
:param current_record_start: the start position of the current record
:param record: the record that is being read
"""
# MSEED stores the size of a data record as an exponent of a
# power of two, so we have to convert that to actual size before
# doing anything else.
record_length_exp = record.header_unpacker.unpack(
'B', record.blockette_1000.record_length
)[0]
record_size = 2 ** record_length_exp
file.seek(current_record_start)
file.seek(record_size, 1)
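The record length stored in blockette 1000 is an exponent of two, so the conversion above amounts to the following (the exponent value is just an example):

# A stored record-length exponent of 12 corresponds to a 4096-byte record.
record_length_exp = 12
record_size = 2 ** record_length_exp   # 4096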
class MSeedReader:
def __init__(self, file_path: Path,
read_start: float = UTCDateTime(0).timestamp,
read_end: float = UTCDateTime().timestamp,
is_multiplex: Optional[bool] = None,
req_soh_chans: List[str] = [],
req_wf_chans: List[str] = [],
include_mp123zne: bool = False,
include_mp456uvw: bool = False,
soh_data: Dict = {},
mass_pos_data: Dict = {},
waveform_data: Dict = {},
log_data: Dict[str, Union[List[str],
Dict[str, List[str]]]] = {},
gap_minimum: Optional[float] = None
) -> None:
"""
The purpose of this class is to read data from the given file and add it
to the given data dicts if it meets the requirements.
If data_type is not multiplex, all records of a file belong to the
same channel; the info found in the first record can
be used to decide whether to keep reading if the first one doesn't meet
the channel's requirement.
If data_type is multiplex, every record has to be examined.
All data_dicts' definition can be found in data_dict_structures.MD
:param file_path: Absolute path to data file
:param read_start: time that is required to start reading
:param read_end: time that is required to end reading
:param is_multiplex: multiplex status of the file's data_type
:param req_soh_chans: requested SOH channel list
:param req_wf_chans: requested waveform channel list
:param include_mp123zne: if mass position channels 1,2,3 are requested
:param include_mp456uvw: if mass position channels 4,5,6 are requested
:param soh_data: data dict of SOH
:param mass_pos_data: data dict of mass position
:param waveform_data: data dict of waveform
:param log_data: data dict of log_data
:param gap_minimum: minimum length of gaps required to detect
from record
"""
self.read_start = read_start
self.read_end = read_end
self.is_multiplex = is_multiplex
self.gap_minimum = gap_minimum
self.req_soh_chans = req_soh_chans
self.req_wf_chans = req_wf_chans
self.include_mp123zne = include_mp123zne
self.include_mp456uvw = include_mp456uvw
self.soh_data = soh_data
self.mass_pos_data = mass_pos_data
self.waveform_data = waveform_data
self.log_data = log_data
self.file_path = file_path
self.file: BinaryIO = open(file_path, 'rb')
self.invalid_blockettes = False
def get_data_dict(self, metadata: RecordMetadata) -> Dict:
"""
Find which data_dict to add data to from req_soh_chans, req_wf_chans,
include_mp123zne, include_mp456uvw, samplerate
:param metadata: record's metadata
:return: data_dict to add data
"""
chan_id = metadata.channel
sample_rate = metadata.sample_rate
chan_type = check_chan(chan_id, self.req_soh_chans, self.req_wf_chans,
self.include_mp123zne, self.include_mp456uvw)
if chan_type == 'SOH':
if self.req_soh_chans == [] and sample_rate > 1:
# If 'All chans' is selected for SOH, channel with samplerate>1
# will be skipped by default to improve performance.
# Note: If user intentionally added channels with samplerate>1
# using SOH Channel Preferences dialog, they are still read.
return
return self.soh_data
if chan_type == 'MP':
return self.mass_pos_data
if chan_type == 'WF':
return self.waveform_data
def check_time(self, record: RecordReader) -> bool:
"""
Check if the record time is in the time range that the user requires to read
:param record: the record read from file
:return: True when record time satisfy the requirement
"""
meta = record.record_metadata
if self.read_start > meta.end_time or self.read_end < meta.start_time:
return False
return True
def append_log(self, record: RecordReader) -> None:
"""
Add all text info retrieved from record to log_data
:param record: the record read from file
"""
logs = [record.ascii_text] + record.other_blockettes
log_str = "===========\n".join(logs)
if log_str == "":
return
meta = record.record_metadata
log_str = "\n\nSTATE OF HEALTH: " + \
f"From:{meta.start_time} To:{meta.end_time}\n" + log_str
sta_id = meta.station
chan_id = meta.channel
if sta_id not in self.log_data.keys():
self.log_data[sta_id] = {}
if chan_id not in self.log_data[sta_id]:
self.log_data[sta_id][chan_id] = []
self.log_data[sta_id][chan_id].append(log_str)
def append_data(self, data_dict: dict,
record: RecordReader,
data_point: Real) -> None:
"""
Append data point to the given data_dict
:param data_dict: the data dict to add the data from the record to
:param record: the record read from file
:param data_point: the first sample of the record frame
"""
if data_point is None:
return
meta = record.record_metadata
sta_id = meta.station
if sta_id not in data_dict.keys():
data_dict[sta_id] = {}
station = data_dict[sta_id]
self.add_chan_data(station, meta, data_point)
def _add_new_trace(self, channel: Dict, metadata: RecordMetadata,
data_point: Real) -> None:
"""
Start a new trace in channel['tracesInfo'] with data_point as
the first data value and metadata's start_time as the first time value
:param channel: dict of channel's info
:param metadata: record's meta data
:param data_point: the first sample of the record frame
"""
channel['tracesInfo'].append({
'startTmEpoch': metadata.start_time,
'data': [data_point],
'times': [metadata.start_time]
})
def _append_trace(self, channel, metadata, data_point):
"""
Appending data_point to the latest trace of channel['tracesInfo']
:param channel: dict of channel's info
:param metadata: record's meta data
:param data_point: the first sample of the record frame
"""
channel['tracesInfo'][-1]['data'].append(data_point)
channel['tracesInfo'][-1]['times'].append(metadata.start_time)
def add_chan_data(self, station: dict, metadata: RecordMetadata,
data_point: Real) -> None:
"""
Add new channel to the passed station and append data_point to the
channel if there's no gap/overlap or start a new trace of data
when there's a gap.
If gap/overlap > gap_minimum, add to gaps list.
:param station: dict of chan by id of a station
:param metadata: an Object of metadata from the record
:param data_point: the first sample of the record frame
"""
meta = metadata
chan_id = metadata.channel
if chan_id not in station.keys():
station[chan_id] = {
'file_path': self.file_path,
'chanID': chan_id,
'samplerate': meta.sample_rate,
'startTmEpoch': meta.start_time,
'endTmEpoch': meta.end_time,
'size': meta.sample_count,
'nets': {meta.network},
'gaps': [],
'tracesInfo': [{
'startTmEpoch': meta.start_time,
'endTmEpoch': meta.end_time,
'data': [data_point],
'times': [meta.start_time]
}]
}
else:
channel = station[chan_id]
record_start_time = meta.start_time
previous_end_time = channel['endTmEpoch']
delta = abs(record_start_time - previous_end_time)
if channel['file_path'] != self.file_path:
# Start new trace for each file to reorder trace and
# combine traces again later
channel['file_path'] = self.file_path
self._add_new_trace(channel, meta, data_point)
else:
if self.gap_minimum is not None and delta >= self.gap_minimum:
gap = [previous_end_time, record_start_time]
channel['gaps'].append(gap)
# appending data
self._append_trace(channel, meta, data_point)
channel['tracesInfo'][-1]['endTmEpoch'] = meta.end_time
# update channel's metadata
channel['endTmEpoch'] = meta.end_time
channel['size'] += meta.sample_count
channel['nets'].add(meta.network)
def read(self):
while 1:
# We know that end of file is reached when read() returns an empty
# string.
is_eof = (self.file.read(1) == b'')
if is_eof:
break
# We need to move the file pointer back to its position after we
# do the end of file check. Otherwise, we would be off by one
# byte for all the reads afterward.
self.file.seek(-1, 1)
# We save the start of the current record so that after we are
# done reading the record, we can move back. This makes moving
# to the next record a lot easier, seeing as we can simply move
# the file pointer a distance the size of the current record.
current_record_start = self.file.tell()
record = RecordReader(self.file)
if record.invalid_blockettes:
self.invalid_blockettes = True
if not self.check_time(record):
move_to_next_record(
self.file, current_record_start, record)
continue
data_dict = self.get_data_dict(record.record_metadata)
if data_dict is None:
if self.is_multiplex:
move_to_next_record(
self.file, current_record_start, record)
continue
else:
break
first_data_point = record.get_first_data_point()
self.append_data(data_dict, record, first_data_point)
self.append_log(record)
move_to_next_record(self.file, current_record_start, record)
self.file.close()
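A minimal usage sketch of MSeedReader, assuming a hypothetical miniSEED file path: the reader fills the dicts that are passed in, so results are read back from them after read() returns. With the empty defaults used here, all SOH channels with sample rate <= 1 are read and no waveform channels are requested.

from pathlib import Path

# Hypothetical file path and in-place result dicts.
soh_data, mass_pos_data, waveform_data = {}, {}, {}
log_data = {'TEXT': []}
reader = MSeedReader(
    Path('/data/STA1/soh.mseed'),
    is_multiplex=False,
    soh_data=soh_data,
    mass_pos_data=mass_pos_data,
    waveform_data=waveform_data,
    log_data=log_data,
    gap_minimum=0.1,
)
reader.read()
print(list(soh_data.keys()))   # station ids found in the file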
# ALL FUNCTIONS IN THIS FILE ARE FROM HANDLING DATA. NO NEED TO REVIEW
import re
from typing import Tuple, List, Union
from sohstationviewer.conf.dbSettings import dbConf
def check_chan(chan_id: str, req_soh_chans: List[str], req_wf_chans: List[str],
include_mp123zne: bool, include_mp456uvw: bool) \
-> Union[str, bool]:
"""
Check if chanID is a requested channel.
:param chan_id: str - channel ID
:param req_soh_chans: list of str - requested SOH channels
:param req_wf_chans: list of str - requested waveform channels
:param include_mp123zne: if mass position channels 1,2,3 are requested
:param include_mp456uvw: if mass position channels 4,5,6 are requested
:return: str/bool -
'WF' if chanID is a requested waveform channel,
'SOH' if chanID is a requested SOH channel,
'MP' if chanID is a requested mass position channel
False otherwise.
"""
if chan_id.startswith('VM'):
if (not include_mp123zne and
chan_id[-1] in ['1', '2', '3', 'Z', 'N', 'E']):
return False
if (not include_mp456uvw
and chan_id[-1] in ['4', '5', '6', 'U', 'V', 'W']):
return False
return 'MP'
ret = check_wf_chan(chan_id, req_wf_chans)
if ret[0] == 'WF':
if ret[1]:
return "WF"
else:
return False
if check_soh_chan(chan_id, req_soh_chans):
return "SOH"
return False
def check_soh_chan(chan_id: str, req_soh_chans: List[str]) -> bool:
"""
Check if chan_id is a requested SOH channel.
Mass position is always included.
This function is used for mseed only so mass position is 'VM'.
If req_soh_chans is empty, it means all SOH channels are requested
:param chan_id: str - channel ID
:param req_soh_chans: list of str - requested SOH channels
:return: bool - True if chan_id is a requested SOH channel. False otherwise
"""
if req_soh_chans == []:
return True
if chan_id in req_soh_chans:
return True
if 'EX?' in req_soh_chans and chan_id.startswith('EX'):
if chan_id[2] in ['1', '2', '3']:
return True
# TODO: remove mass position channels from SOH
if chan_id.startswith('VM'):
if chan_id[2] in ['0', '1', '2', '3', '4', '5', '6']:
return True
return False
def check_wf_chan(chan_id: str, req_wf_chans: List[str]) -> Tuple[str, bool]:
"""
Check if chanID is a waveform channel and is requested by user
:param chan_id: str - channel ID
:param req_wf_chans: list of str - requested waveform channels
:return wf: str - '' if chan_id is not a waveform channel.
'WF' if chan_id is a waveform channel.
:return has_chan: bool - True if chan_id is a requested waveform channel.
"""
if not dbConf['seisRE'].match(chan_id):
return '', False
for req in req_wf_chans:
if len(req) == 1:
req = req.replace('*', '...')
elif len(req) == 2:
req = req.replace('*', '..')
elif len(req) == 3:
req = req.replace('*', '.')
if re.compile(f'^{req}$').match(chan_id):
return 'WF', True
return 'WF', False
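A usage sketch of the wildcard handling above, assuming 'LHZ' matches the seismic channel pattern in dbConf['seisRE']: a three-character request such as 'LH*' is turned into the regular expression '^LH.$'.

# Assuming 'LHZ' matches dbConf['seisRE'].
print(check_wf_chan('LHZ', ['LH*']))   # ('WF', True)
print(check_wf_chan('LHZ', ['BH*']))   # ('WF', False)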
from numbers import Real
from typing import BinaryIO, Optional, List
from obspy import UTCDateTime
from sohstationviewer.model.mseed_data.decode_mseed import (
decode_ieee_float, decode_ieee_double, decode_steim, decode_int16,
decode_int24, decode_int32,
)
from sohstationviewer.model.mseed_data.record_reader_helper import (
FixedHeader, Blockette1000, get_data_endianness, Unpacker,
get_record_metadata, get_header_endianness, RecordMetadata,
EncodingFormat,
)
class RecordReader:
"""
This class reads one data record from an MSEED file.
"""
def __init__(self, file: BinaryIO) -> None:
# The MSEED file object to read from. The file pointer needs to be
# located at the start of a data record.
self.file = file
self.fixed_header: Optional[FixedHeader] = None
self.blockette_1000: Optional[Blockette1000] = None
self.other_blockettes: List[str] = []
# Utility object that helps unpack byte strings in the header (the
# fixed header and the blockettes).
# Separate from the one for data in case the header has a different
# byte order.
# TODO: change blockettes to use this unpacker as well.
self.header_unpacker: Unpacker = Unpacker()
self.data_unpacker: Unpacker = Unpacker()
self.record_metadata: Optional[RecordMetadata] = None
self.invalid_blockettes = False
self.ascii_text: str = ''
self.read_header()
def read_header(self) -> None:
"""
Read the header of the current data record. The header includes the
fixed portion, blockette 1000, and any blockettes that follow.
"""
# Save the start of the record so that we can go back after reading the
# header.
record_start = self.file.tell()
self.read_fixed_header()
self.read_blockette_1000()
header_endianness = get_header_endianness(self.fixed_header)
if header_endianness == 'little':
self.header_unpacker.byte_order_char = '<'
else:
self.header_unpacker.byte_order_char = '>'
data_endianness = get_data_endianness(self.blockette_1000)
if data_endianness == 'little':
self.data_unpacker.byte_order_char = '<'
else:
self.data_unpacker.byte_order_char = '>'
self.record_metadata = get_record_metadata(self.fixed_header,
self.header_unpacker)
self.apply_time_correction()
self.read_blockettes()
self.file.seek(record_start)
def read_fixed_header(self) -> None:
"""
Read the fixed header of the current data record and store it in
self.fixed_header.
"""
byte_counts = [6, 1, 1, 5, 2, 3, 2, 10, 2, 2, 2, 1, 1, 1, 1, 4, 2, 2]
fixed_header_sections_values = []
for byte_count in byte_counts:
fixed_header_sections_values.append(self.file.read(byte_count))
self.fixed_header = FixedHeader(*fixed_header_sections_values)
def read_blockette_500(self) -> None:
"""
Read blockette 500 and format its content. The result is stored for
future uses. Assumes that the file pointer is at the start of the
blockette.
"""
blockette_content = {}
# Skip the first four bytes because they contain meta-information about
# the blockettes.
self.file.read(4)
vco_correction = self.file.read(4)
blockette_content['VCO correction'] = self.header_unpacker.unpack(
'f', vco_correction
)[0]
exception_time_bytes = self.file.read(10)
exception_time_tuple = self.header_unpacker.unpack(
'HHBBBBH', exception_time_bytes)
exception_time = UTCDateTime(year=exception_time_tuple[0],
julday=exception_time_tuple[1],
hour=exception_time_tuple[2],
minute=exception_time_tuple[3],
second=exception_time_tuple[4],
microsecond=exception_time_tuple[6] * 100)
blockette_content['Time of exception'] = exception_time.strftime(
'%Y:%j:%H:%M:%S:%f'
)
microsecond = self.file.read(1)
microsecond = self.header_unpacker.unpack('B', microsecond)[0]
start_time_adjustment = microsecond / (10 ** 6)
self.record_metadata.start_time += start_time_adjustment
blockette_content['Micro sec'] = microsecond
reception_quality = self.file.read(1)
blockette_content['Reception Quality'] = self.header_unpacker.unpack(
'B', reception_quality
)[0]
exception_count = self.file.read(4)
blockette_content['Exception Count'] = self.header_unpacker.unpack(
'I', exception_count
)[0]
exception_type = self.file.read(16)
blockette_content['Exception Type'] = self.header_unpacker.unpack(
'16s', exception_type
)[0].decode('utf-8').strip()
clock_model = self.file.read(32)
blockette_content['Clock Model'] = self.header_unpacker.unpack(
'32s', clock_model
)[0].decode('utf-8').strip()
clock_status = self.file.read(128)
blockette_content['Clock Status'] = self.header_unpacker.unpack(
'128s', clock_status
)[0].decode('utf-8').strip()
formatted_blockette = '\n'.join([f'{key}: {value}'
for key, value
in blockette_content.items()])
self.other_blockettes.append(formatted_blockette)
def read_blockette_1000(self) -> None:
"""
Read blockette 1000 of the current data record and store it in
self.blockette_1000.
"""
blockette_1000_section_lengths = [2, 2, 1, 1, 1, 1]
blockette_1000_values = []
for section_length in blockette_1000_section_lengths:
blockette_1000_values.append(self.file.read(section_length))
self.blockette_1000 = Blockette1000(*blockette_1000_values)
def read_blockette_1001(self) -> None:
"""
Read blockette 1001. The only valuable thing in this blockette is the
more precise start time. Assumes that the file pointer is at the start
of the blockette.
"""
self.file.read(5)
start_time_microsecond = self.file.read(1)
start_time_microsecond = self.header_unpacker.unpack(
'b', start_time_microsecond
)[0]
        # Convert from microseconds to seconds before adding to the start
        # time.
start_time_microsecond /= (10 ** 6)
self.record_metadata.start_time += start_time_microsecond
self.file.read(2)
def read_blockette_2000(self) -> None:
pass
def apply_time_correction(self) -> None:
"""
Apply the time correction found in the fixed header to the start time.
"""
# format() is used here instead of bin() because we need to pad the
# resulting bit string with 0 to the left.
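        # For example, format(33, '0>8b') gives '00100001', with the most
        # significant bit first.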
activity_flags = format(
self.header_unpacker.unpack(
'B', self.fixed_header.activity_flags)[0],
'0>8b'
)
is_time_correction_applied = int(activity_flags[1])
if is_time_correction_applied:
return
time_correction = self.header_unpacker.unpack(
'L', self.fixed_header.time_correction
)[0]
# We need to convert the unit from 0.0001 seconds to seconds
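        # For example, a stored correction of 2500 units is 0.25 seconds.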
time_correction *= 0.0001
self.record_metadata.start_time += time_correction
def read_blockettes(self) -> None:
"""
Read all the blockettes in the current data record aside from blockette
        1000, which has been read previously. Currently only handles
        blockettes 500, 1001, and 2000.
"""
blockette_count = self.header_unpacker.unpack(
'B', self.fixed_header.blockette_count
)[0]
for i in range(1, blockette_count):
# All blockettes store their type in the first two bytes, so we
# read that to determine what to do
next_blockette_type = self.file.read(2)
# Move file pointer back to start of blockette
self.file.seek(-2, 1)
next_blockette_type = self.header_unpacker.unpack(
'H', next_blockette_type
)[0]
if next_blockette_type not in (500, 1000, 1001):
self.invalid_blockettes = True
continue
if next_blockette_type == 500:
self.read_blockette_500()
elif next_blockette_type == 1001:
self.read_blockette_1001()
elif next_blockette_type == 2000:
self.read_blockette_2000()
def decode_ascii_data(self, data_start: int):
"""
Read ASCII string from data portion of the record but remove the
padding
        :param data_start: Byte number where the data starts
"""
# We want to read everything in the record if the encoding is
# ASCII.
record_length_exp = self.header_unpacker.unpack(
'B', self.blockette_1000.record_length
)[0]
record_size = 2 ** record_length_exp
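        # For example, a record length exponent of 12 gives the common
        # 4096-byte record size.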
data_block = self.file.read(record_size - data_start)
single_padding = b'\x00'.decode()
try:
self.ascii_text = data_block.decode().rstrip(single_padding)
except UnicodeDecodeError:
pass
def get_first_data_point(self) -> Optional[Real]:
"""
Get the first data point of the current data record.
:return: the first data point of the current data record, whose type is
determined based on the encoding type stored in blockette 1000.
"""
record_start = self.file.tell()
data_start = self.header_unpacker.unpack(
'H', self.fixed_header.data_offset
)[0]
# The data start byte is defined as an offset from the start of the
# data record. Seeing as we should be at the start of the data record
# by seeking there at the end of every major step, we can simply seek
# to the start of the data.
self.file.seek(data_start, 1)
encoding_format = self.blockette_1000.encoding_format
encoding_format = self.header_unpacker.unpack('b', encoding_format)[0]
encoding_format = EncodingFormat(encoding_format)
if encoding_format == EncodingFormat.ASCII:
self.decode_ascii_data(data_start)
first_data_point = None
else:
            # Currently, we are extracting only the first data point in each
            # record. The smallest number of bytes we can read while still
            # guaranteeing that we get the first data point is 8; the Steim
            # encodings and IEEE double-precision floats need the whole
            # buffer.
buffer = self.file.read(8)
encoding_to_decoder = {
EncodingFormat.INT_16_BIT: decode_int16,
EncodingFormat.INT_24_BIT: decode_int24,
EncodingFormat.INT_32_BIT: decode_int32,
EncodingFormat.IEEE_FLOAT_32_BIT: decode_ieee_float,
EncodingFormat.IEEE_FLOAT_64_BIT: decode_ieee_double,
EncodingFormat.STEIM_1: decode_steim,
EncodingFormat.STEIM_2: decode_steim,
}
first_data_point = encoding_to_decoder[encoding_format](
buffer, self.data_unpacker
)
# Seek back to the start of the record so we can call this method again
# if needed.
self.file.seek(record_start)
return first_data_point
from dataclasses import dataclass
import struct
from enum import Enum
from obspy import UTCDateTime
class MSeedReadError(Exception):
def __init__(self, msg):
self.message = msg
class Unpacker:
"""
A wrapper around struct.unpack() to unpack binary data without having to
explicitly define the byte order in the format string. Also restrict the
type of format to str and buffer to bytes.
"""
def __init__(self, byte_order_char: str = '') -> None:
self.byte_order_char = byte_order_char
def unpack(self, format: str, buffer: bytes):
"""
Unpack a string of bytes into a tuple of values based on the given
format
:param format: the format used to unpack the byte string
:param buffer: the byte string
:return: a tuple containing the unpacked values.
"""
default_byte_order_chars = ('@', '=', '>', '<', '!')
if format.startswith(default_byte_order_chars):
            format = self.byte_order_char + format[1:]
else:
format = self.byte_order_char + format
return struct.unpack(format, buffer)
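# Illustrative use (the byte strings below are made-up examples):
#     Unpacker('>').unpack('H', b'\x03\xe8')  # -> (1000,)
#     Unpacker('<').unpack('H', b'\xe8\x03')  # -> (1000,)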
@dataclass
class FixedHeader:
"""
The fixed portion of the header of a data record. All fields are stored as
bytes to minimize time wasted on decoding unused values.
"""
sequence_number: bytes
data_header_quality_indicator: bytes
reserved: bytes
station: bytes
location: bytes
channel: bytes
net_code: bytes
record_start_time: bytes
sample_count: bytes
sample_rate_factor: bytes
sample_rate_multiplier: bytes
activity_flags: bytes
io_and_clock_flags: bytes
data_quality_flags: bytes
blockette_count: bytes
time_correction: bytes
data_offset: bytes
first_blockette_offset: bytes
@dataclass
class Blockette1000:
"""
    Blockette 1000 of a data record. All fields are stored as bytes to
    minimize time wasted on decoding unused values.
"""
blockette_type: bytes
next_blockette_offset: bytes
encoding_format: bytes
byte_order: bytes
record_length: bytes
reserved_byte: bytes
@dataclass
class RecordMetadata:
"""
The metadata of a data record.
"""
station: str
location: str
channel: str
network: str
start_time: float
end_time: float
sample_count: int
sample_rate: float
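# Data encoding codes as defined for blockette 1000 in the SEED manual.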
class EncodingFormat(Enum):
ASCII = 0
INT_16_BIT = 1
INT_24_BIT = 2
INT_32_BIT = 3
IEEE_FLOAT_32_BIT = 4
IEEE_FLOAT_64_BIT = 5
STEIM_1 = 10
STEIM_2 = 11
def check_time_from_time_string(endian, time_string):
try:
record_start_time_tuple = struct.unpack(f'{endian}hhbbbbh',
time_string)
except struct.error:
raise MSeedReadError("Not an MSeed file.")
# libmseed uses 1900 to 2100 as the sane year range. We follow their
# example here.
year_is_good = (1900 <= record_start_time_tuple[0] <= 2100)
# The upper range is 366 to account for leap years.
day_is_good = (1 <= record_start_time_tuple[1] <= 366)
return year_is_good and day_is_good
def get_header_endianness(header: FixedHeader):
"""
    Determine the endianness of the fixed header of a data record. Works by
    checking whether the decoded record start time has a sane value when the
    header is assumed to be big-endian, and trying little-endian if it does
    not.
WARNING: This check fails on three dates: 2056-1, 2056-256, and 2056-257.
2056 is a palindrome when encoded as a pair of octets, so endianness does
not affect it. Similarly, 257 is also 2-octet-palindromic. Meanwhile, 1 and
256 are counterparts when encoded as pairs of octets. Because they are both
valid values for day of year, it is impossible to make a conclusion about
endianness based on day of year if it is either 1 or 256 in big-endian.
    These facts combined mean that we cannot determine the endianness of the
header whose record starts on the aforementioned dates. The way this
function was written, the endianness will be recorded as big in these
cases. This problem is also recorded in libmseed.
:param header: the fixed header of the data record
:return: either of the string 'big' or 'little' depending on the extracted
endianness of header
"""
record_start_time_string = header.record_start_time
good_time = check_time_from_time_string('>', record_start_time_string)
if good_time:
endianness = 'big'
else:
good_time = check_time_from_time_string('<', record_start_time_string)
if good_time:
endianness = 'little'
else:
raise MSeedReadError("Not an MSeed file.")
return endianness
def get_data_endianness(blockette_1000: Blockette1000):
"""
Get endianness of a data record by examining blockette 1000.
:param blockette_1000: the blockette 1000 of the data record.
"""
# The byte order is only one byte so using big or little endian does not
# matter.
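    # Per blockette 1000 in the SEED manual, 1 means big-endian and 0 means
    # little-endian.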
blockette_1000_endianness = int.from_bytes(
blockette_1000.byte_order, 'big'
)
if blockette_1000_endianness:
return 'big'
else:
return 'little'
def calculate_sample_rate(factor: int, multiplier: int) -> float:
"""
Calculate the sample rate using the sample rate factor and multiplier. This
algorithm is described around the start of Chapter 8 in the SEED manual.
:param factor: the sample rate factor
:param multiplier: the sample rate multiplier
:return: the nominal sample rate
"""
sample_rate = 0
if factor == 0:
sample_rate = 0
elif factor > 0 and multiplier > 0:
sample_rate = factor * multiplier
elif factor > 0 and multiplier < 0:
sample_rate = -(factor / multiplier)
elif factor < 0 and multiplier > 0:
sample_rate = -(multiplier / factor)
elif factor < 0 and multiplier < 0:
sample_rate = 1 / (factor * multiplier)
return sample_rate
def get_record_metadata(header: FixedHeader, header_unpacker: Unpacker):
"""
Extract and parse the metadata of a data record from its fixed header.
:param header: the fixed header of the data record
:param header_unpacker: the unpacker corresponding to the data record;
needed so that the correct byte order can be used
    :return: the extracted record metadata
"""
try:
station = header.station.decode('utf-8').rstrip()
location = header.location.decode('utf-8').rstrip()
channel = header.channel.decode('utf-8').rstrip()
network = header.net_code.decode('utf-8').rstrip()
record_start_time_string = header.record_start_time
record_start_time_tuple = header_unpacker.unpack(
'HHBBBBH', record_start_time_string)
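        # The final BTIME field is in units of 0.0001 seconds, hence the
        # factor of 100 to convert to microseconds.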
record_start_time = UTCDateTime(year=record_start_time_tuple[0],
julday=record_start_time_tuple[1],
hour=record_start_time_tuple[2],
minute=record_start_time_tuple[3],
second=record_start_time_tuple[4],
microsecond=record_start_time_tuple[
6] * 100).timestamp
sample_count = header_unpacker.unpack('H', header.sample_count)[0]
sample_rate_factor = header_unpacker.unpack(
'h', header.sample_rate_factor
)[0]
sample_rate_multiplier = header_unpacker.unpack(
'h', header.sample_rate_multiplier
)[0]
except ValueError:
raise MSeedReadError("Not an MSeed file.")
sample_rate = calculate_sample_rate(sample_rate_factor,
sample_rate_multiplier)
if sample_rate == 0:
record_end_time = record_start_time
else:
record_time_taken = sample_count / sample_rate
record_end_time = record_start_time + record_time_taken
return RecordMetadata(station, location, channel, network,
record_start_time, record_end_time,
sample_count, sample_rate)
......@@ -68,21 +68,23 @@ class LogInfo():
# TT =2001:253:15:13:59:768 NS: 144005 SPS: 40 ETO: 0
parts = line.split()
data_stream = int(parts[5])
if data_stream not in self.parent.req_data_streams:
return (0, 0)
try:
if parts[8].startswith("00:000"):
if parts[11].startswith("00:000"):
return -1, 0
epoch, _ = get_time_6(parts[11])
if (self.req_data_streams == ['*'] or
data_stream in self.req_data_streams):
try:
if parts[8].startswith("00:000"):
if parts[11].startswith("00:000"):
return -1, 0
epoch, _ = get_time_6(parts[11])
else:
epoch, _ = get_time_6(parts[8])
except AttributeError:
self.parent.processing_log.append((line, LogType.ERROR))
return False
if epoch > 0:
self.min_epoch = min(epoch, self.min_epoch)
self.max_epoch = max(epoch, self.max_epoch)
else:
epoch, _ = get_time_6(parts[8])
except AttributeError:
self.parent.processing_log.append(line, LogType.ERROR)
return False
if epoch > 0:
self.min_epoch = min(epoch, self.min_epoch)
self.max_epoch = max(epoch, self.max_epoch)
return 0, 0
else:
return 0, 0
return epoch, data_stream
......@@ -203,10 +205,10 @@ class LogInfo():
return False
return epoch, disk, val
def read_dps_clock_diff(self, line: str
def read_dsp_clock_diff(self, line: str
) -> Union[bool, Tuple[float, float]]:
"""
Read DPS clock difference
Read DSP clock difference
:param line: str - a line of evt message
:return epoch: float - time when info is recorded
:return total: float - total difference time in milliseconds
......@@ -347,18 +349,17 @@ class LogInfo():
line = line.upper()
if 'FST' in line:
ret = self.read_evt(line)
if ret:
if ret is not False:
epoch, data_stream = ret
if data_stream in self.req_data_streams:
if epoch > 0:
chan_name = 'Event DS%s' % data_stream
self.add_chan_info(chan_name, epoch, 1, idx)
elif epoch == 0:
self.parent.processing_log.append(
line, LogType.WARNING)
else:
self.parent.processing_log.append(
line, LogType.ERROR)
if epoch > 0:
chan_name = 'Event DS%s' % data_stream
self.add_chan_info(chan_name, epoch, 1, idx)
elif epoch == 0:
self.parent.processing_log.append(
(line, LogType.WARNING))
else:
self.parent.processing_log.append(
(line, LogType.ERROR))
elif line.startswith("STATE OF HEALTH"):
epoch = self.read_sh_header(line)
......@@ -415,11 +416,11 @@ class LogInfo():
if epoch:
self.add_chan_info('Jerks/DSP Sets', epoch, 0, idx)
elif "DPS clock diff" in line:
ret = self.read_dps_clock_diff()
elif "DSP CLOCK DIFFERENCE" in line:
ret = self.read_dsp_clock_diff(line)
if ret:
epoch, total = ret
self.add_chan_info('DPS Clock Diff', epoch, total, idx)
self.add_chan_info('DSP Clock Diff', epoch, total, idx)
elif "ACQUISITION STARTED" in line:
epoch = self.simple_read(line)[1]
......@@ -457,7 +458,7 @@ class LogInfo():
elif "EXTERNAL CLOCK IS UNLOCKED" in line:
epoch = self.simple_read(line)[1]
if epoch:
self.add_chan_info('GPS Lk/Unlk', epoch, 0, idx)
self.add_chan_info('GPS Lk/Unlk', epoch, -1, idx)
elif "EXTERNAL CLOCK IS LOCKED" in line:
epoch = self.simple_read(line)[1]
if epoch:
......
......@@ -4,7 +4,7 @@ RT130 object to hold and process RefTek data
import os
from pathlib import Path
from typing import Tuple, List, Union
import traceback
import numpy as np
from sohstationviewer.model.reftek.from_rt2ms import (
......@@ -35,6 +35,11 @@ class RT130(DataTypeModel):
"""
self.req_data_streams: List[Union[int, str]] = self.req_wf_chans
"""
rt130_waveform_data_req: flag to create waveform data according to
req_data_stream
"""
self.rt130_waveform_data_req: bool = kwarg['rt130_waveform_data_req']
"""
found_data_streams: list of data streams found to help inform user
why the selected data streams don't show up
"""
......@@ -89,8 +94,15 @@ class RT130(DataTypeModel):
path2file = Path(path).joinpath(file_name)
if not validate_file(path2file, file_name):
continue
if not self.read_reftek_130(path2file):
read_text(path2file, file_name, self.log_data['TEXT'])
try:
if not self.read_reftek_130(path2file):
read_text(path2file, self.log_data['TEXT'])
except Exception:
fmt = traceback.format_exc()
self.track_info(f"Skip file {path2file} can't be read "
f"due to error: {str(fmt)}",
LogType.WARNING)
count += 1
if count % 50 == 0:
self.track_info(
......@@ -133,7 +145,13 @@ class RT130(DataTypeModel):
:param path2file: absolute path to file
"""
rt130 = core.Reftek130.from_file(path2file)
try:
rt130 = core.Reftek130.from_file(path2file)
except Exception:
fmt = traceback.format_exc()
self.track_info(f"Skip file {path2file} can't be read "
f"due to error: {str(fmt)}", LogType.WARNING)
return
unique, counts = np.unique(rt130._data["packet_type"],
return_counts=True)
nbr_packet_type = dict(zip(unique, counts))
......@@ -189,7 +207,9 @@ class RT130(DataTypeModel):
cur_key = (rt130._data[0]['unit_id'].decode(),
f"{rt130._data[0]['experiment_number']}")
self.populate_cur_key_for_all_data(cur_key)
self.get_ehet_in_log_data(rt130, cur_key)
if data_stream != 9:
# don't get event info for mass position
self.get_ehet_in_log_data(rt130, cur_key)
self.get_mass_pos_data_and_waveform_data(rt130, data_stream, cur_key)
def get_ehet_in_log_data(self, rt130: core.Reftek130,
......@@ -230,8 +250,10 @@ class RT130(DataTypeModel):
"""
if data_stream == 9:
cur_data_dict = self.mass_pos_data[cur_key]
else:
elif self.rt130_waveform_data_req:
cur_data_dict = self.waveform_data[cur_key]
else:
return
avail_trace_indexes = check_reftek_header(
rt130, cur_key, self.read_start, self.read_end,
......
......@@ -47,7 +47,7 @@ class ParamDialog(UiDBInfoDialog):
color_mode_label = QtWidgets.QLabel('Color mode:')
color_selector = QComboBox()
color_selector.insertItem(0, initial_color_mode)
other_color_modes = ALL_COLOR_MODES - {initial_color_mode}
other_color_modes = set(ALL_COLOR_MODES.keys()) - {initial_color_mode}
color_selector.insertItems(1, other_color_modes)
color_selector.setFixedWidth(100)
color_selector.currentTextChanged.connect(self.on_color_mode_changed)
......