Compare revisions: software_public/passoft/sohstationviewer
Commits on Source (6)
Showing 1301 additions and 177 deletions
......@@ -66,19 +66,20 @@ def display_tracking_info(tracking_box: QTextBrowser, text: str,
msg = {'text': text}
if type == LogType.ERROR:
msg['color'] = 'white'
msg['bgcolor'] = '#e46269'
msg['bgcolor'] = '#c45259'
elif type == LogType.WARNING:
msg['color'] = '#ffd966'
msg['bgcolor'] = 'orange'
msg['color'] = 'white'
msg['bgcolor'] = '#c4a347'
else:
msg['color'] = 'blue'
msg['bgcolor'] = 'white'
html_text = """<body>
<div style='color:%(color)s; background-color:%(bgcolor)s'>
%(text)s
<div style='color:%(color)s'>
<strong>%(text)s</strong>
</div>
</body>"""
tracking_box.setHtml(html_text % msg)
tracking_box.setStyleSheet(f"background-color: {msg['bgcolor']}")
# parent.update()
tracking_box.repaint()
......
......@@ -5,7 +5,7 @@ from sohstationviewer.database.process_db import execute_db_dict, execute_db
from sohstationviewer.conf.dbSettings import dbConf
def get_chan_plot_info(org_chan_id: str, chan_info: Dict, data_type: str,
def get_chan_plot_info(org_chan_id: str, data_type: str,
color_mode: ColorMode = 'B') -> Dict:
"""
Given chanID read from raw data file and detected dataType
......@@ -24,10 +24,10 @@ def get_chan_plot_info(org_chan_id: str, chan_info: Dict, data_type: str,
chan = 'VM?'
if org_chan_id.startswith('MassPos'):
chan = 'MassPos?'
if org_chan_id.startswith('DS'):
chan = 'SEISMIC'
if org_chan_id.startswith('Event DS'):
chan = 'Event DS?'
if org_chan_id.startswith('DS') and 'DSP' not in org_chan_id:
chan = 'DS?'
if org_chan_id.startswith('Disk Usage'):
chan = 'Disk Usage?'
if dbConf['seisRE'].match(chan):
......@@ -46,17 +46,13 @@ def get_chan_plot_info(org_chan_id: str, chan_info: Dict, data_type: str,
sql = (f"{o_sql} WHERE channel='{chan}' and C.param=P.param"
f" and dataType='{data_type}'")
chan_db_info = execute_db_dict(sql)
seismic_label = None
if len(chan_db_info) == 0:
chan_db_info = execute_db_dict(
f"{o_sql} WHERE channel='DEFAULT' and C.param=P.param")
else:
if chan_db_info[0]['channel'] == 'SEISMIC':
try:
chan_db_info[0]['label'] = dbConf['seisLabel'][org_chan_id[-1]]
except KeyError:
chan_db_info[0]['label'] = str(chan_info['samplerate'])
seismic_label = get_seismic_chan_label(org_chan_id)
chan_db_info[0]['channel'] = org_chan_id
chan_db_info[0]['label'] = (
......@@ -68,6 +64,8 @@ def get_chan_plot_info(org_chan_id: str, chan_info: Dict, data_type: str,
else chan_db_info[0]['fixPoint'])
if chan_db_info[0]['label'].strip() == '':
chan_db_info[0]['label'] = chan_db_info[0]['channel']
elif seismic_label is not None:
chan_db_info[0]['label'] = seismic_label
else:
chan_db_info[0]['label'] = '-'.join([chan_db_info[0]['channel'],
chan_db_info[0]['label']])
......@@ -76,30 +74,23 @@ def get_chan_plot_info(org_chan_id: str, chan_info: Dict, data_type: str,
return chan_db_info[0]
def get_wf_plot_info(org_chan_id: str, *args, **kwargs) -> Dict:
"""
:param org_chan_id: channel name read from data source
:param chan_info: accepted only for call compatibility with get_chan_plot_info()
:param data_type: accepted only for call compatibility with get_chan_plot_info()
:param color_mode: accepted only for call compatibility with get_chan_plot_info()
:return: info of the channel read from DB which is used for plotting
"""
# Waveform plot's color is fixed to NULL in the database, so we do not need
# to get the valueColors column from the database.
chan_info = execute_db_dict(
"SELECT param, plotType, height "
"FROM Parameters WHERE param='Seismic data'")
# The plotting API still requires that the key 'valueColors' is mapped to
# something, so we are setting it to None.
chan_info[0]['valueColors'] = None
chan_info[0]['label'] = get_chan_label(org_chan_id)
chan_info[0]['unit'] = ''
chan_info[0]['channel'] = 'SEISMIC'
chan_info[0]['convertFactor'] = 1
return chan_info[0]
def get_convert_factor(chan_id, data_type):
sql = f"SELECT convertFactor FROM Channels WHERE channel='{chan_id}' " \
f"AND dataType='{data_type}'"
ret = execute_db(sql)
if ret:
return ret[0][0]
else:
return None
def get_chan_label(chan_id):
def get_seismic_chan_label(chan_id):
"""
Get the label for chan_id: data stream channels can use chan_id itself as the
label, while other seismic channels need their coordinate added to chan_id.
:param chan_id: name of channel
:return label: label to put in front of the plot of the channel
"""
if chan_id.startswith("DS"):
label = chan_id
else:
......
No preview for this file type
## Log data:
info from log channels, SOH messages, and text files, in a dict:
{'TEXT': [str,], key:{chan_id: [str,],},}
in which 'TEXT' is the chan_id given by sohview for text-only files that have
no station or channel associated with them.
Note: log_data for an RT130 data set has only one channel: SOH
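A minimal, hypothetical example (station id and message contents are illustrative only):
{'TEXT': ['** STATE OF HEALTH: note.txt ...'],
 'STA01': {'SOH': ['STATE OF HEALTH: From:1671734411.0 To:1671820811.0 ...']},
}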
## data_dict:
{set_key: {
chan_id (str): {
'file_path' (str): path of file to keep track of file changes in MSeedReader
'chanID' (str): name of channel
'samplerate' (float): Sampling rate of the data
'startTmEpoch' (float): start epoch time of channel
'endTmEpoch' (float): end epoch time of channel
'size' (int): size of channel data
'tracesInfo': [{
'startTmEpoch': Start epoch time of the trace - float
'endTmEpoch': End epoch time of the trace - float
'times': time of channel's trace: List[float] in mseed_reader but changed to ndarray in combine_data()
'data': data of channel's trace: List[float] in mseed_reader but changed to ndarray in combine_data()
}]
'tps_data': list of lists of the mean of squares of every 5 minutes of data in each day
'times' (np.array): times that has been trimmed and down-sampled for plotting
'data' (np.array): data that has been trimmed and down-sampled for plotting
'chan_db_info' (dict): the plotting parameters got from database
for this channel - dict,
ax: axes to draw the channel in PlottingWidget
ax_wf (matplotlib.axes.Axes): axes to draw the channel in WaveformWidget
}
}
Both ax and ax_wf are used because mass position channels are plotted in both widgets, while
SOH channels are plotted only in PlottingWidget and waveform channels only in WaveformWidget.
tps_data is created in TimePowerSquaredWidget only and applies to waveform_data only.
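A minimal, hypothetical data_dict entry (station id, channel, and all values are illustrative only):
{'STA01': {
    'LHE': {
        'file_path': '/path/to/file1.mseed',
        'chanID': 'LHE',
        'samplerate': 1.0,
        'startTmEpoch': 1671734411.0,
        'endTmEpoch': 1671820811.0,
        'size': 86400,
        'nets': {'XX'},          # added by MSeedReader
        'gaps': [],
        'tracesInfo': [{
            'startTmEpoch': 1671734411.0,
            'endTmEpoch': 1671820811.0,
            'times': [1671734411.0, ...],   # List[float] while reading, ndarray after combine_data()
            'data': [52.0, ...],
        }],
    }
}}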
## tps_data: data that aren't separated into traces
{set_key - str or (str, str): {
chan_id - str: {
times: np.array,
data: np.array,
}
}
}
\ No newline at end of file
from __future__ import annotations
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional, Union, List, Tuple, Dict
from obspy import UTCDateTime
from PySide2 import QtCore
from PySide2 import QtWidgets
from sohstationviewer.controller.util import display_tracking_info
from sohstationviewer.view.plotting.gps_plot.gps_point import GPSPoint
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.database.process_db import execute_db
from sohstationviewer.model.general_data.general_data_helper import \
retrieve_data_time_from_data_dict, retrieve_gaps_from_data_dict, \
combine_data, sort_data, squash_gaps, apply_convert_factor_to_data_dict
class ProcessingDataError(Exception):
def __init__(self, msg):
self.message = msg
class ThreadStopped(Exception):
"""
An exception that is raised when the user requests for the data loader
thread to be stopped.
"""
def __init__(self, *args, **kwargs):
self.args = (args, kwargs)
class GeneralData():
def __init__(self, data_type,
tracking_box: Optional[QtWidgets.QTextBrowser] = None,
is_multiplex: bool = False, folder: str = '.',
list_of_rt130_paths: List[Path] = [],
req_wf_chans: Union[List[str], List[int]] = [],
req_soh_chans: List[str] = [],
gap_minimum: float = None,
read_start: Optional[float] = UTCDateTime(0).timestamp,
read_end: Optional[float] = UTCDateTime().timestamp,
include_mp123zne: bool = False,
include_mp456uvw: bool = False,
rt130_waveform_data_req: bool = False,
creator_thread: Optional[QtCore.QThread] = None,
notification_signal: Optional[QtCore.Signal] = None,
pause_signal: Optional[QtCore.Signal] = None,
on_unittest: bool = False,
*args, **kwargs):
"""
CHANGED FROM data_type_model.DataTypeModel.__init__:
+ add self.is_multiplex, self.on_unittest, self.gap_minimum,
self.keys
+ remove docstring for self.log_data, self.soh_data,
self.mass_pos_data,
self.waveform_data, self.gaps_by_key_chan,
self.stream_header_by_key_chan
Super class for different data types to process data from data files
:param data_type: type of the object
:param tracking_box: widget to display tracking info
:param folder: path to the folder of data
:param list_of_rt130_paths: path to the folders of RT130 data
:param req_wf_chans: requested waveform channel list
:param req_soh_chans: requested SOH channel list
:param read_start: requested start time to read
:param read_end: requested end time to read
:param include_mp123zne: if mass position channels 1,2,3 are requested
:param include_mp456uvw: if mass position channels 4,5,6 are requested
:param rt130_waveform_data_req: flag for RT130 to read waveform data
:param creator_thread: the thread the current DataTypeModel instance is
being created in. If None, the DataTypeModel instance is being
created in the main thread
:param notification_signal: signal used to send notifications to the
main thread. Only not None when creator_thread is not None
:param pause_signal: signal used to notify the main thread that the
data loader is paused.
"""
self.data_type = data_type
self.is_multiplex = is_multiplex
self.tracking_box = tracking_box
self.dir = folder
self.list_of_rt130_paths = list_of_rt130_paths
self.req_soh_chans = req_soh_chans
self.req_wf_chans = req_wf_chans
self.gap_minimum = gap_minimum
self.read_start = read_start
self.read_end = read_end
self.include_mp123zne = include_mp123zne
self.include_mp456uvw = include_mp456uvw
self.rt130_waveform_data_req = rt130_waveform_data_req
self.on_unittest = on_unittest
if creator_thread is None:
err_msg = (
'A signal is not None while running in main thread'
)
assert notification_signal is None, err_msg
assert pause_signal is None, err_msg
self.creator_thread = QtCore.QThread()
else:
self.creator_thread = creator_thread
self.notification_signal = notification_signal
self.pause_signal = pause_signal
"""
processing_log: record the progress of processing
"""
self.processing_log: List[Tuple[str, LogType]] = []
"""
keys: list of all keys
"""
self.keys = []
DataKey = Union[Tuple[str, str], str]
"""
log_texts: dictionary of content of text files by filenames
"""
self.log_texts: Dict[str, str] = {}
# Look for description in data_structures.MD
self.log_data = {'TEXT': []} # noqa
self.waveform_data = {}
self.soh_data = {}
self.mass_pos_data = {}
"""
data_time: time range of data sets:
"""
self.data_time: Dict[DataKey, List[float]] = {}
"""
The given data may include more than one data set, keyed by station_id
in mseed or (unit_id, exp_no) in reftek. The user is allowed to choose
which data set to display.
selected_key: str - key of the data set to be displayed
"""
self.selected_key: Optional[str] = None
"""
gaps: gaps info in dict:
"""
self.gaps: Dict[DataKey, List[List[float]]] = {}
"""
tmp_dir: dir to keep memmap files. Deleted when object is deleted
"""
self.tmp_dir_obj: TemporaryDirectory = TemporaryDirectory()
self.tmp_dir = self.tmp_dir_obj.name
if not on_unittest:
self.save_temp_data_folder_to_database()
self._pauser = QtCore.QSemaphore()
self.pause_response = None
self.gps_points: List[GPSPoint] = []
def read_folder(self, folder: str) -> Tuple[Dict]:
"""
FROM data_type_model.DataTypeModel.read_folder
Read data from given folder
:param folder: path to folder to read data
:return: Tuple of different data dicts
"""
pass
def select_key(self) -> Union[str, Tuple[str, str]]:
"""
FROM data_type_model.DataTypeModel.select_key
Get the key for the data set to process.
:return: key of the selected data set
"""
pass
def processing_data(self):
"""
CHANGED FROM data_type_model.Data_Type_Model.processing_data
"""
if self.creator_thread.isInterruptionRequested():
raise ThreadStopped()
self.read_folder(self.dir)
self.selected_key = self.select_key()
self.fill_empty_data()
if self.creator_thread.isInterruptionRequested():
raise ThreadStopped()
self.finalize_data()
def finalize_data(self):
"""
CHANGED FROM data_type_model.Data_Type_Model.finalize_data
This function should be called after all folders finish reading to:
+ Fill an empty dict into stations with no data added in data_dicts
+ Sort all data traces in time order
+ Combine traces in data and split at gaps > gap_minimum
+ Apply convert_factor so that flags aren't needed to prevent the
factor from being applied twice when plotting
+ Check for channels that were not found
+ Retrieve gaps from data_dicts
+ Retrieve data_time from data_dicts
+ Replace data_time default values that are invalid for plotting with
read_start, read_end.
"""
if self.selected_key is None:
return
self.track_info("Finalizing...", LogType.INFO)
self.sort_all_data()
self.combine_all_data()
self.apply_convert_factor_to_data_dicts()
self.check_not_found_soh_channels()
self.retrieve_gaps_from_data_dicts()
self.retrieve_data_time_from_data_dicts()
for key in self.keys:
if key not in self.data_time.keys():
self.data_time[key] = [self.read_start, self.read_end]
def __del__(self):
# FROM data_type_model.Data_Type_Model.__del__
print("delete dataType Object")
try:
del self.tmp_dir_obj
except OSError as e:
self.track_info(
"Error deleting %s : %s" % (self.tmp_dir, e.strerror),
LogType.ERROR)
print("Error deleting %s : %s" % (self.tmp_dir, e.strerror))
print("finish deleting")
def track_info(self, text: str, type: LogType) -> None:
"""
CHANGED FROM data_type_model.Data_Type_Model.track_info:
Display tracking info in tracking_box.
Add all errors/warnings to processing_log.
:param text: str - message to display
:param type: str - type of message (error/warning/info)
"""
# display_tracking_info updates a QtWidget, which can only be done in
# the main thread. So, if we are running in a background thread
# (i.e. self.creator_thread is not None), we need to use signal slot
# mechanism to ensure that display_tracking_info is run in the main
# thread.
if self.notification_signal is None:
display_tracking_info(self.tracking_box, text, type)
else:
self.notification_signal.emit(self.tracking_box, text, type)
if type != LogType.INFO:
self.processing_log.append((text, type))
def pause(self) -> None:
"""
FROM data_type_model.Data_Type_Model.pause
Pause the thread this DataTypeModel instance is in. Works by trying
to acquire a semaphore that is not available, which causes the thread
to block.
Note: due to how this is implemented, each call to pause will require
a corresponding call to unpause. Thus, it is inadvisable to call this
method more than once.
Caution: not safe to call in the main thread. Unless a background
thread releases the semaphore, the whole program will freeze.
"""
self._pauser.acquire()
@QtCore.Slot()
def unpause(self):
"""
FROM data_type_model.Data_Type_Model.unpause
Unpause the thread this DataTypeModel instance is in. Works by releasing
the semaphore acquired by pause(), which unblocks the paused thread.
Caution: due to how this is implemented, if unpause is called before
pause, the thread will not be paused until another call to pause is
made. Also, like pause, each call to unpause must be matched by another
call to pause for everything to work.
"""
self._pauser.release()
@QtCore.Slot()
def receive_pause_response(self, response: object):
"""
FROM data_type_model.Data_Type_Model.receive_pause_response
Receive a response to a request made to another thread and unpause the
calling thread.
:param response: the response to the request made
:type response: object
"""
self.pause_response = response
self.unpause()
@classmethod
def get_empty_instance(cls) -> GeneralData:
"""
# FROM data_type_model.Data_Type_Model.get_empty_instance
Create an empty data object. Useful if a DataTypeModel instance is
needed, but it is undesirable to load a data set. Basically wraps
__new__().
:return: an empty data object
:rtype: DataTypeModel
"""
return cls.__new__(cls)
def save_temp_data_folder_to_database(self):
# FROM
# data_type_model.Data_Type_Model.save_temp_data_folder_to_database
execute_db(f'UPDATE PersistentData SET FieldValue="{self.tmp_dir}" '
f'WHERE FieldName="tempDataDirectory"')
def check_not_found_soh_channels(self):
# FROM data_type_model.Data_Type_Model.check_not_found_soh_channels
all_chans_meet_req = (
list(self.soh_data[self.selected_key].keys()) +
list(self.mass_pos_data[self.selected_key].keys()) +
list(self.log_data[self.selected_key].keys()))
not_found_chans = [c for c in self.req_soh_chans
if c not in all_chans_meet_req]
if not_found_chans != []:
msg = (f"No data found for the following channels: "
f"{', '.join( not_found_chans)}")
self.processing_log.append((msg, LogType.WARNING))
def sort_all_data(self):
"""
FROM data_type_model.Data_Type_Model.sort_all_data
Sort traces by startTmEpoch on all data: waveform_data, mass_pos_data,
soh_data.
Reftek's soh_data won't be sorted here. It has been sorted by time
because it is created from log data which is sorted in
prepare_soh_data_from_log_data()
"""
sort_data(self.waveform_data[self.selected_key])
sort_data(self.mass_pos_data[self.selected_key])
try:
sort_data(self.soh_data[self.selected_key])
except KeyError:
# Reftek's SOH trace doesn't have startTmEpoch and
# actually soh_data consists of only one trace
pass
def combine_all_data(self):
combine_data(self.waveform_data[self.selected_key], self.gap_minimum)
combine_data(self.mass_pos_data[self.selected_key], self.gap_minimum)
try:
combine_data(self.soh_data[self.selected_key], self.gap_minimum)
except KeyError:
# Reftek's SOH trace doesn't have startTmEpoch and
# actually soh_data consists of only one trace
pass
def retrieve_gaps_from_data_dicts(self):
"""
Get gaps from each data_dict, then squash all related gaps
"""
retrieve_gaps_from_data_dict(self.soh_data, self.gaps)
retrieve_gaps_from_data_dict(self.mass_pos_data, self.gaps)
retrieve_gaps_from_data_dict(self.waveform_data, self.gaps)
for sta_id in self.gaps:
self.gaps[sta_id] = squash_gaps(self.gaps[sta_id])
def retrieve_data_time_from_data_dicts(self):
"""
Go through each data_dict to update the data_time to be
[min of startTmEpoch, max of endTmEpoch] for each station.
"""
retrieve_data_time_from_data_dict(self.soh_data, self.data_time)
retrieve_data_time_from_data_dict(self.mass_pos_data, self.data_time)
retrieve_data_time_from_data_dict(self.waveform_data, self.data_time)
def fill_empty_data(self):
"""
Fill an empty dict into each station that has no data in the data_dicts
"""
for key in self.keys:
if key not in self.soh_data:
self.soh_data[key] = {}
if key not in self.waveform_data:
self.waveform_data[key] = {}
if key not in self.mass_pos_data:
self.mass_pos_data[key] = {}
if key not in self.log_data:
self.log_data[key] = {}
def apply_convert_factor_to_data_dicts(self):
"""
Apply convert_factor here so that flags aren't needed to prevent the
factor from being applied twice when plotting
"""
apply_convert_factor_to_data_dict(self.soh_data, self.data_type)
apply_convert_factor_to_data_dict(self.mass_pos_data, self.data_type)
apply_convert_factor_to_data_dict(self.waveform_data, self.data_type)
from typing import List, Dict, Optional
import numpy as np
from sohstationviewer.database.extract_data import get_convert_factor
def _check_related_gaps(min1: float, max1: float,
min2: float, max2: float,
index: int, checked_indexes: List[int]):
"""
FROM handling_data.check_related_gaps
Check whether the passed ranges overlap each other; if so, add the index to
checked_indexes.
:param min1: start of range 1
:param max1: end of range 1
:param min2: start of range 2
:param max2: end of range 2
:param index: index of gap being checked
:param checked_indexes: list of gaps that have been checked
:return: True if the two ranges overlap each other, False otherwise
"""
if ((min1 <= min2 <= max1) or (min1 <= max2 <= max1)
or (min2 <= min1 <= max2) or (min2 <= max1 <= max2)):
# range [min1, max1] and [min2, max2] have some part overlap each other
checked_indexes.append(index)
return True
else:
return False
def squash_gaps(gaps: List[List[float]]) -> List[List[float]]:
"""
FROM handling_data.squash_gaps
Compress gaps from different channels whose time ranges are related to
each other into single gaps with the outer boundary (min start, max end),
or (max start, min end) in the overlap case.
:param gaps: [[float, float],], [[float, float],] -
list of gaps of multiple channels: [[start, end],], [[start, end],]
:return: squashed_gaps: [[float, float],] - all related gaps are squashed
extending to the outside start and end
[[min start, max end], [max start, min end]]
"""
gaps = sorted(gaps, key=lambda x: x[0])
squashed_gaps = []
checked_indexes = []
for idx, g in enumerate(gaps):
if idx in checked_indexes:
continue
squashed_gaps.append(g)
checked_indexes.append(idx)
overlap = g[0] >= g[1]
for idx_, g_ in enumerate(gaps):
if idx_ in checked_indexes:
continue
if not overlap:
if g_[0] >= g_[1]:
continue
if _check_related_gaps(g[0], g[1], g_[0], g_[1],
idx_, checked_indexes):
squashed_gaps[-1][0] = min(g[0], g_[0])
squashed_gaps[-1][1] = max(g[1], g_[1])
else:
break
else:
if g_[0] < g_[1]:
continue
if _check_related_gaps(g[1], g[0], g_[1], g_[0],
idx_, checked_indexes):
squashed_gaps[-1][0] = max(g[0], g_[0])
squashed_gaps[-1][1] = min(g[1], g_[1])
return squashed_gaps
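# A minimal usage sketch for squash_gaps (hypothetical values): overlapping
# gaps collected from different channels are merged to their outer boundary,
# while unrelated gaps are kept as they are.
#   squash_gaps([[10.0, 20.0], [15.0, 30.0], [100.0, 120.0]])
#   -> [[10.0, 30.0], [100.0, 120.0]]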
def sort_data(sta_data_dict: Dict) -> None:
"""
FROM handling_data.sort_data
Sort the traces in each channel's 'tracesInfo' by 'startTmEpoch' order
:param sta_data_dict: data of a station
"""
for chan_id in sta_data_dict:
traces_info = sta_data_dict[chan_id]['tracesInfo']
sta_data_dict[chan_id]['tracesInfo'] = sorted(
traces_info, key=lambda i: i['startTmEpoch'])
def retrieve_data_time_from_data_dict(
data_dict: Dict, data_time: Dict[str, List[float]]) -> None:
"""
Go through each channel in each station to get the station's data_time,
which is [min of startTmEpoch, max of endTmEpoch] among the station's
channels.
:param data_dict: the given data_dict
:param data_time: data by sta_id
"""
for sta_id in data_dict.keys():
for c in data_dict[sta_id]:
dtime = [data_dict[sta_id][c]['startTmEpoch'],
data_dict[sta_id][c]['endTmEpoch']]
if sta_id in data_time.keys():
data_time[sta_id][0] = min(data_time[sta_id][0], dtime[0])
data_time[sta_id][1] = max(data_time[sta_id][1], dtime[1])
else:
data_time[sta_id] = dtime
def retrieve_gaps_from_data_dict(data_dict: Dict,
gaps: Dict[str, List[List[float]]]) -> None:
"""
Create each station's gaps by adding all gaps from all channels
:param data_dict: the given data_dict
:param gaps: gaps list by key
"""
for key in data_dict.keys():
if key not in gaps:
gaps[key] = []
for c in data_dict[key].keys():
cgaps = data_dict[key][c]['gaps']
if cgaps != []:
gaps[key] += cgaps
def combine_data(station_data_dict: Dict, gap_minimum: Optional[float]) \
-> None:
"""
Traverse the traces in each channel and add an entry to the gap list if
delta >= gap_minimum, where delta is the distance between contiguous
traces.
Combine the sorted data using concatenate, which also changes data to
ndarray, and update startTmEpoch and endTmEpoch.
:param station_data_dict: dict of data of a station
:param gap_minimum: minimum length of gaps to be detected
"""
for chan_id in station_data_dict:
channel = station_data_dict[chan_id]
traces_info = channel['tracesInfo']
for idx in range(len(traces_info) - 1):
curr_end_tm = traces_info[idx]['endTmEpoch']
next_start_tm = traces_info[idx+1]['startTmEpoch']
delta = abs(curr_end_tm - next_start_tm)
if gap_minimum is not None and delta >= gap_minimum:
# add gap
gap = [curr_end_tm, next_start_tm]
station_data_dict[chan_id]['gaps'].append(gap)
channel['startTmEpoch'] = min([tr['startTmEpoch']
for tr in traces_info])
channel['endTmEpoch'] = max([tr['endTmEpoch'] for tr in traces_info])
data_list = [tr['data'] for tr in traces_info]
times_list = [tr['times'] for tr in traces_info]
channel['tracesInfo'] = [{
'startTmEpoch': channel['startTmEpoch'],
'endTmEpoch': channel['endTmEpoch'],
'data': np.concatenate(data_list),
'times': np.concatenate(times_list)
}]
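# Usage sketch for combine_data (hypothetical values): a channel whose sorted
# tracesInfo holds two traces covering 0-10s and 15-25s, processed with
# gap_minimum=2, ends up with gaps == [[10, 15]] and a single trace whose
# 'times'/'data' are the concatenated ndarrays spanning 0-25s.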
def apply_convert_factor_to_data_dict(data_dict: Dict, data_type: str):
"""
Traverse the traces in each channel and convert the data according to the
convert_factor obtained from the DB
:param data_dict: dict of data
:param data_type: type of data
"""
for key in data_dict:
for chan_id in data_dict[key]:
channel = data_dict[key][chan_id]
convert_factor = get_convert_factor(chan_id, data_type)
if convert_factor is not None and convert_factor != 1:
for tr in channel['tracesInfo']:
tr['data'] = convert_factor * tr['data']
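# Usage sketch (hypothetical values): if the DB returns a convertFactor of
# 0.000001 for a channel, every trace's 'data' array is multiplied by 1e-6
# (e.g. counts -> engineering units); channels whose factor is None or 1 are
# left untouched.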
from typing import BinaryIO
import obspy
from record_reader import RecordReader
class MSeedReader:
def __init__(self, file: BinaryIO) -> None:
self.file = file
def read(self):
trace = []
while 1:
# We know that end of file is reached when read() returns an empty
# string.
is_eof = (self.file.read(1) == b'')
if is_eof:
break
# We need to move the file pointer back to its position after we
# do the end of file check. Otherwise, we would be off by one
# byte for all the reads afterward.
self.file.seek(-1, 1)
# We save the start of the current record so that after we are
# done reading the record, we can move back. This makes moving
# to the next record a lot easier, seeing as we can simply move
# the file pointer a distance the size of the current record.
current_record_start = self.file.tell()
reader = RecordReader(self.file)
trace.append(reader.get_first_data_point())
# sample_count = reader.record_metadata.sample_count
# sample_rate = reader.record_metadata.sample_rate
# record_time_taken = sample_count / sample_rate
# record_end_time = (reader.record_metadata.start_time +
# record_time_taken)
# MSEED stores the size of a data record as an exponent of a
# power of two, so we have to convert that to actual size before
# doing anything else.
record_length_exp = reader.header_unpacker.unpack(
'B', reader.blockette_1000.record_length
)[0]
record_size = 2 ** record_length_exp
self.file.seek(current_record_start)
self.file.seek(record_size, 1)
if __name__ == '__main__':
# numpy.set_printoptions(threshold=sys.maxsize)
file_path = '/Users/ldam/Documents/GIT/sohstationviewer/tests/test_data/' \
'Q330_mixed_traces/XX-3203_4-20221222183011'
file = open(file_path, 'rb')
stream = obspy.read(file_path)
MSeedReader(file).read()
"""
MSeed object to hold and process MSeed data
"""
import os
import re
import traceback
from pathlib import Path
from typing import Dict, Tuple, List
from sohstationviewer.controller.util import validate_file, validate_dir
from sohstationviewer.model.mseed_data.mseed_reader import MSeedReader
from sohstationviewer.model.general_data.general_data import \
GeneralData, ThreadStopped, ProcessingDataError
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.model.mseed_data.mseed_helper import \
retrieve_nets_from_data_dict, read_text
from sohstationviewer.model.mseed_data.record_reader_helper import \
MSeedReadError
class MSeed(GeneralData):
"""
Read and process MSeed files into an object whose properties can be used to
plot SOH data, mass position data, waveform data and gaps
"""
def __init__(self, *args, **kwargs):
# FROM mseed.mseed.MSEED.__init__
super().__init__(*args, **kwargs)
self.nets_by_sta: Dict[str, List[str]] = {}
self.processing_data()
def finalize_data(self):
"""
CHANGED FROM mseed.mseed.MSEED.finalize_data
This function should be called after all folders finish reading to
+ distribute log texts to stations
+ get nets_by_sta from the data_dicts
+ run the other tasks in super().finalize_data()
"""
self.distribute_log_text_to_station()
self.retrieve_nets_from_data_dicts()
super().finalize_data()
def read_folder(self, folder: str) -> Tuple[Dict]:
"""
CHANGED FROM mseed.mseed.MSEED.read_folder
Read data streams for soh, mass position and waveform.
:param folder: absolute path to data set folder
:return waveform_data: waveform data by station
:return soh_data: soh data by station
:return mass_pos_data: mass position data by station
:return gaps: gap list by station
:return nets_by_sta: netcodes list by station
"""
if not os.path.isdir(folder):
raise ProcessingDataError(f"Path '{folder}' not exist")
count = 0
total = sum([len(files) for _, _, files in os.walk(folder)])
invalid_blockettes = False
not_mseed_files = []
for path, sub_dirs, files in os.walk(folder):
try:
validate_dir(path)
except Exception as e:
# skip Information folder
self.track_info(str(e), LogType.WARNING)
continue
for file_name in files:
if self.creator_thread.isInterruptionRequested():
raise ThreadStopped()
path2file = Path(path).joinpath(file_name)
if not validate_file(path2file, file_name):
continue
count += 1
if count % 10 == 0:
self.track_info(
f'Read {count} files/{total}', LogType.INFO)
log_text = read_text(path2file)
if log_text is not None:
self.log_texts[path2file] = log_text
continue
reader = MSeedReader(
path2file,
read_start=self.read_start,
read_end=self.read_end,
is_multiplex=self.is_multiplex,
req_soh_chans=self.req_soh_chans,
req_wf_chans=self.req_wf_chans,
include_mp123zne=self.include_mp123zne,
include_mp456uvw=self.include_mp456uvw,
soh_data=self.soh_data,
mass_pos_data=self.mass_pos_data,
waveform_data=self.waveform_data,
log_data=self.log_data,
gap_minimum=self.gap_minimum)
try:
reader.read()
invalid_blockettes = (invalid_blockettes
or reader.invalid_blockettes)
except MSeedReadError:
not_mseed_files.append(file_name)
except Exception:
fmt = traceback.format_exc()
self.track_info(f"Skip file {path2file} can't be read "
f"due to error: {str(fmt)}",
LogType.WARNING)
if not_mseed_files:
self.track_info(
f"Not MSeed files: {not_mseed_files}", LogType.WARNING)
if invalid_blockettes:
# This check ensures the message is only printed once
print("We currently only handle blockettes 500, 1000,"
" and 1001.")
self.track_info(
f'Skipped {total - count} invalid files.', LogType.INFO)
def retrieve_nets_from_data_dicts(self):
"""
Go through the stations of each data_dict to get all network codes found
in all channels of a station and add them to nets_by_sta.
"""
retrieve_nets_from_data_dict(self.soh_data, self.nets_by_sta)
retrieve_nets_from_data_dict(self.mass_pos_data, self.nets_by_sta)
retrieve_nets_from_data_dict(self.waveform_data, self.nets_by_sta)
def select_key(self) -> str:
"""
CHANGED FROM mseed.mseed.MSEED:
+ get sta_ids from self.keys
+ add condition if not on_unittest to create unittest for mseed
:return selected_sta_id: the selected station id from the available keys.
+ If there is only one station id, return it.
+ If there is more than one, show all ids, let user choose one to
return.
"""
self.keys = sorted(list(set(
list(self.soh_data.keys()) +
list(self.mass_pos_data.keys()) +
list(self.waveform_data.keys()) +
[k for k in list(self.log_data.keys()) if k != 'TEXT']
)))
sta_ids = self.keys
if len(sta_ids) == 0:
return
selected_sta_id = sta_ids[0]
if not self.on_unittest and len(sta_ids) > 1:
msg = ("There are more than one stations in the given data.\n"
"Please select one to display")
self.pause_signal.emit(msg, sta_ids)
self.pause()
selected_sta_id = sta_ids[self.pause_response]
self.track_info(f'Select Station {selected_sta_id}', LogType.INFO)
return selected_sta_id
def distribute_log_text_to_station(self):
"""
Loop through the paths to the text files to look for a station id in each path.
+ If a station id is in the path, add the content to that station
under channel 'TXT'.
+ If no station id is in the path, add the content to the key 'TEXT',
which means the station for these texts is unknown.
"""
for path2file in self.log_texts:
try:
file_parts = re.split(rf"{os.sep}|\.", path2file.as_posix())
sta = [s for s in self.keys if s in file_parts][0]
except IndexError:
self.log_data['TEXT'].append(self.log_texts[path2file])
continue
if 'TXT' not in self.log_data[sta]:
self.log_data[sta]['TXT'] = []
self.log_data[sta]['TXT'].append(self.log_texts[path2file])
# Functions that change from handling_data's functions
import os
from pathlib import Path
from typing import List, Dict, Optional
def retrieve_nets_from_data_dict(data_dict: Dict,
nets_by_sta: Dict[str, List[str]]) -> None:
"""
Retrieve nets by sta_id from the given data_dict.
:param data_dict: dict of data by station
:param nets_by_sta: nets list by sta_id
"""
for sta_id in data_dict.keys():
if sta_id not in nets_by_sta:
nets_by_sta[sta_id] = set()
for c in data_dict[sta_id]:
nets_by_sta[sta_id].update(
data_dict[sta_id][c]['nets'])
def read_text(path2file: Path) -> Optional[str]:
"""
CHANGED FROM handling_data.read_text:
+ No need to check for binary content because a caught UnicodeDecodeError
means the file is binary
Read a text file and return its content as a log string.
+ Return None if the file isn't a text file
+ Remove empty lines from the content
:param path2file: absolute path to the text file
:return: the log text, or None if the file can't be decoded as text
"""
try:
with open(path2file, 'r') as file:
content = file.read().strip()
except UnicodeDecodeError:
return
if content != '':
# skip empty lines
no_empty_line_list = [
line for line in content.splitlines() if line]
no_empty_line_content = os.linesep.join(no_empty_line_list)
log_text = "\n\n** STATE OF HEALTH: %s\n" % path2file.name
log_text += no_empty_line_content
else:
log_text = ''
return log_text
from numbers import Real
from typing import BinaryIO, Optional, Dict, Union, List
from pathlib import Path
from obspy import UTCDateTime
from sohstationviewer.model.mseed_data.record_reader import RecordReader
from sohstationviewer.model.mseed_data.record_reader_helper import \
RecordMetadata
from sohstationviewer.model.handling_data import check_chan
class MSeedReader:
def __init__(self, file_path: Path,
read_start: float = UTCDateTime(0).timestamp,
read_end: float = UTCDateTime().timestamp,
is_multiplex: Optional[bool] = None,
req_soh_chans: List[str] = [],
req_wf_chans: List[str] = [],
include_mp123zne: bool = False,
include_mp456uvw: bool = False,
soh_data: Dict = {},
mass_pos_data: Dict = {},
waveform_data: Dict = {},
log_data: Dict[str, Union[List[str],
Dict[str, List[str]]]] = {},
gap_minimum: Optional[float] = None
) -> None:
"""
The purpose of this class is to read data from the given file and add it
to the given data dicts if it meets the requirements.
If the data_type is not multiplex, all records in a file belong to the
same channel; the info found in the first record can be used to decide
whether to keep reading when that record doesn't meet the channel
requirements.
If the data_type is multiplex, every record has to be examined.
All data_dicts' definition can be found in data_dict_structures.MD
:param file_path: Absolute path to data file
:param read_start: time that is required to start reading
:param read_end: time that is required to end reading
:param is_multiplex: multiplex status of the file's data_type
:param req_soh_chans: requested SOH channel list
:param req_wf_chans: requested waveform channel list
:param include_mp123zne: if mass position channels 1,2,3 are requested
:param include_mp456uvw: if mass position channels 4,5,6 are requested
:param soh_data: data dict of SOH
:param mass_pos_data: data dict of mass position
:param waveform_data: data dict of waveform
:param log_data: data dict of log_data
:param gap_minimum: minimum length of gaps required to detect
from record
"""
self.read_start = read_start
self.read_end = read_end
self.is_multiplex = is_multiplex
self.gap_minimum = gap_minimum
self.req_soh_chans = req_soh_chans
self.req_wf_chans = req_wf_chans
self.include_mp123zne = include_mp123zne
self.include_mp456uvw = include_mp456uvw
self.soh_data = soh_data
self.mass_pos_data = mass_pos_data
self.waveform_data = waveform_data
self.log_data = log_data
self.file_path = file_path
self.file: BinaryIO = open(file_path, 'rb')
self.invalid_blockettes = False
def get_data_dict(self, metadata: RecordMetadata) -> Dict:
"""
Find which data_dict to add data to from req_soh_chans, req_wf_chans,
include_mp123zne, include_mp456uvw, samplerate
:param metadata: record's metadata
:return: data_dict to add data
"""
chan_id = metadata.channel
sample_rate = metadata.sample_rate
chan_type = check_chan(chan_id, self.req_soh_chans, self.req_wf_chans,
self.include_mp123zne, self.include_mp456uvw)
if chan_type == 'SOH':
if self.req_soh_chans == [] and sample_rate > 1:
# If 'All chans' is selected for SOH, channel with samplerate>1
# will be skipped by default to improve performance.
# Note: If user intentionally added channels with samplerate>1
# using SOH Channel Preferences dialog, they are still read.
return
return self.soh_data
if chan_type == 'MP':
return self.mass_pos_data
if chan_type == 'WF':
return self.waveform_data
def check_time(self, record: RecordReader) -> bool:
"""
Check if the record's time range overlaps the time range the user requires
to read
:param record: the record read from file
:return: True when the record's time satisfies the requirement
"""
meta = record.record_metadata
if self.read_start > meta.end_time or self.read_end < meta.start_time:
return False
return True
def append_log(self, record: RecordReader) -> None:
"""
Add all text info retrieved from record to log_data
:param record: the record read from file
"""
logs = [record.ascii_text] + record.other_blockettes
log_str = "===========\n".join(logs)
if log_str == "":
return
meta = record.record_metadata
log_str = "\n\nSTATE OF HEALTH: " + \
f"From:{meta.start_time} To:{meta.end_time}\n" + log_str
sta_id = meta.station
chan_id = meta.channel
if sta_id not in self.log_data.keys():
self.log_data[sta_id] = {}
if chan_id not in self.log_data[sta_id]:
self.log_data[sta_id][chan_id] = []
self.log_data[sta_id][chan_id].append(log_str)
def append_data(self, data_dict: dict,
record: RecordReader,
data_point: Real) -> None:
"""
Append data point to the given data_dict
:param data_dict: the data dict to add data get from record
:param record: the record read from file
:param data_point: the first sample of the record frame
"""
if data_point is None:
return
meta = record.record_metadata
sta_id = meta.station
if sta_id not in data_dict.keys():
data_dict[sta_id] = {}
station = data_dict[sta_id]
self.add_chan_data(station, meta, data_point)
def _add_new_trace(self, channel: Dict, metadata: RecordMetadata,
data_point: Real) -> None:
"""
Start a new trace to channel['tracesInfo'] with data_point as
the first data value and metadata's start_time as first time value
:param channel: dict of channel's info
:param metadata: record's meta data
:param data_point: the first sample of the record frame
"""
channel['tracesInfo'].append({
'startTmEpoch': metadata.start_time,
'data': [data_point],
'times': [metadata.start_time]
})
def _append_trace(self, channel, metadata, data_point):
"""
Appending data_point to the latest trace of channel['tracesInfo']
:param channel: dict of channel's info
:param metadata: record's meta data
:param data_point: the first sample of the record frame
"""
channel['tracesInfo'][-1]['data'].append(data_point)
channel['tracesInfo'][-1]['times'].append(metadata.start_time)
def add_chan_data(self, station: dict, metadata: RecordMetadata,
data_point: Real) -> None:
"""
Add new channel to the passed station and append data_point to the
channel if there's no gap/overlap or start a new trace of data
when there's a gap.
If gap/overlap > gap_minimum, add to gaps list.
:param station: dict of chan by id of a station
:param metadata: an Object of metadata from the record
:param data_point: the first sample of the record frame
"""
meta = metadata
chan_id = metadata.channel
if chan_id not in station.keys():
station[chan_id] = {
'file_path': self.file_path,
'chanID': chan_id,
'samplerate': meta.sample_rate,
'startTmEpoch': meta.start_time,
'endTmEpoch': meta.end_time,
'size': meta.sample_count,
'nets': {meta.network},
'gaps': [],
'tracesInfo': [{
'startTmEpoch': meta.start_time,
'endTmEpoch': meta.end_time,
'data': [data_point],
'times': [meta.start_time]
}]
}
else:
channel = station[chan_id]
record_start_time = meta.start_time
previous_end_time = channel['endTmEpoch']
delta = abs(record_start_time - previous_end_time)
if channel['file_path'] != self.file_path:
# Start new trace for each file to reorder trace and
# combine traces again later
channel['file_path'] = self.file_path
self._add_new_trace(channel, meta, data_point)
else:
if self.gap_minimum is not None and delta >= self.gap_minimum:
gap = [previous_end_time, record_start_time]
channel['gaps'].append(gap)
# appending data
self._append_trace(channel, meta, data_point)
channel['tracesInfo'][-1]['endTmEpoch'] = meta.end_time
# update channel's metadata
channel['endTmEpoch'] = meta.end_time
channel['size'] += meta.sample_count
channel['nets'].add(meta.network)
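# Usage sketch (hypothetical values): if a channel previously ended at
# t=100.0 and the next record in the same file starts at t=160.0 with
# gap_minimum=50.0, [100.0, 160.0] is appended to channel['gaps'] and the
# record's first data point is appended to the current trace.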
def get_ready_for_next_read(self, current_record_start: int,
record: RecordReader):
"""
Move the file's current position to the start of the next record.
:param current_record_start: the start position of the current record
:param record: the record that is reading
"""
# MSEED stores the size of a data record as an exponent of a
# power of two, so we have to convert that to actual size before
# doing anything else.
record_length_exp = record.header_unpacker.unpack(
'B', record.blockette_1000.record_length
)[0]
record_size = 2 ** record_length_exp
self.file.seek(current_record_start)
self.file.seek(record_size, 1)
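# For example, a record_length_exp of 12 gives a 4096-byte record, so the
# file pointer ends up 4096 bytes past current_record_start, i.e. at the
# start of the next record.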
def read(self):
while 1:
# We know that end of file is reached when read() returns an empty
# string.
is_eof = (self.file.read(1) == b'')
if is_eof:
break
# We need to move the file pointer back to its position after we
# do the end of file check. Otherwise, we would be off by one
# byte for all the reads afterward.
self.file.seek(-1, 1)
# We save the start of the current record so that after we are
# done reading the record, we can move back. This makes moving
# to the next record a lot easier, seeing as we can simply move
# the file pointer a distance the size of the current record.
current_record_start = self.file.tell()
record = RecordReader(self.file)
if record.invalid_blockettes:
self.invalid_blockettes = True
if not self.check_time(record):
self.get_ready_for_next_read(current_record_start, record)
continue
data_dict = self.get_data_dict(record.record_metadata)
if data_dict is None:
if self.is_multiplex:
self.get_ready_for_next_read(current_record_start, record)
continue
else:
break
first_data_point = record.get_first_data_point()
self.append_data(data_dict, record, first_data_point)
self.append_log(record)
self.get_ready_for_next_read(current_record_start, record)
self.file.close()
......@@ -4,11 +4,11 @@ from typing import BinaryIO, Optional, List
from obspy import UTCDateTime
from decode_mseed import (
from sohstationviewer.model.mseed_data.decode_mseed import (
decode_ieee_float, decode_ieee_double, decode_steim, decode_int16,
decode_int24, decode_int32,
)
from mseed_helper import (
from sohstationviewer.model.mseed_data.record_reader_helper import (
FixedHeader, Blockette1000, get_data_endianness, Unpacker,
get_record_metadata, get_header_endianness, RecordMetadata,
EncodingFormat,
......@@ -37,7 +37,8 @@ class RecordReader:
self.data_unpacker: Unpacker = Unpacker()
self.record_metadata: Optional[RecordMetadata] = None
self.invalid_blockettes = False
self.ascii_text: str = ''
self.read_header()
def read_header(self) -> None:
......@@ -220,8 +221,7 @@ class RecordReader:
'H', next_blockette_type
)[0]
if next_blockette_type not in (500, 1000, 1001):
print('We currently only handle blockettes 500, 1000, and'
'1001.')
self.invalid_blockettes = True
continue
if next_blockette_type == 500:
self.read_blockette_500()
......@@ -230,7 +230,27 @@ class RecordReader:
elif next_blockette_type == 2000:
self.read_blockette_2000()
def get_first_data_point(self) -> Real:
def decode_ascii_data(self, data_start: int):
"""
Read the ASCII string from the data portion of the record, removing the
padding
:param data_start: byte number where the data starts
"""
# We want to read everything in the record if the encoding is
# ASCII.
record_length_exp = self.header_unpacker.unpack(
'B', self.blockette_1000.record_length
)[0]
record_size = 2 ** record_length_exp
data_block = self.file.read(record_size - data_start)
single_padding = b'\x00'.decode()
try:
self.ascii_text = data_block.decode().rstrip(single_padding)
except UnicodeDecodeError:
pass
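# For example (hypothetical record content): if the data portion of a
# 512-byte ASCII record holds b'Reboot requested.\x00\x00...\x00',
# decode_ascii_data stores self.ascii_text = 'Reboot requested.' with the
# NUL padding stripped.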
def get_first_data_point(self) -> Optional[Real]:
"""
Get the first data point of the current data record.
:return: the first data point of the current data record, whose type is
......@@ -251,17 +271,8 @@ class RecordReader:
encoding_format = EncodingFormat(encoding_format)
if encoding_format == EncodingFormat.ASCII:
# We want to read everything in the record if the encoding is
# ASCII.
record_length_exp = self.header_unpacker.unpack(
'B', self.blockette_1000.record_length
)[0]
record_size = 2 ** record_length_exp
# This name does not make much sense with what we are doing here,
# but it will have to do for now.
# The size of the record includes the header, so we have to account
# for that when grabbing the data.
first_data_point = self.file.read(record_size - data_start)
self.decode_ascii_data(data_start)
first_data_point = None
else:
# Currently, we are extracting only the first data point in each
......
......@@ -5,6 +5,11 @@ from enum import Enum
from obspy import UTCDateTime
class MSeedReadError(Exception):
def __init__(self, msg):
self.message = msg
class Unpacker:
"""
A wrapper around struct.unpack() to unpack binary data without having to
......@@ -79,7 +84,8 @@ class RecordMetadata:
location: str
channel: str
network: str
start_time: UTCDateTime
start_time: float
end_time: float
sample_count: int
sample_rate: float
......@@ -95,6 +101,21 @@ class EncodingFormat(Enum):
STEIM_2 = 11
def check_time_from_time_string(endian, time_string):
try:
record_start_time_tuple = struct.unpack(f'{endian}hhbbbbh',
time_string)
except struct.error:
raise MSeedReadError("Not an MSeed file.")
# libmseed uses 1900 to 2100 as the sane year range. We follow their
# example here.
year_is_good = (1900 <= record_start_time_tuple[0] <= 2100)
# The upper range is 366 to account for leap years.
day_is_good = (1 <= record_start_time_tuple[1] <= 366)
return year_is_good and day_is_good
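# Usage sketch (hypothetical BTIME bytes): get_header_endianness below probes
# both byte orders of the fixed header's start time with this check.
#   check_time_from_time_string('>', b'\x07\xe6\x00\x01\x00\x00\x00\x00\x00\x00')
#   -> True   # (year 2022, day 1, 00:00:00.0000) decodes as sane big-endian values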
def get_header_endianness(header: FixedHeader):
"""
Determine the endianness of the fixed header of a data record. Works by
......@@ -117,15 +138,15 @@ def get_header_endianness(header: FixedHeader):
endianness of header
"""
record_start_time_string = header.record_start_time
record_start_time_tuple = struct.unpack('>hhbbbbh',
record_start_time_string)
# libmseed uses 1900 to 2100 as the sane year range. We follow their
# example here.
year_is_good = (1900 <= record_start_time_tuple[0] <= 2100)
# The upper range is 366 to account for leap years.
day_is_good = (1 <= record_start_time_tuple[1] <= 366)
endianness = 'big' if year_is_good and day_is_good else 'little'
good_time = check_time_from_time_string('>', record_start_time_string)
if good_time:
endianness = 'big'
else:
good_time = check_time_from_time_string('<', record_start_time_string)
if good_time:
endianness = 'little'
else:
raise MSeedReadError("Not an MSeed file.")
return endianness
......@@ -178,32 +199,41 @@ def get_record_metadata(header: FixedHeader, header_unpacker: Unpacker):
needed so that the correct byte order can be used
:return: the extract record metadata
"""
station = header.station.decode('utf-8').rstrip()
location = header.location.decode('utf-8').rstrip()
channel = header.channel.decode('utf-8').rstrip()
network = header.net_code.decode('utf-8').rstrip()
record_start_time_string = header.record_start_time
record_start_time_tuple = header_unpacker.unpack('HHBBBBH',
record_start_time_string)
record_start_time = UTCDateTime(year=record_start_time_tuple[0],
julday=record_start_time_tuple[1],
hour=record_start_time_tuple[2],
minute=record_start_time_tuple[3],
second=record_start_time_tuple[4],
microsecond=record_start_time_tuple[
6] * 100)
sample_count = header_unpacker.unpack('H', header.sample_count)[0]
sample_rate_factor = header_unpacker.unpack(
'h', header.sample_rate_factor
)[0]
sample_rate_multiplier = header_unpacker.unpack(
'h', header.sample_rate_multiplier
)[0]
try:
station = header.station.decode('utf-8').rstrip()
location = header.location.decode('utf-8').rstrip()
channel = header.channel.decode('utf-8').rstrip()
network = header.net_code.decode('utf-8').rstrip()
record_start_time_string = header.record_start_time
record_start_time_tuple = header_unpacker.unpack(
'HHBBBBH', record_start_time_string)
record_start_time = UTCDateTime(year=record_start_time_tuple[0],
julday=record_start_time_tuple[1],
hour=record_start_time_tuple[2],
minute=record_start_time_tuple[3],
second=record_start_time_tuple[4],
microsecond=record_start_time_tuple[
6] * 100).timestamp
sample_count = header_unpacker.unpack('H', header.sample_count)[0]
sample_rate_factor = header_unpacker.unpack(
'h', header.sample_rate_factor
)[0]
sample_rate_multiplier = header_unpacker.unpack(
'h', header.sample_rate_multiplier
)[0]
except ValueError:
raise MSeedReadError("Not an MSeed file.")
sample_rate = calculate_sample_rate(sample_rate_factor,
sample_rate_multiplier)
if sample_rate == 0:
record_end_time = record_start_time
else:
record_time_taken = sample_count / sample_rate
record_end_time = record_start_time + record_time_taken
return RecordMetadata(station, location, channel, network,
record_start_time, sample_count, sample_rate)
record_start_time, record_end_time,
sample_count, sample_rate)
......@@ -504,13 +504,20 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.waveform_dlg.plotting_widget.clear()
self.tps_dlg.plotting_widget.clear()
def cancel_loading(self):
display_tracking_info(self.tracking_info_text_browser,
"Loading cancelled",
LogType.WARNING)
@QtCore.Slot()
def read_selected_files(self):
"""
Read data from selected files/directories, process and plot channels
read from those according to current options set on the GUI
"""
display_tracking_info(self.tracking_info_text_browser,
"Loading started",
LogType.INFO)
self.clear_plots()
start_tm_str = self.time_from_date_edit.date().toString(
QtCore.Qt.ISODate)
......@@ -520,6 +527,7 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
if self.end_tm <= self.start_tm:
msg = "To Date must be greater than From Date."
QtWidgets.QMessageBox.warning(self, "Wrong Date Given", msg)
self.cancel_loading()
return
self.info_list_widget.clear()
is_working = (self.is_loading_data or self.is_plotting_soh or
......@@ -538,6 +546,7 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
msg = "Minimum Gap must be a number."
QtWidgets.QMessageBox.warning(
self, "Invalid Minimum Gap request", msg)
self.cancel_loading()
return
else:
self.min_gap = None
......@@ -555,6 +564,7 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
except Exception as e:
QtWidgets.QMessageBox.warning(
self, "Incorrect Wildcard", str(e))
self.cancel_loading()
return
try:
......@@ -572,6 +582,7 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.read_from_file_list()
except Exception as e:
QtWidgets.QMessageBox.warning(self, "Select directory", str(e))
self.cancel_loading()
return
dir_size = sum(get_dir_size(str(dir))[0] for dir in self.dir_names)
......@@ -587,6 +598,7 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
data_too_big_dialog.setIcon(QMessageBox.Question)
ret = data_too_big_dialog.exec_()
if ret == QMessageBox.Abort:
self.cancel_loading()
return
self.req_soh_chans = self.get_requested_soh_chan()
......@@ -594,6 +606,7 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.req_wf_chans = self.get_requested_wf_chans()
except Exception as e:
QMessageBox.information(self, "Waveform Selection", str(e))
self.cancel_loading()
return
start_tm_str = self.time_from_date_edit.date().toString(
......
# Define functions to call processor
from typing import Tuple, Union, Dict, Callable, List, Optional
from typing import Tuple, Union, Dict, List
from PySide2 import QtCore
......@@ -105,20 +105,17 @@ class MultiThreadedPlottingWidget(PlottingWidget):
return True
def create_plotting_channel_processors(
self, plotting_data: Dict,
get_plot_info: Optional[Callable[[str, Dict, str], Dict]]) -> None:
self, plotting_data: Dict, need_db_info: bool = False) -> None:
"""
Create a data processor for each channel data.
:param plotting_data: dict of data by chan_id
:param get_plot_info: function to get plotting info from database
:param need_db_info: flag to get db info
"""
for chan_id in plotting_data:
if get_plot_info is not None:
chan_db_info = get_plot_info(chan_id,
plotting_data[chan_id],
self.parent.data_type,
self.c_mode)
if need_db_info:
chan_db_info = get_chan_plot_info(
chan_id, self.parent.data_type, self.c_mode)
if chan_db_info['height'] == 0:
# not draw
continue
......@@ -196,16 +193,10 @@ class MultiThreadedPlottingWidget(PlottingWidget):
self.clean_up()
self.finished.emit()
return
self.create_plotting_channel_processors(
self.plotting_data1, self.get_plot_info)
self.create_plotting_channel_processors(
self.plotting_data2, get_chan_plot_info)
self.create_plotting_channel_processors(self.plotting_data1, True)
self.create_plotting_channel_processors(self.plotting_data2, True)
self.process_channel()
def get_plot_info(self, *args, **kwargs):
# function to get database info for channels in self.plotting_data1
pass
@QtCore.Slot()
def process_channel(self, channel_data=None, channel_id=None):
"""
......@@ -347,6 +338,6 @@ class MultiThreadedPlottingWidget(PlottingWidget):
self.is_working = True
start_msg = 'Zooming in...'
display_tracking_info(self.tracking_box, start_msg, 'info')
self.create_plotting_channel_processors(self.plotting_data1, None)
self.create_plotting_channel_processors(self.plotting_data2, None)
self.create_plotting_channel_processors(self.plotting_data1)
self.create_plotting_channel_processors(self.plotting_data2)
self.process_channel()
......@@ -8,8 +8,6 @@ from sohstationviewer.controller.util import apply_convert_factor
from sohstationviewer.model.data_type_model import DataTypeModel
from sohstationviewer.database.extract_data import get_chan_plot_info
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.view.plotting.plotting_widget.\
multi_threaded_plotting_widget import MultiThreadedPlottingWidget
......@@ -52,10 +50,6 @@ class SOHWidget(MultiThreadedPlottingWidget):
self.processing_log.append((msg, LogType.WARNING))
return True
def get_plot_info(self, *args, **kwargs):
# function to get database info for soh channels in self.plotting_data1
return get_chan_plot_info(*args, **kwargs)
def plot_single_channel(self, c_data: Dict, chan_id: str):
"""
Plot the channel chan_id.
......
......@@ -13,7 +13,7 @@ from sohstationviewer.controller.util import (
display_tracking_info, add_thousand_separator,
)
from sohstationviewer.database.extract_data import (
get_color_def, get_color_ranges, get_chan_label,
get_color_def, get_color_ranges, get_seismic_chan_label,
)
from sohstationviewer.model.data_type_model import DataTypeModel
from sohstationviewer.model.handling_data import (
......@@ -227,7 +227,7 @@ class TimePowerSquaredWidget(plotting_widget.PlottingWidget):
ax.spines[['right', 'left', 'top', 'bottom']].set_visible(False)
ax.text(
-0.12, 1,
f"{get_chan_label(chan_id)} {c_data['samplerate']}sps",
f"{get_seismic_chan_label(chan_id)} {c_data['samplerate']}sps",
horizontalalignment='left',
verticalalignment='top',
rotation='horizontal',
......
......@@ -11,8 +11,6 @@ from sohstationviewer.view.plotting.plotting_widget.\
from sohstationviewer.controller.util import apply_convert_factor
from sohstationviewer.database.extract_data import get_wf_plot_info
class WaveformWidget(MultiThreadedPlottingWidget):
"""
......@@ -39,10 +37,6 @@ class WaveformWidget(MultiThreadedPlottingWidget):
return super().init_plot(d_obj, data_time, key, start_tm, end_tm,
time_ticks_total, is_waveform=True)
def get_plot_info(self, *args, **kwargs):
# function to get database info for wf channels in self.plotting_data1
return get_wf_plot_info(*args, **kwargs)
def plot_single_channel(self, c_data: Dict, chan_id: str):
"""
Plot the channel chan_id.
......