Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
Showing
with 505 additions and 241 deletions
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Optional, List
"""
Routines building upon obspy.io.reftek.packet.
Redefine packet header (PACKET) based on rt130 manual.
......@@ -268,14 +270,14 @@ FD_INFO = {
"_coeff": (984, None)}
class Packet(obspy_rt130_packet.Packet):
class SOHPacket(obspy_rt130_packet.Packet):
"""Class used to define shared tools for the SOH packets"""
_headers = ('experiment_number', 'unit_id', 'byte_count',
'packet_sequence', 'time')
@staticmethod
def from_data(data):
def from_data(data: np.ndarray) -> SOHPacket:
"""
Checks for valid packet type identifier and returns appropriate
packet object
......@@ -300,7 +302,7 @@ class Packet(obspy_rt130_packet.Packet):
raise NotImplementedError(msg.format(packet_type))
@staticmethod
def time_tag(time, implement_time=None):
def time_tag(time: UTCDateTime, implement_time: Optional[int] = None):
if implement_time is not None and time > UTCDateTime(ns=implement_time): # noqa: E501
time = UTCDateTime(ns=implement_time)
return "{:04d}:{:03d}:{:02d}:{:02d}:{:02d}:{:03d}".format(time.year,
......@@ -311,20 +313,14 @@ class Packet(obspy_rt130_packet.Packet):
time.microsecond) # noqa: E501
@property
def packet_tagline(self):
def packet_tagline(self) -> str:
return "\n"
# return "\n\n{:s} exp {:02d} bytes {:04d} {:s} ID: {:s} seq {:04d}".format(self.type.decode(), # noqa: E501
# self.experiment_number, # noqa: E501
# self.byte_count, # noqa: E501
# self.time_tag(self.time), # noqa: E501
# self.unit_id.decode(), # noqa: E501
# self.packet_sequence) # noqa: E501
class SHPacket(Packet):
class SHPacket(SOHPacket):
"""Class used to parse and generate string representation for SH packets"""
def __init__(self, data):
def __init__(self, data: np.ndarray) -> None:
self._data = data
payload = self._data["payload"].tobytes()
start_sh = 0
......@@ -341,7 +337,7 @@ class SHPacket(Packet):
setattr(self, name, data)
start_sh = start_sh + length
def __str__(self):
def __str__(self) -> str:
info = []
# info.append(self.packet_tagline)
packet_soh_string = ("\nState of Health {:s} ST: {:s}"
......@@ -352,10 +348,10 @@ class SHPacket(Packet):
return info
class SCPacket(Packet):
class SCPacket(SOHPacket):
"""Class used to parse and generate string representation for SC packets"""
def __init__(self, data):
def __init__(self, data: np.ndarray) -> None:
# Station/Channel payload
self._data = data
payload = self._data["payload"].tobytes()
......@@ -389,12 +385,8 @@ class SCPacket(Packet):
setattr(self, name, data)
start_info = start_info + length
def __str__(self):
def __str__(self) -> str:
info = []
# info.append(self.packet_tagline)
# packet_soh_string = ("\nStation Channel Definition {:s} ST: {:s}"
# .format(self.time_tag(self.time, implement_time=self.implement_time), # noqa: E501
# self.unit_id.decode()))
packet_soh_string = ("\nStation Channel Definition {:s} ST: {:s}"
.format(self.time_tag(self.time),
self.unit_id.decode()))
......@@ -430,7 +422,7 @@ class SCPacket(Packet):
info.append("\n Comments - " + getattr(self, 'sc' + str(ind_sc) + '_comments')) # noqa: E501
return info
def get_info(self, infos):
def get_info(self, infos: List[List]) -> List[List]:
"""
Compile relevant information - unit id, reference channel, network
code, station code, component code, gain and implementation time - for
......@@ -461,10 +453,10 @@ class SCPacket(Packet):
return infos
class OMPacket(Packet):
class OMPacket(SOHPacket):
"""Class used to parse and generate string representation for OM packets"""
def __init__(self, data):
def __init__(self, data: np.ndarray) -> None:
self._data = data
payload = self._data["payload"].tobytes()
start_om = 0
......@@ -481,7 +473,7 @@ class OMPacket(Packet):
setattr(self, name, data)
start_om = start_om + length
def __str__(self):
def __str__(self) -> str:
info = []
# info.append(self.packet_tagline)
packet_soh_string = ("\nOperating Mode Definition {:s} ST: {:s}"
......@@ -503,10 +495,10 @@ class OMPacket(Packet):
return info
class DSPacket(Packet):
class DSPacket(SOHPacket):
"""Class used to parse and generate string representation for DS packets"""
def __init__(self, data):
def __init__(self, data: np.ndarray) -> None:
# Data Stream payload
self._data = data
payload = self._data["payload"].tobytes()
......@@ -561,7 +553,7 @@ class DSPacket(Packet):
msg = ("Trigger type {:s} not found".format(trigger_type))
warnings.warn(msg)
def __str__(self):
def __str__(self) -> str:
info = []
info.append(self.packet_tagline)
packet_soh_string = ("\nData Stream Definition {:s} ST: {:s}"
......@@ -597,7 +589,7 @@ class DSPacket(Packet):
info.append(" ".join(["\n Trigger", key, trigger_info])) # noqa: E501
return info
def get_info(self, infos):
def get_info(self, infos: List[List]) -> List[List]:
"""
Compile relevant information - reference data stream, band and
instrument codes, sample rate and implementation time - for given DS
......@@ -624,10 +616,10 @@ class DSPacket(Packet):
return infos
class ADPacket(Packet):
class ADPacket(SOHPacket):
"""Class used to parse and generate string representation for AD packets"""
def __init__(self, data):
def __init__(self, data: np.ndarray) -> None:
self._data = data
payload = self._data["payload"].tobytes()
start_ad = 0
......@@ -644,7 +636,7 @@ class ADPacket(Packet):
setattr(self, name, data)
start_ad = start_ad + length
def __str__(self):
def __str__(self) -> str:
info = []
# info.append(self.packet_tagline)
packet_soh_string = ("\nAuxiliary Data Parameter {:s} ST: {:s}"
......@@ -664,10 +656,10 @@ class ADPacket(Packet):
return info
class CDPacket(Packet):
class CDPacket(SOHPacket):
"""Class used to parse and generate string representation for CD packets"""
def __init__(self, data):
def __init__(self, data: np.ndarray) -> None:
# Calibration parameter payload
self._data = data
payload = self._data["payload"].tobytes()
......@@ -736,7 +728,7 @@ class CDPacket(Packet):
setattr(self, name, data)
start_info_seq = start_info_seq + length
def __str__(self):
def __str__(self) -> str:
info = []
# info.append(self.packet_tagline)
packet_soh_string = ("\nCalibration Definition {:s} ST: {:s}"
......@@ -790,10 +782,10 @@ class CDPacket(Packet):
return info
class FDPacket(Packet):
class FDPacket(SOHPacket):
"""Class used to parse and generate string representation for FD packets"""
def __init__(self, data):
def __init__(self, data: np.ndarray) -> None:
# Filter description payload
self._data = data
payload = self._data["payload"]
......@@ -845,7 +837,7 @@ class FDPacket(Packet):
setattr(self, name, data)
start_info = start_info + length
def __str__(self):
def __str__(self) -> str:
info = []
# info.append(self.packet_tagline)
packet_soh_string = ("\nFilter Description {:s} ST: {:s}"
......@@ -873,7 +865,7 @@ class FDPacket(Packet):
return info
@staticmethod
def twosCom_bin2dec(bin_, digit):
def twosCom_bin2dec(bin_: str, digit: int):
while len(bin_) < digit:
bin_ = '0' + bin_
if bin_[0] == '0':
......@@ -882,7 +874,7 @@ class FDPacket(Packet):
return -1 * (int(''.join('1' if x == '0' else '0' for x in bin_), 2) + 1) # noqa: E501
@staticmethod
def twosCom_dec2bin(dec, digit):
def twosCom_dec2bin(dec: int, digit: int):
if dec >= 0:
bin_ = bin(dec).split("0b")[1]
while len(bin_) < digit:
......@@ -893,7 +885,7 @@ class FDPacket(Packet):
return bin(dec - pow(2, digit)).split("0b")[1]
def _initial_unpack_packets_soh(bytestring):
def _initial_unpack_packets_soh(bytestring: bytes) -> np.ndarray:
"""
First unpack data with dtype matching itemsize of storage in the reftek
file, than allocate result array with dtypes for storage of python
......
......@@ -3,7 +3,7 @@ from typing import Union, Dict, List, Set, Tuple
from sohstationviewer.controller.plotting_data import format_time
from sohstationviewer.model.general_data.general_data import GeneralData
from sohstationviewer.model.mseed_data.mseed import MSeed
from sohstationviewer.model.reftek.reftek import RT130
from sohstationviewer.model.reftek_data.reftek import RT130
from sohstationviewer.view.util.functions import extract_netcodes
......
......@@ -160,7 +160,6 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
"""
self.bit_weight_opt: str = '' # currently only need one option
self.get_channel_prefer()
self.yyyy_mm_dd_action.triggered.emit()
"""
waveform_dlg: widget to display waveform channels' plotting
......@@ -196,6 +195,7 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.read_config()
self.validate_config()
self.apply_config()
self.yyyy_mm_dd_action.trigger()
@QtCore.Slot()
def save_plot(self):
......@@ -364,6 +364,8 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.time_to_date_edit.setDisplayFormat(qt_format)
self.time_from_date_edit.setDisplayFormat(qt_format)
self.date_format = display_format
self.tps_dlg.date_format = self.date_format
self.waveform_dlg.date_format = self.date_format
@QtCore.Slot()
def open_files_list_item_double_clicked(self, item: FileListItem):
......@@ -546,7 +548,8 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.is_plotting_waveform or self.is_plotting_tps)
if is_working:
msg = 'Already working'
display_tracking_info(self.tracking_info_text_browser, msg, 'info')
display_tracking_info(self.tracking_info_text_browser,
msg, LogType.INFO)
return
self.has_problem = False
......@@ -570,6 +573,13 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
else:
self.gap_minimum = None
# if waveform channels are selected, Event DS will be read from EH/ET
# header
# rt130_waveform_data_req is to read data for wave form data
rt130_waveform_data_req = False
if self.raw_check_box.isChecked() or self.tps_check_box.isChecked():
rt130_waveform_data_req = True
if self.mseed_wildcard_edit.text().strip() != '':
try:
check_chan_wildcards_format(self.mseed_wildcard_edit.text())
......@@ -656,7 +666,8 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
read_start=self.start_tm,
read_end=self.end_tm,
include_mp123=self.mass_pos_123zne_check_box.isChecked(),
include_mp456=self.mass_pos_456uvw_check_box.isChecked()
include_mp456=self.mass_pos_456uvw_check_box.isChecked(),
rt130_waveform_data_req=rt130_waveform_data_req
)
self.data_loader.worker.finished.connect(self.data_loaded)
......@@ -796,7 +807,8 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
try:
self.plotting_widget.plot_channels(
d_obj, sel_key, self.start_tm, self.end_tm, time_tick_total)
d_obj, sel_key, self.start_tm, self.end_tm, time_tick_total,
self.req_soh_chans)
except Exception:
fmt = traceback.format_exc()
msg = f"Can't plot SOH data due to error: {str(fmt)}"
......@@ -812,7 +824,8 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.is_plotting_tps = True
peer_plotting_widgets.append(self.tps_dlg.plotting_widget)
self.tps_dlg.set_data(
self.data_type, ','.join([str(d) for d in self.dir_names]))
self.data_type,
','.join([str(d) for d in self.dir_names]))
self.tps_dlg.show()
# The waveform and TPS plots is being stopped at the same time, so
# we can't simply reset all flags. Instead, we use an intermediate
......@@ -833,7 +846,8 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
# waveformPlot
peer_plotting_widgets.append(self.waveform_dlg.plotting_widget)
self.waveform_dlg.set_data(
self.data_type, ','.join([str(d) for d in self.dir_names]))
self.data_type,
','.join([str(d) for d in self.dir_names]))
self.waveform_dlg.show()
waveform_widget = self.waveform_dlg.plotting_widget
waveform_widget.stopped.connect(self.reset_is_plotting_waveform)
......@@ -951,7 +965,8 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
'WHERE current=1')
if len(rows) > 0:
self.pref_soh_list_name = rows[0]['name']
self.pref_soh_list = [t.strip() for t in rows[0]['IDs'].split(',')]
self.pref_soh_list = [t.strip() for t in rows[0]['IDs'].split(',')
if t.strip() != '']
self.pref_soh_list_data_type = rows[0]['dataType']
def resizeEvent(self, event):
......@@ -982,7 +997,7 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
"""
display_tracking_info(self.tracking_info_text_browser,
'Cleaning up...',
'info')
LogType.INFO)
if self.data_loader.running:
self.data_loader.thread.requestInterruption()
self.data_loader.thread.quit()
......
......@@ -7,7 +7,7 @@ import numpy as np
from obspy import UTCDateTime
from sohstationviewer.model.mseed_data.mseed import MSeed
from sohstationviewer.model.reftek.reftek import RT130
from sohstationviewer.model.reftek_data.reftek import RT130
from sohstationviewer.view.plotting.gps_plot.gps_point import GPSPoint
from sohstationviewer.view.util.enums import LogType
......
......@@ -11,6 +11,8 @@ from sohstationviewer.view.plotting.plotting_widget.plotting_processor import (
from sohstationviewer.view.plotting.plotting_widget.plotting_widget import (
PlottingWidget)
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.view.util.functions import (
replace_actual_question_chans, remove_not_found_chans)
from sohstationviewer.controller.util import display_tracking_info
from sohstationviewer.controller.plotting_data import get_title
......@@ -28,7 +30,8 @@ class MultiThreadedPlottingWidget(PlottingWidget):
def __init__(self, *args, **kwargs):
PlottingWidget.__init__(self, *args, **kwargs)
self.data_processors: List[PlottingChannelProcessor] = []
# pref_order: order of channels to be plotted
self.pref_order: List[str] = []
# Only one data processor can run at a time, so it is not a big problem
#
self.thread_pool = QtCore.QThreadPool()
......@@ -105,19 +108,33 @@ class MultiThreadedPlottingWidget(PlottingWidget):
return True
def create_plotting_channel_processors(
self, plotting_data: Dict, need_db_info: bool = False) -> None:
self, plotting_data: Dict,
need_db_info: bool = False) -> None:
"""
Create a data processor for each channel data.
Create a data processor for each channel data in the order of
pref_order. If pref_order isn't given, process in order of
plotting_data.
:param plotting_data: dict of data by chan_id
:param need_db_info: flag to get db info
"""
for chan_id in plotting_data:
chan_order = self.pref_order if self.pref_order \
else sorted(list(plotting_data.keys()))
chan_order = replace_actual_question_chans(
chan_order, list(plotting_data.keys()))
chan_order = remove_not_found_chans(
chan_order, list(plotting_data.keys()), self.processing_log)
not_plot_chans = []
for chan_id in chan_order:
if need_db_info:
chan_db_info = get_chan_plot_info(
chan_id, self.parent.data_type, self.c_mode)
if chan_db_info['height'] == 0:
chan_db_info = get_chan_plot_info(chan_id,
self.parent.data_type,
self.c_mode)
if (chan_db_info['height'] == 0 or
chan_db_info['plotType'] == ''):
# not draw
not_plot_chans.append(chan_id)
continue
if 'DEFAULT' in chan_db_info['channel']:
msg = (f"Channel {chan_id}'s "
......@@ -127,14 +144,16 @@ class MultiThreadedPlottingWidget(PlottingWidget):
# instruction here
self.processing_log.append((msg, LogType.WARNING))
if chan_db_info['plotType'] == '':
continue
plotting_data[chan_id]['chan_db_info'] = chan_db_info
if not_plot_chans != []:
msg = (f"The database settings 'plotType' or 'height' show not to "
f"be plotted for the following channels: "
f"{', '.join( not_plot_chans)}")
self.processing_log.append((msg, LogType.WARNING))
self.move_soh_channels_with_link_to_the_end()
for chan_id in plotting_data:
for chan_id in chan_order:
if 'chan_db_info' not in plotting_data[chan_id]:
continue
channel_processor = PlottingChannelProcessor(
......@@ -165,7 +184,8 @@ class MultiThreadedPlottingWidget(PlottingWidget):
for channel in channels_to_move:
self.plotting_data1[channel] = self.plotting_data1.pop(channel)
def plot_channels(self, d_obj, key, start_tm, end_tm, time_ticks_total):
def plot_channels(self, d_obj, key, start_tm, end_tm, time_ticks_total,
pref_order=[]):
"""
Prepare to plot waveform/SOH/mass-position data by creating a data
processor for each channel, then, run the processors.
......@@ -175,12 +195,14 @@ class MultiThreadedPlottingWidget(PlottingWidget):
:param start_tm: requested start time to read
:param end_tm: requested end time to read
:param time_ticks_total: max number of tick to show on time bar
:param pref_order: order of channels to be plotted
"""
self.pref_order = pref_order
if not self.is_working:
self.reset_widget()
self.is_working = True
start_msg = f'Plotting {self.name} data...'
display_tracking_info(self.tracking_box, start_msg, 'info')
display_tracking_info(self.tracking_box, start_msg, LogType.INFO)
ret = self.init_plot(d_obj, key, start_tm, end_tm,
time_ticks_total)
if not ret:
......@@ -188,8 +210,10 @@ class MultiThreadedPlottingWidget(PlottingWidget):
self.clean_up()
self.finished.emit()
return
self.create_plotting_channel_processors(self.plotting_data1, True)
self.create_plotting_channel_processors(self.plotting_data2, True)
self.process_channel()
@QtCore.Slot()
......@@ -307,32 +331,6 @@ class MultiThreadedPlottingWidget(PlottingWidget):
all running background threads.
"""
display_tracking_info(self.tracking_box,
f'{self.name} plot stopped', 'info')
f'{self.name} plot stopped', LogType.INFO)
self.is_working = False
self.stopped.emit()
def set_lim(self, first_time=False, is_waveform=False):
"""
The set_lim method of the base class PlottingWidget was not designed
with multi-threading in mind, so it made some assumption that is
difficult to satisfy in a multi-threaded design. While these
assumptions do not affect the initial plotting of the data, they make
designing a system for zooming more difficult.
Rather than trying to comply with the design of PlottingWidget.set_lim,
we decide to work around. This set_lim method still keeps the
functionality of processing the data based on the zoom range. However,
it delegates setting the new limit of the x and y axes to
PlottingWidget.set_lim.
:param first_time: flag that indicate whether set_lim is called the
fist time for a data set.
"""
self.data_processors = []
if not self.is_working:
self.is_working = True
start_msg = 'Zooming in...'
display_tracking_info(self.tracking_box, start_msg, 'info')
self.create_plotting_channel_processors(self.plotting_data1)
self.create_plotting_channel_processors(self.plotting_data2)
self.process_channel()
......@@ -37,6 +37,7 @@ class Plotting:
self.parent.plotting_bot, plot_h, has_min_max_lines=False)
ax.x = None
ax.plot([0], [0], linestyle="")
ax.chan_db_info = None
return ax
def plot_multi_color_dots(self, c_data, chan_db_info, chan_id,
......@@ -107,7 +108,6 @@ class Plotting:
total_samples = len(x)
x = sorted(x)
if len(colors) != 1:
sample_no_colors = [clr['W']]
else:
......@@ -116,10 +116,9 @@ class Plotting:
self.plotting_axes.set_axes_info(
ax, [total_samples], sample_no_colors=sample_no_colors,
chan_db_info=chan_db_info, linked_ax=linked_ax)
if linked_ax is None:
ax.x = x
else:
ax.linkedX = x
ax.x_list = c_data['times']
ax.chan_db_info = chan_db_info
return ax
def plot_up_down_dots(self, c_data, chan_db_info, chan_id, ax, linked_ax):
......@@ -172,18 +171,20 @@ class Plotting:
ax.plot(points_list[1], len(points_list[1]) * [0.5], linestyle="",
marker='s', markersize=2, zorder=constants.Z_ORDER['DOT'],
color=clr[colors[1]], picker=True, pickradius=3)
x = points_list[0] + points_list[1]
x = sorted(x)
ax.set_ylim(-2, 2)
self.plotting_axes.set_axes_info(
ax, [len(points_list[0]), len(points_list[1])],
sample_no_colors=[clr[colors[0]], clr[colors[1]]],
sample_no_pos=[0.25, 0.75],
chan_db_info=chan_db_info, linked_ax=linked_ax)
if linked_ax is None:
ax.x = x
else:
ax.linkedX = x
# x_bottom, x_top are the times of data points to be displayed at
# bottom or top of the plot
ax.x_bottom = np.array(points_list[0])
ax.x_top = np.array(points_list[1])
ax.chan_db_info = chan_db_info
return ax
def plot_time_dots(self, c_data, chan_db_info, chan_id, ax, linked_ax):
......@@ -222,10 +223,8 @@ class Plotting:
linestyle='', zorder=constants.Z_ORDER['LINE'],
color=clr[color], picker=True,
pickradius=3)
if linked_ax is None:
ax.x_list = x_list
else:
ax.linkedX = x_list
ax.x_list = x_list
ax.chan_db_info = chan_db_info
return ax
def plot_lines_dots(self, c_data, chan_db_info, chan_id,
......@@ -270,13 +269,14 @@ class Plotting:
obj, c = cStr.split(':')
colors[obj] = c
l_color = 'G'
d_color = 'W'
has_dot = False
if 'L' in colors:
l_color = colors['L']
if 'D' in colors:
d_color = colors['D']
has_dot = True
else:
d_color = l_color
if chan_id == 'GPS Lk/Unlk':
sample_no_list = []
......@@ -292,7 +292,7 @@ class Plotting:
info=info, y_list=y_list, linked_ax=linked_ax)
for x, y in zip(x_list, y_list):
if not has_dot:
if not has_dot and sample_no_list[0] > 1:
# set marker to be able to click point for info
# but marker's size is small to not show dot.
ax.myPlot = ax.plot(x, y, marker='o', markersize=0.01,
......@@ -309,12 +309,9 @@ class Plotting:
mec=clr[d_color],
picker=True, pickradius=3)
if linked_ax is None:
ax.x_list = x_list
ax.y_list = y_list
else:
ax.linkedX = x_list
ax.linkedY = y_list
ax.x_list = x_list
ax.y_list = y_list
ax.chan_db_info = chan_db_info
return ax
def plot_lines_s_rate(self, c_data, chan_db_info, chan_id, ax, linked_ax):
......@@ -399,4 +396,5 @@ class Plotting:
zorder=constants.Z_ORDER['DOT'])
ax.x_list = x_list
ax.y_list = y_list
ax.chan_db_info = chan_db_info
return ax
......@@ -184,13 +184,6 @@ class PlottingAxes:
axes, label of channel will be displayed with sub title's
format - under main title.
"""
if linked_ax is None:
# clear all texts before recreated.
# But not clear when this is a linked_ax because texts are already
# cleared with ax, if clear with linked_ax all info of ax won't be
# displayed
ax.texts.clear()
if label is None:
label = chan_db_info['label']
......@@ -229,7 +222,11 @@ class PlottingAxes:
# set samples' total on right side
if len(sample_no_list) == 1:
ax.sampleLbl = ax.text(
# center_total_point_lbl: The label to display total number of data
# points for plots whose ax has attribute x_list.
# The plotTypes that use this label are linesDot, linesSRate,
# linesMassPos, dotForTime, multiColorDot
ax.center_total_point_lbl = ax.text(
1.005, 0.5,
sample_no_list[0],
horizontalalignment='left',
......@@ -240,14 +237,13 @@ class PlottingAxes:
size=self.parent.font_size
)
else:
# Each zoom this infor is created again.
# Plots that have data separated in two to have text in top and
# bottom, sample rate= 1. These numbers completely depends
# on data created in trim_downsample_chan_with_spr_less_or_equal_1
# and won't be changed in set_lim, then don't need to assign a
# variable for it.
# bottom
ax.text(
# bottom_total_point_lbl, top_total_point_lbl are labels to display
# the total number of data points which are split into top
# and bottom. The ax needs to include attributes x_bottom and x_top
# The plotTypes that use these labels are upDownDots (and linesDot
# with channel='GPS Lk/Unlk' which will have another MR to add
# x_bottom and x_top for this)
ax.bottom_total_point_lbl = ax.text(
1.005, sample_no_pos[0],
sample_no_list[0],
horizontalalignment='left',
......@@ -258,7 +254,7 @@ class PlottingAxes:
size=self.parent.font_size
)
# top
ax.text(
ax.top_total_point_lbl = ax.text(
1.005, sample_no_pos[1],
sample_no_list[1],
horizontalalignment='left',
......
......@@ -2,6 +2,7 @@
Class of which object is used to plot data
"""
from typing import List, Optional, Union
import numpy as np
import matplotlib.text
from matplotlib import pyplot as pl
from matplotlib.transforms import Bbox
......@@ -466,26 +467,6 @@ class PlottingWidget(QtWidgets.QScrollArea):
ruler.set_visible(True)
ruler.xy1 = (xdata, 0)
ruler.xy2 = (xdata, self.bottom)
try:
if ruler == self.zoom_marker2:
# make zoom_marker2 follow mouse.
# need to disconnect when state of rulers change
self.follower = self.fig.canvas.mpl_connect(
"motion_notify_event", self.zoom_marker2_follow_mouse)
except AttributeError:
pass
def zoom_marker2_follow_mouse(self, mouseevent):
"""
Set zoom_marker2's to follow mouse's x.
:param mouseevent: motion_notify_event - event to help keeping track
of mouse move
"""
xdata = self.get_timestamp(mouseevent)
self.zoom_marker2.xy1 = (xdata, 0)
self.zoom_marker2.xy2 = (xdata, self.bottom)
self.draw()
def keyPressEvent(self, event):
"""
......@@ -557,7 +538,7 @@ class PlottingWidget(QtWidgets.QScrollArea):
self.max_x)]
# reset total of samples on the right
self.gap_bar.sampleLbl.set_text(len(new_gaps))
self.gap_bar.center_total_point_lbl.set_text(len(new_gaps))
for ax in self.axes:
if hasattr(ax, 'x') and ax.x is None:
......@@ -568,10 +549,25 @@ class PlottingWidget(QtWidgets.QScrollArea):
if not first_time:
new_min_y = None
new_max_y = None
if hasattr(ax, 'x_top'):
# plot_up_down_dots
new_x_bottom_indexes = np.where(
(ax.x_bottom >= self.min_x) &
(ax.x_bottom <= self.max_x))[0]
ax.bottom_total_point_lbl.set_text(
new_x_bottom_indexes.size)
new_x_top_indexes = np.where(
(ax.x_top >= self.min_x) &
(ax.x_top <= self.max_x))[0]
ax.top_total_point_lbl.set_text(
new_x_top_indexes.size)
if hasattr(ax, 'x_list'):
if not hasattr(ax, 'y_list'):
# dotForTime plots have attribute 'x_list' but not
# 'y_list'
# plot_time_dots and plot_multi_color_dots
x = ax.x_list[0]
new_x_indexes = np.where(
(x >= self.min_x) & (x <= self.max_x))[0]
ax.center_total_point_lbl.set_text(new_x_indexes.size)
continue
total_points = 0
tr_min_ys = []
......@@ -590,21 +586,12 @@ class PlottingWidget(QtWidgets.QScrollArea):
if tr_min_ys != []:
new_min_y = min(tr_min_ys)
new_max_y = max(tr_max_ys)
else:
total_points = len(ax.x)
if hasattr(ax, 'y') and len(ax.y) > 0:
new_min_y = min(ax.y)
new_max_y = max(ax.y)
try:
ax.sampleLbl.set_text(total_points)
except AttributeError:
# for case of having top and bottom total points
# which is for RT130's SOH only, trust in total point
# calculated in set_axes_info
pass
if new_min_y is not None:
self.plotting_axes.set_axes_ylim(ax, new_min_y, new_max_y)
# in case total_points == 1, y lim shouldn't be set
# again or the plot would be collapsed to one line
if total_points > 1:
self.plotting_axes.set_axes_ylim(
ax, new_min_y, new_max_y)
ax.center_total_point_lbl.set_text(total_points)
def draw(self):
"""
......
......@@ -66,19 +66,8 @@ class SOHWidget(MultiThreadedPlottingWidget):
linked_ax = None
if chan_db_info['linkedChan'] not in [None, 'None', '']:
linked_ax = self.plotting_data1[chan_db_info['linkedChan']]['ax']
if 'ax' not in c_data:
ax = getattr(self.plotting, plot_functions[plot_type][1])(
c_data, chan_db_info, chan_id, None, linked_ax)
if ax is None:
return
c_data['ax'] = ax
ax.chan = chan_id
self.axes.append(ax)
else:
for artist in c_data['ax'].lines + c_data['ax'].collections:
artist.remove()
getattr(self.plotting, plot_functions[plot_type][1])(
c_data, chan_db_info, chan_id, c_data['ax'], linked_ax)
def set_lim(self, first_time=False):
super().set_lim(first_time, is_waveform=False)
ax = getattr(self.plotting, plot_functions[plot_type][1])(
c_data, chan_db_info, chan_id, None, linked_ax)
c_data['ax'] = ax
ax.chan = chan_id
self.axes.append(ax)
......@@ -498,6 +498,11 @@ class TimePowerSquaredDialog(QtWidgets.QWidget):
data_type: str - type of data being plotted
"""
self.data_type = None
"""
date_format: format for date
"""
self.date_format: str = 'YYYY-MM-DD'
self.setGeometry(50, 50, 1200, 700)
self.setWindowTitle("TPS Plot")
......@@ -577,16 +582,16 @@ class TimePowerSquaredDialog(QtWidgets.QWidget):
self.connect_signals()
self.color_range_changed()
def set_data(self, data_type, file_name):
def set_data(self, data_type: str, folder_name: str):
"""
Set data_type and the window's title.
:param data_type: str - data type of data being plotted
:param file_name: str - name of the file/folder of the data set to be
:param data_type: data type of data being plotted
:param folder_name: name of the folder of the data set to be
displayed
"""
self.data_type = data_type
self.setWindowTitle("TPS Plot %s - %s" % (data_type, file_name))
self.setWindowTitle("TPS Plot %s - %s" % (data_type, folder_name))
def resizeEvent(self, event):
"""
......
......@@ -51,22 +51,11 @@ class WaveformWidget(MultiThreadedPlottingWidget):
plot_type = chan_db_info['plotType']
# refer to doc string for mass_pos_data to know the reason for 'ax_wf'
if 'ax_wf' not in c_data:
ax = getattr(self.plotting, plot_functions[plot_type][1])(
c_data, chan_db_info, chan_id, None, None)
if ax is None:
return
c_data['ax_wf'] = ax
ax.chan = chan_id
self.axes.append(ax)
else:
for artist in c_data['ax_wf'].lines + c_data['ax_wf'].collections:
artist.remove()
getattr(self.plotting, plot_functions[plot_type][1])(
c_data, chan_db_info, chan_id, c_data['ax_wf'], None)
def set_lim(self, first_time=False):
super().set_lim(first_time, is_waveform=True)
ax = getattr(self.plotting, plot_functions[plot_type][1])(
c_data, chan_db_info, chan_id, None, None)
c_data['ax_wf'] = ax
ax.chan = chan_id
self.axes.append(ax)
class WaveformDialog(QtWidgets.QWidget):
......@@ -85,6 +74,10 @@ class WaveformDialog(QtWidgets.QWidget):
data_type: str - type of data being plotted
"""
self.data_type = None
"""
date_format: format for date
"""
self.date_format: str = 'YYYY-MM-DD'
self.setGeometry(50, 10, 1600, 700)
self.setWindowTitle("Raw Data Plot")
......@@ -118,12 +111,12 @@ class WaveformDialog(QtWidgets.QWidget):
self.info_text_browser.setFixedHeight(60)
bottom_layout.addWidget(self.info_text_browser)
def set_data(self, data_type, folder_name):
def set_data(self, data_type: str, folder_name: str):
"""
Set data_type and the window's title.
:param data_type: str - data type of data being plotted
:param folder_name: str - name of the folder of the data set to be
:param data_type: data type of data being plotted
:param folder_name: name of the folder of the data set to be
displayed
"""
self.data_type = data_type
......
......@@ -721,7 +721,6 @@ class UIMainWindow(object):
lambda: main_window.set_date_format('YYYYMMMDD'))
self.yyyy_doy_action.triggered.connect(
lambda: main_window.set_date_format('YYYY:DOY'))
self.yyyy_mm_dd_action.trigger()
# Database
self.add_edit_data_type_action.triggered.connect(
......
......@@ -254,12 +254,8 @@ def get_total_miny_maxy(
if new_x.size == 0:
return 0, None, None
new_min_x = min(new_x)
new_max_x = max(new_x)
new_min_x_index = np.where(x == new_min_x)[0][0]
new_max_x_index = np.where(x == new_max_x)[0][0]
new_min_x_index = min(new_x_indexes)
new_max_x_index = max(new_x_indexes)
new_y = y[new_min_x_index:new_max_x_index + 1]
new_min_y = min(new_y)
new_max_y = max(new_y)
......@@ -328,5 +324,49 @@ def get_index_from_time(chan_data: List[np.ndarray], tm: float, val: float) \
return list_idx, section_idx
def remove_not_found_chans(
        chan_order: List[str], actual_chans: List[str],
        processing_log: List[Tuple[str, LogType]]) -> List[str]:
    """
    Filter chan_order down to the channels that exist in actual_chans,
    adding one warning to the processing log for all channels that have
    no data.

    :param chan_order: list of channels in the order the user wants to plot
    :param actual_chans: the channels actually present in the data
    :param processing_log: log list collecting a warning about channels
        that were requested but not found
    :return: copy of chan_order with the not-found channels removed
    """
    available = set(actual_chans)
    missing = [c for c in chan_order if c not in available]
    if missing:
        processing_log.append(
            (f"No data found for the following channels: "
             f"{', '.join(missing)}",
             LogType.WARNING))
    return [c for c in chan_order if c in available]
def replace_actual_question_chans(
        chan_order: List[str], actual_chans: List[str]) -> List[str]:
    """
    Substitute every wildcard channel (a name ending with '?') in chan_order
    with the matching channels found in actual_chans.
    :param chan_order: The list of channel that have channels end with '?'
    :param actual_chans: The actual channel list
    :return: chan_order that have channels end with '?' replaced by actual
        channels.
    """
    wildcards = [name for name in chan_order if name.endswith('?')]
    for wildcard in wildcards:
        prefix = wildcard[:-1]
        # All actual channels whose name differs only in the last character
        # are represented by this wildcard.
        matches = sorted(name for name in actual_chans
                         if name[:-1] == prefix)
        if not matches:
            # No actual counterpart: leave the wildcard untouched.
            continue
        pos = chan_order.index(wildcard)
        # Replace the wildcard in place with its sorted actual channels.
        chan_order[pos:pos + 1] = matches
    return chan_order
if __name__ == '__main__':
    # Rebuild the documentation table of contents when this module is run
    # directly. NOTE(review): the relative path assumes the script is run
    # from this file's own directory — confirm before relying on it.
    create_table_of_content_file(Path('../../../documentation'))
import numpy as np
from pathlib import Path
from unittest import TestCase
from unittest.mock import patch
from sohstationviewer.model.general_data.general_data_helper import (
_check_related_gaps, squash_gaps, sort_data,
retrieve_data_time_from_data_dict, retrieve_gaps_from_data_dict,
combine_data, apply_convert_factor_to_data_dict
combine_data, apply_convert_factor_to_data_dict, read_text
)
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
'test_data')
text_file = TEST_DATA_DIR.joinpath(
"Pegasus-sample/Pegasus_SVC4/logs/2020/XX/KC01/XX.KC01...D.2020.129")
binary_file = TEST_DATA_DIR.joinpath(
"Pegasus-sample/Pegasus_SVC4/soh/2020/XX/KC01/VDT.D/"
"XX.KC01..VDT.D.2020.129")
class TestReadText(TestCase):
    """Tests for the read_text helper on text and binary input files."""

    def test_text_file(self):
        # A plain-text SOH log file is read and returned as a string.
        content = read_text(text_file)
        expected_start = (
            "\n\n** STATE OF HEALTH: XX.KC01...D.2020.129"
            "\n2020-05-08 22:55:45.390 UTC: I(Initializations): Firmware")
        self.assertEqual(content[:100], expected_start)

    def test_binary_file(self):
        # A binary (miniSEED) file is not text, so read_text returns None.
        self.assertIsNone(read_text(binary_file))
class TestCheckRelatedGaps(TestCase):
# FROM test_handling_data_rearrange_data.TestCheckRelatedGaps
......
from unittest import TestCase
from pathlib import Path
from sohstationviewer.model.mseed_data.mseed_helper import (
retrieve_nets_from_data_dict, read_text
retrieve_nets_from_data_dict
)
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
'test_data')
text_file = TEST_DATA_DIR.joinpath(
"Pegasus-sample/Pegasus_SVC4/logs/2020/XX/KC01/XX.KC01...D.2020.129")
binary_file = TEST_DATA_DIR.joinpath(
"Pegasus-sample/Pegasus_SVC4/soh/2020/XX/KC01/VDT.D/"
"XX.KC01..VDT.D.2020.129")
class TestReadText(TestCase):
    """Tests for read_text on text and binary inputs.

    NOTE(review): this module's import block only brings in
    retrieve_nets_from_data_dict, not read_text — these tests look like
    leftovers from before read_text moved; confirm and relocate/remove.
    """

    def test_text_file(self):
        # A plain-text SOH log file is read and returned as a string.
        content = read_text(text_file)
        expected_start = (
            "\n\n** STATE OF HEALTH: XX.KC01...D.2020.129"
            "\n2020-05-08 22:55:45.390 UTC: I(Initializations): Firmware")
        self.assertEqual(content[:100], expected_start)

    def test_binary_file(self):
        # A binary (miniSEED) file is not text, so read_text returns None.
        self.assertIsNone(read_text(binary_file))
class TestRetrieveNetsFromDataDict(TestCase):
def setUp(self):
......
from unittest import TestCase
from pathlib import Path
from sohstationviewer.model.reftek_data.reftek import RT130
from sohstationviewer.model.general_data.general_data import \
ProcessingDataError
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
'test_data')
reftek_data = TEST_DATA_DIR.joinpath("RT130-sample")
reftek_gap_data = TEST_DATA_DIR.joinpath("RT130-gap")
class TestReftek(TestCase):
    """Tests reading RT130 data sets: SOH channels, waveform channels,
    mass-position channels, and gap detection (fixtures under test_data)."""
    def test_path_not_exist(self):
        # raise exception when path not exist
        args = {
            'data_type': 'RT130',
            'folder': '_',
            'rt130_waveform_data_req': False,
            'on_unittest': True
        }
        with self.assertRaises(ProcessingDataError) as context:
            RT130(**args)
        self.assertEqual(
            str(context.exception),
            "Path '_' not exist"
        )
    def test_read_soh(self):
        # With no specific SOH channels requested, the default SOH channels
        # of the single DAS key ('92EB', '25') are read.
        args = {
            'data_type': 'RT130',
            'folder': reftek_data,
            'req_soh_chans': [],
            'rt130_waveform_data_req': False,
            'on_unittest': True
        }
        expected_soh = [
            'SOH/Data Def', 'Battery Volt', 'DAS Temp', 'Backup Volt',
            'Disk Usage1', 'Disk Usage2', 'Dump Called/Comp', 'GPS On/Off/Err',
            'GPS Lk/Unlk', 'Clk Phase Err']
        obj = RT130(**args)
        # data stream 9 is the state-of-health stream
        self.assertEqual(obj.found_data_streams, [9])
        self.assertEqual(obj.keys, [('92EB', '25')])
        # no waveform requested, so no stream headers are collected
        self.assertEqual(
            list(obj.stream_header_by_key_chan[('92EB', '25')].keys()),
            [])
        self.assertEqual(list(obj.log_data.keys()), ['TEXT', ('92EB', '25')])
        self.assertEqual(len(obj.log_data['TEXT']), 0)
        self.assertEqual(list(obj.log_data[('92EB', '25')].keys()), ['SOH'])
        self.assertEqual(len(obj.log_data[('92EB', '25')]['SOH']), 1)
        self.assertEqual(
            obj.log_data[('92EB', '25')]['SOH'][0][:100],
            '\nState of Health 17:150:00:00:00:000 ST: 92EB'
            '\n150:00:00:00 REF TEK 130'
            '\r\n150:00:00:00 CPU SOFTWARE')
        self.assertEqual(list(obj.soh_data.keys()), [('92EB', '25')])
        self.assertEqual(list(obj.soh_data[('92EB', '25')].keys()),
                         expected_soh)
    def test_read_waveform(self):
        # Requesting data stream 1 reads its three waveform channels.
        args = {
            'data_type': 'RT130',
            'folder': reftek_data,
            'req_soh_chans': [],
            'req_wf_chans': [1],
            'rt130_waveform_data_req': True,
            'on_unittest': True
        }
        expected_waveform = ['DS1-1', 'DS1-2', 'DS1-3']
        obj = RT130(**args)
        self.assertEqual(obj.found_data_streams, [9, 1, 1])
        self.assertEqual(obj.keys, [('92EB', '25')])
        self.assertEqual(
            list(obj.stream_header_by_key_chan[('92EB', '25')].keys()),
            expected_waveform)
        self.assertEqual(list(obj.waveform_data[('92EB', '25')].keys()),
                         expected_waveform)
        self.assertEqual(list(obj.log_data.keys()), ['TEXT', ('92EB', '25')])
        # event info for the read data stream is also added to the SOH data
        self.assertIn('Event DS1',
                      list(obj.soh_data[('92EB', '25')].keys()))
    def test_read_mass_pos(self):
        # include_mp123zne=True reads mass-position channels 1-3.
        args = {
            'data_type': 'RT130',
            'folder': reftek_data,
            'req_soh_chans': ['_'],
            'include_mp123zne': True,
            'rt130_waveform_data_req': False,
            'on_unittest': True
        }
        expected_mass_pos = ['MassPos1', 'MassPos2', 'MassPos3']
        obj = RT130(**args)
        self.assertEqual(obj.found_data_streams, [9])
        self.assertEqual(obj.keys, [('92EB', '25')])
        self.assertEqual(
            list(obj.stream_header_by_key_chan[('92EB', '25')].keys()),
            expected_mass_pos)
        self.assertEqual(list(obj.mass_pos_data[('92EB', '25')].keys()),
                         expected_mass_pos)
        self.assertEqual(list(obj.log_data.keys()), ['TEXT', ('92EB', '25')])
    def test_gap(self):
        # Gaps in the gap fixture are reported only when gap_minimum is set.
        expected_waveform = ['DS2-1', 'DS2-2', 'DS2-3']
        with self.subTest("no gap_minimum set"):
            args = {
                'data_type': 'RT130',
                'folder': reftek_gap_data,
                'req_soh_chans': [],
                'req_wf_chans': ['*'],
                'rt130_waveform_data_req': True,
                'on_unittest': True
            }
            obj = RT130(**args)
            self.assertEqual(obj.found_data_streams, [2, 2])
            self.assertEqual(obj.keys, [('98AD', '0')])
            self.assertEqual(
                list(obj.stream_header_by_key_chan[('98AD', '0')].keys()),
                expected_waveform)
            self.assertEqual(list(obj.waveform_data[('98AD', '0')].keys()),
                             expected_waveform)
            self.assertEqual(list(obj.log_data.keys()),
                             ['TEXT', ('98AD', '0')])
            # without gap_minimum, no gap is recorded
            self.assertEqual(obj.gaps[('98AD', '0')], [])
        with self.subTest("has gap_minimum set"):
            args = {
                'data_type': 'RT130',
                'folder': reftek_gap_data,
                'req_soh_chans': [],
                'req_wf_chans': ['*'],
                'rt130_waveform_data_req': True,
                'gap_minimum': 60,
                'on_unittest': True
            }
            obj = RT130(**args)
            self.assertEqual(obj.found_data_streams, [2, 2])
            self.assertEqual(obj.keys, [('98AD', '0')])
            self.assertEqual(
                list(obj.stream_header_by_key_chan[('98AD', '0')].keys()),
                expected_waveform)
            self.assertEqual(list(obj.waveform_data[('98AD', '0')].keys()),
                             expected_waveform)
            self.assertEqual(list(obj.log_data.keys()),
                             ['TEXT', ('98AD', '0')])
            # the gap larger than gap_minimum (60s) is now reported
            self.assertEqual(obj.gaps[('98AD', '0')],
                             [[1648493999.64, 1648508400.64]])
File added
File added
import os
import unittest
from pathlib import Path
import numpy
import obspy.core
from numpy.testing import assert_array_equal
from sohstationviewer.model.reftek_data.reftek_reader.core import (
DiscontinuousTrace,
Reftek130,
)
from sohstationviewer.model.reftek_data.reftek_reader.header import \
NotRT130FileError
class TestDiscontinuousTrace(unittest.TestCase):
    """Tests for DiscontinuousTrace's handling of its stored sample times."""

    def setUp(self) -> None:
        samples = numpy.arange(1024)
        sample_times = numpy.arange(1024)
        self.trace = DiscontinuousTrace(
            samples, obspy.core.Stats(), times=sample_times)

    def test_times_argument_is_stored(self):
        # The times keyword argument is kept on the instance.
        self.assertTrue(hasattr(self.trace, '_times'))

    def test_times_utcdatetime(self):
        with self.assertRaises(NotImplementedError):
            self.trace.times('utcdatetime')

    def test_times_matplotlib(self):
        with self.assertRaises(NotImplementedError):
            self.trace.times('matplotlib')

    def test_times_relative(self):
        with self.subTest('test_relative_to_start_time'):
            # The default start time of a trace is 0 anyhow, but we write that
            # down explicitly for clarity.
            self.trace.stats.starttime = obspy.core.UTCDateTime(0)
            assert_array_equal(self.trace.times('relative'),
                               numpy.arange(1024))
        with self.subTest('test_relative_to_given_reftime'):
            # Shifting the reference time shifts the returned times by the
            # opposite amount.
            cases = [(0, numpy.arange(1024)),
                     (1024, numpy.arange(-1024, 0)),
                     (-1024, numpy.arange(1024, 2048))]
            for ref_seconds, expected in cases:
                reftime = obspy.core.UTCDateTime(ref_seconds)
                assert_array_equal(self.trace.times('relative', reftime),
                                   expected)

    def test_times_timestamp(self):
        assert_array_equal(self.trace.times('timestamp'),
                           numpy.arange(1024))
class TestReftek130FromFile(unittest.TestCase):
    """Tests for Reftek130.from_file: reading RT130 SOH and raw-data files
    and rejecting non-RT130 or missing input files."""
    def setUp(self) -> None:
        # NOTE(review): unlike the sibling test modules (which resolve
        # test_data relative to __file__), this path is relative to the
        # current working directory, so these tests only pass when run from
        # the repository root — confirm this is intended.
        self.TEST_DATA_DIR = Path(os.getcwd()).joinpath('tests/test_data')
        self.rt130_dir = self.TEST_DATA_DIR.joinpath(
            'RT130-sample/2017149.92EB/2017150/92EB'
        )
    def test_rt130_file(self):
        # A valid RT130 file yields a Reftek130 instance.
        file = self.rt130_dir.joinpath('0/000000000_00000000')
        rt130 = Reftek130.from_file(file)
        self.assertIsInstance(rt130, Reftek130)
    def test_rt130_soh_file(self):
        file = self.rt130_dir.joinpath('0/000000000_00000000')
        rt130 = Reftek130.from_file(file)
        # The most common SOH packet type looks to be SH, so we use that as
        # the default.
        self.assertIn(b'SH', rt130._data['packet_type'])
    def test_rt130_raw_data_file(self):
        # A waveform file contains event-header (EH), data (DT) and
        # event-trailer (ET) packets.
        file = self.rt130_dir.joinpath('1/000000015_0036EE80')
        rt130 = Reftek130.from_file(file)
        assert_array_equal(
            numpy.unique(numpy.sort(rt130._data['packet_type'])),
            numpy.sort([b'EH', b'DT', b'ET'])
        )
    def test_non_rt130_file(self):
        with self.subTest('test_file_exist'):
            # An existing file with non-RT130 content raises
            # NotRT130FileError.
            test_file = self.TEST_DATA_DIR.joinpath(
                'Q330-sample/day_vols_AX08/AX08.XA..HHE.2021.186'
            )
            with self.assertRaises(NotRT130FileError):
                Reftek130.from_file(test_file)
        with self.subTest('test_file_does_not_exist'):
            # A missing path surfaces as FileNotFoundError.
            test_file = ''
            with self.assertRaises(FileNotFoundError):
                Reftek130.from_file(test_file)