Compare revisions: software_public/passoft/sohstationviewer
Commits on Source (9)
......@@ -92,14 +92,14 @@ def format_time(time: Union[UTCDateTime, float], date_mode: str,
format = ''
if date_mode == 'YYYY-MM-DD':
format = '%Y-%m-%d'
elif date_mode == 'YYYYMMDD':
format = '%Y%m%d'
elif date_mode == 'YYYYMMMDD':
format = '%Y%b%d'
elif date_mode == 'YYYY:DOY':
format = '%Y:%j'
if time_mode == 'HH:MM:SS':
format += " %H:%M:%S"
ret = t.strftime(format)
ret = t.strftime(format).upper()
return ret
......
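For context on the .upper() change above: of the supported date modes, only '%Y%b%d' (YYYYMMMDD) can produce lowercase letters, so upper-casing normalizes the month abbreviation. A minimal sketch, assuming an English locale for %b:

    from obspy.core import UTCDateTime

    t = UTCDateTime(67567567)            # 1972-02-22 00:46:07
    print(t.strftime('%Y%b%d'))          # '1972Feb22' (before this change)
    print(t.strftime('%Y%b%d').upper())  # '1972FEB22' (after this change)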
......@@ -635,107 +635,6 @@ def get_each_day_5_min_list(start_tm: float, end_tm: float) -> np.ndarray:
return every_day_5_min_list
def get_trim_tps_data(chan: Dict, start_tm: float, end_tm: float,
every_day_5_min_list: List[List[float]]
) -> Optional[bool]:
"""
Different from soh_data, where times and data are each in one np.array,
in waveform_data times and data are each kept in a list of np.memmap
files along with startTmEpoch and endTmEpoch.
chan['startIdx'] and chan['endIdx'] will be identified to exclude
np.memmap files that aren't in the zoom time range (startTm, endTm).
Data in np.memmap will be trimmed according to times, then the
time-power-squared value for each 5 minutes will be calculated and
saved in chan['tps_data']: np.mean(np.square(5m data))
:param chan: dict - chan dictionary, refer to
DataTypeModel.__init__.waveform_data[key]['readData'][chan_id]
:param start_tm: float - start time of zoomed section
:param end_tm: float - end time of zoomed section
:param every_day_5_min_list: [[288 floats], ] - the list of the start
times of all five-minute intervals for every day; each day has 288
five-minute intervals.
"""
# preset all 0 for all 5 minutes for each day
tps_data = np.zeros((len(every_day_5_min_list), const.NO_5M_DAY))
# zoom in to the given range
chan['startIdx'] = 0
chan['endIdx'] = len(chan['tracesInfo'])
if ((start_tm > chan['tracesInfo'][-1]['endTmEpoch']) or
(end_tm < chan['tracesInfo'][0]['startTmEpoch'])):
return False
indexes = [index for index, tr in enumerate(chan['tracesInfo'])
if tr['startTmEpoch'] > start_tm]
if indexes != []:
chan['startIdx'] = indexes[0]
if chan['startIdx'] > 0:
chan['startIdx'] -= 1 # startTm in middle of trace
else:
chan['startIdx'] = 0
indexes = [idx for (idx, tr) in enumerate(chan['tracesInfo'])
if tr['endTmEpoch'] <= end_tm]
if indexes != []:
chan['endIdx'] = indexes[-1]
if chan['endIdx'] < len(chan['tracesInfo']) - 1:
chan['endIdx'] += 1 # endTm in middle of trace
else:
chan['endIdx'] = 0
chan['endIdx'] += 1 # a[x:y+1] = [a[x], ..., a[y]]
z_traces_info = chan['tracesInfo'][chan['startIdx']:chan['endIdx']]
spr = chan['samplerate']
chan['tps_data'] = []
start_tps_tm = 0
acc_data_list = []
for tr_idx, tr in enumerate(z_traces_info):
times = np.memmap(tr['times_f'],
dtype='int64', mode='r',
shape=tr['size'])
data = np.memmap(tr['data_f'],
dtype='int64', mode='r',
shape=tr['size'])
start_index = 0
if tr_idx == 0:
# get index of times with the closest value to startTm
start_index = np.abs(times - start_tm).argmin()
start_tps_tm = times[start_index]
# identify index in case of overlaps or gaps
index = np.where((every_day_5_min_list <= times[start_index]) &
(every_day_5_min_list + const.SEC_5M >
times[start_index]))
curr_row = index[0][0]
curr_col = index[1][0]
next_tps_tm = start_tps_tm + const.SEC_5M
while end_tm >= next_tps_tm:
next_index = int(start_index + spr * const.SEC_5M)
if next_index >= tr['size']:
acc_data_list.append(data[start_index:tr['size']])
break
else:
acc_data_list.append(np.square(data[start_index:next_index]))
acc_data = np.hstack(acc_data_list)
if acc_data.size == 0:
tps_data[curr_row, curr_col] = 0
else:
tps_data[curr_row, curr_col] = np.mean(acc_data)
start_index = next_index
curr_col += 1
acc_data_list = []
if curr_col == const.NO_5M_DAY:
curr_col = 0
curr_row += 1
next_tps_tm += const.SEC_5M
chan['tps_data'] = tps_data
def find_tps_tm(given_tm: float, each_day_5_min_list: List[List[float]]
) -> Tuple[float, float]:
"""
......
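As a quick sanity check of the formula named in the docstring above, the time-power-squared value of one five-minute window is np.mean(np.square(window)). The sample values here are made up for illustration:

    import numpy as np

    window = np.array([3, -4, 5], dtype='int64')  # one 5-minute bin of samples
    tps = np.mean(np.square(window))              # (9 + 16 + 25) / 3
    print(tps)                                    # 16.666...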
......@@ -302,7 +302,6 @@ class MSeed(DataTypeModel):
if not has_data:
continue
read_waveform_mseed(file_info['path2file'],
file_info['fileName'],
sta_id, chan_id, traces_info,
self.data_time[sta_id], self.tmp_dir)
file_info['read'] = True
......
......@@ -269,9 +269,7 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
which was clicked.
"""
print(f'Opening {item.text()}')
# TODO: Do something with the Path object,
# i.e., path.open(), or path.iterdir() ...
self.read_selected_files()
@QtCore.Slot()
def change_current_directory(self):
......@@ -449,15 +447,29 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.stop_load_data()
if self.is_plotting_soh:
display_tracking_info(self.tracking_info_text_browser,
'Stopping SOH plot...', 'info')
'Stopping SOH plot...')
if self.is_plotting_waveform:
display_tracking_info(self.waveform_dlg.info_text_browser,
'Stopping waveform plot...', 'info')
'Stopping waveform plot...')
waveform_widget = self.waveform_dlg.plotting_widget
running_processor = waveform_widget.data_processors[0]
running_processor.stopped.connect(self.reset_flags)
waveform_widget.request_stop()
self.waveform_dlg.plotting_widget.request_stop()
if self.is_plotting_tps:
display_tracking_info(self.tps_dlg.info_text_browser,
'Stopping TPS plot...')
tps_widget = self.tps_dlg.plotting_widget
tps_widget.request_stop()
def check_if_all_stopped(self):
"""
Check if everything has been stopped. If true, reset the is_stopping
flag.
"""
not_all_stopped = (self.is_loading_data or self.is_plotting_soh or
self.is_plotting_waveform or self.is_plotting_tps)
if not not_all_stopped:
self.is_stopping = False
@QtCore.Slot()
def data_loaded(self, data_obj: DataTypeModel):
......@@ -510,10 +522,18 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
peer_plotting_widgets = [self.plotting_widget]
if self.tps_check_box.isChecked():
self.is_plotting_tps = True
peer_plotting_widgets.append(self.tps_dlg.plotting_widget)
self.tps_dlg.set_data(
self.data_type, ','.join([str(d) for d in self.dir_names]))
self.tps_dlg.show()
# The waveform and TPS plots are being stopped at the same time, so
# we can't simply reset all flags. Instead, we use an intermediate
# method that checks whether all plots have been stopped before
# resetting the is_stopping flag.
tps_widget = self.tps_dlg.plotting_widget
tps_widget.stopped.connect(self.reset_is_plotting_tps)
tps_widget.stopped.connect(self.check_if_all_stopped)
self.tps_dlg.plotting_widget.plot_channels(
self.start_tm, self.end_tm, sel_key,
do.data_time[sel_key],
......@@ -529,6 +549,9 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.waveform_dlg.set_data(
self.data_type, ','.join([str(d) for d in self.dir_names]))
self.waveform_dlg.show()
waveform_widget = self.waveform_dlg.plotting_widget
waveform_widget.stopped.connect(self.reset_is_plotting_waveform)
waveform_widget.stopped.connect(self.check_if_all_stopped)
self.waveform_dlg.plotting_widget.plot_channels(
self.start_tm, self.end_tm, sel_key,
do.data_time[sel_key], time_tick_total,
......@@ -565,6 +588,22 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.is_plotting_tps = False
self.is_stopping = False
@QtCore.Slot()
def reset_is_plotting_waveform(self):
"""
Reset the is_plotting_waveform flag. Used because lambda does not allow
assignment.
"""
self.is_plotting_waveform = False
@QtCore.Slot()
def reset_is_plotting_tps(self):
"""
Reset the is_plotting_tps flag. Used because lambda does not allow
assignment.
"""
self.is_plotting_tps = False
def set_current_directory(self, path: str = '') -> None:
"""
Update currentDirectory with path in DB table PersistentData.
......
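Condensed from the wiring above, a self-contained sketch of the stop flow: each plotting widget's stopped signal first resets that plot's flag, then re-checks whether everything has stopped. The classes below are stand-ins for the demo, not the application's widgets:

    from PySide2 import QtCore

    class FakeWidget(QtCore.QObject):
        stopped = QtCore.Signal()

    class FakeWindow(QtCore.QObject):
        def __init__(self):
            super().__init__()
            self.is_plotting_waveform = True
            self.is_plotting_tps = True
            self.is_stopping = True
            self.waveform_widget = FakeWidget()
            self.tps_widget = FakeWidget()
            self.waveform_widget.stopped.connect(self.reset_is_plotting_waveform)
            self.waveform_widget.stopped.connect(self.check_if_all_stopped)
            self.tps_widget.stopped.connect(self.reset_is_plotting_tps)
            self.tps_widget.stopped.connect(self.check_if_all_stopped)

        def reset_is_plotting_waveform(self):
            self.is_plotting_waveform = False

        def reset_is_plotting_tps(self):
            self.is_plotting_tps = False

        def check_if_all_stopped(self):
            if not (self.is_plotting_waveform or self.is_plotting_tps):
                self.is_stopping = False

    window = FakeWindow()
    window.waveform_widget.stopped.emit()  # one plot stopped: is_stopping holds
    window.tps_widget.stopped.emit()       # all stopped: is_stopping cleared
    print(window.is_stopping)              # False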
# Display time-power-squared values for waveform data
from math import sqrt
import numpy as np
from typing import List
import numpy as np
from PySide2 import QtWidgets, QtCore
from sohstationviewer.view.plotting.plotting_widget import plotting_widget
from sohstationviewer.view.util.color import clr
from sohstationviewer.conf import constants as const
from sohstationviewer.controller.plotting_data import (
get_title, get_day_ticks, format_time)
get_title, get_day_ticks, format_time,
)
from sohstationviewer.controller.util import (
display_tracking_info, add_thousand_separator
display_tracking_info, add_thousand_separator,
)
from sohstationviewer.model.handling_data import (
get_trim_tps_data, get_each_day_5_min_list, find_tps_tm)
from sohstationviewer.database.extract_data import (
get_color_def, get_color_ranges, get_chan_label)
from sohstationviewer.conf import constants as const
get_color_def, get_color_ranges, get_chan_label,
)
from sohstationviewer.model.handling_data import (
get_each_day_5_min_list, find_tps_tm,
)
from sohstationviewer.view.plotting.plotting_widget import plotting_widget
from sohstationviewer.view.plotting.time_power_squared_processor import (
TimePowerSquaredProcessor,
)
from sohstationviewer.view.util.color import clr
class TimePowerSquaredWidget(plotting_widget.PlottingWidget):
"""
Widget to display time-power-squared data for waveform channels.
"""
stopped = QtCore.Signal()
......@@ -56,6 +60,17 @@ class TimePowerSquaredWidget(plotting_widget.PlottingWidget):
"""
self.tps_t = 0
self.tps_processors: List[TimePowerSquaredProcessor] = []
# The list of all channels scheduled for processing.
self.channels = []
# The list of channels that have been processed.
self.processed_channels = []
# The post-processing step does not take much time, so there is no
# need to limit the number of threads that can run at once.
self.thread_pool = QtCore.QThreadPool()
self.finished_lock = QtCore.QMutex()
super().__init__(*args, **kwarg)
def plot_channels(self, start_tm, end_tm, key,
......@@ -71,6 +86,12 @@ class TimePowerSquaredWidget(plotting_widget.PlottingWidget):
:param waveform_data: dict - read waveform data of selected data set,
refer to DataTypeModel.__init__.waveform_data[key]['read_data']
"""
self.processed_channels = []
self.channels = []
self.tps_processors = []
start_msg = 'Plotting TPS data...'
display_tracking_info(self.tracking_box, start_msg)
self.processing_log = [] # [(message, type)]
self.gap_bar = None
self.min_x = max(data_time[0], start_tm)
......@@ -78,6 +99,7 @@ class TimePowerSquaredWidget(plotting_widget.PlottingWidget):
if self.axes:
self.plotting_axes.fig.clear()
self.draw()
self.date_mode = self.parent.date_format.upper()
if waveform_data == {}:
title = "NO WAVEFORM DATA TO DISPLAY."
......@@ -104,8 +126,60 @@ class TimePowerSquaredWidget(plotting_widget.PlottingWidget):
self.each_day5_min_list = get_each_day_5_min_list(self.min_x,
self.max_x)
for chan_id in self.plotting_data1:
ax = self.get_plot_data(self.plotting_data1[chan_id], chan_id)
c_data = self.plotting_data1[chan_id]
if 'tps_data' not in c_data:
self.channels.append(chan_id)
channel_processor = TimePowerSquaredProcessor(
chan_id, c_data, self.min_x, self.max_x,
self.each_day5_min_list
)
channel_processor.signals.finished.connect(self.channel_done)
channel_processor.signals.stopped.connect(self.channel_done)
self.tps_processors.append(channel_processor)
# Because the widget determines whether processing is done by comparing
# the lists of scheduled and finished channels, a channel that runs
# fast enough to finish before any other channel is scheduled would
# make the two lists match and end processing early. To prevent this,
# we tell the thread pool to start running the processors only once
# all channels have been scheduled.
for processor in self.tps_processors:
self.thread_pool.start(processor)
@QtCore.Slot()
def channel_done(self, chan_id: str):
"""
Slot called when a TPS processor finishes. Plot the TPS data of
channel chan_id if chan_id is not an empty string and add chan_id to
the list of processed channels. If the list of processed channels
matches the list of all channels, notify the user that the plotting
is finished and add finishing touches to the plot.
If chan_id is the empty string, notify the user that the plotting has
been stopped.
:param chan_id: the name of the channel whose TPS data was processed.
If the TPS plot is stopped before it is finished, this will be the
empty string
"""
self.finished_lock.lock()
if chan_id != '':
ax = self.plot_channel(self.plotting_data1[chan_id], chan_id)
self.axes.append(ax)
self.processed_channels.append(chan_id)
if len(self.processed_channels) == len(self.channels):
if chan_id == '':
stopped_msg = 'TPS plot stopped.'
display_tracking_info(self.tracking_box, stopped_msg)
else:
finished_msg = 'TPS plot finished.'
display_tracking_info(self.tracking_box, finished_msg)
self.done()
self.stopped.emit()
self.finished_lock.unlock()
def done(self):
"""Add finishing touches to the plot and display it on the screen."""
self.set_legend()
# Set view size to fit the given data
if self.main_widget.geometry().height() < self.plotting_bot_pixel:
......@@ -113,7 +187,7 @@ class TimePowerSquaredWidget(plotting_widget.PlottingWidget):
self.set_lim_markers()
self.draw()
def get_plot_data(self, c_data, chan_id):
def plot_channel(self, c_data, chan_id):
"""
TPS is plotted in lines of small rectangles, so-called bars.
Each line is a day, so the y value is the order of days
......@@ -125,10 +199,8 @@ class TimePowerSquaredWidget(plotting_widget.PlottingWidget):
based on mapping between tps value of the five minutes against
the selected color range.
This function trims data to min_x, max_x, calculates the
time-power-squared value for each 5 minutes into c_data['tps_data'],
then draws each 5 minutes with the color corresponding to its value.
Create ruler, zoom_marker1, zoom_marker2 for the channel.
This function draws each 5-minute interval with the color
corresponding to its value and creates ruler, zoom_marker1, and
zoom_marker2 for the channel.
:param c_data: dict - data of the channel which includes down-sampled
data in keys 'times' and 'data'. Refer to
......@@ -136,10 +208,7 @@ class TimePowerSquaredWidget(plotting_widget.PlottingWidget):
:param chan_id: str - name of channel
:return ax: matplotlib.axes.Axes - axes of the channel
"""
if 'tps_data' not in c_data:
# get new minX, maxX according to exact start time of days
get_trim_tps_data(c_data, self.min_x, self.max_x,
self.each_day5_min_list)
total_days = c_data['tps_data'].shape[0]
plot_h = self.plotting_axes.get_height(
1.5 * total_days, bw_plots_distance=0.003)
......@@ -280,6 +349,10 @@ class TimePowerSquaredWidget(plotting_widget.PlottingWidget):
:param event: pick event - event when object of canvas is selected.
The event happens before button_press_event.
"""
if event.mouseevent.name == 'scroll_event':
return
if event.mouseevent.button in ('up', 'down'):
return
info_str = ""
if event.artist in self.axes:
xdata = event.mouseevent.xdata
......@@ -293,7 +366,7 @@ class TimePowerSquaredWidget(plotting_widget.PlottingWidget):
xdata = 287
ydata = round(event.mouseevent.ydata) # y value on the plot
# refer to description in get_plot_data to understand x,y vs
# refer to description in plot_channel to understand x,y vs
# day_index, five_min_index
day_index = - ydata
five_min_index = xdata
......@@ -370,6 +443,11 @@ class TimePowerSquaredWidget(plotting_widget.PlottingWidget):
for zm2 in self.zoom_marker2s:
zm2.set_data(x_idx, y_idx)
def request_stop(self):
"""Request all running channel processors to stop."""
for processor in self.tps_processors:
processor.request_stop()
class TimePowerSquaredDialog(QtWidgets.QWidget):
def __init__(self, parent):
......
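The plot_channels/channel_done pair above follows a schedule-all-then-start pattern, with a mutex around the completion check so concurrent finish notifications cannot race on the processed list. A stripped-down, self-contained sketch of that pattern (a plain callback stands in for the Qt signal the real code uses, and the channel ids are examples):

    from PySide2 import QtCore

    class ChannelJob(QtCore.QRunnable):
        def __init__(self, chan_id, on_done):
            super().__init__()
            self.chan_id = chan_id
            self.on_done = on_done

        def run(self):
            self.on_done(self.chan_id)  # the real code emits signals.finished

    class Scheduler:
        def __init__(self, chan_ids):
            self.channels = chan_ids
            self.processed_channels = []
            self.finished_lock = QtCore.QMutex()
            self.thread_pool = QtCore.QThreadPool()

        def start(self):
            jobs = [ChannelJob(c, self.channel_done) for c in self.channels]
            # Start only after every job exists, so a fast channel cannot
            # make the completion check pass prematurely.
            for job in jobs:
                self.thread_pool.start(job)
            self.thread_pool.waitForDone()

        def channel_done(self, chan_id):
            self.finished_lock.lock()
            self.processed_channels.append(chan_id)
            if len(self.processed_channels) == len(self.channels):
                print('all channels processed')
            self.finished_lock.unlock()

    Scheduler(['BHZ', 'BH1', 'BH2']).start()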
from typing import Dict, Optional, List
import numpy as np
from PySide2 import QtCore
from sohstationviewer.conf import constants as const
class TimePowerSquaredProcessorSignal(QtCore.QObject):
finished = QtCore.Signal(str)
stopped = QtCore.Signal(str)
class TimePowerSquaredProcessor(QtCore.QRunnable):
def __init__(self, channel_id: str, channel_data: dict, start_time: float,
end_time: float, each_day_5_mins_list: np.ndarray):
super().__init__()
self.channel_id = channel_id
self.channel_data = channel_data
self.start_time = start_time
self.end_time = end_time
self.each_day_5_mins_list = each_day_5_mins_list
self.signals = TimePowerSquaredProcessorSignal()
# Flag to indicate whether the processor should stop running and clean
# up.
self.stop = False
self.stop_lock = QtCore.QMutex()
def trim_waveform_data(self) -> List[Dict]:
"""
Trim off waveform traces whose time ranges do not intersect the closed
interval [self.start_time, self.end_time].
:return: the list of traces that remain after trimming; an empty list
if no trace overlaps the interval.
"""
data_start_time = self.channel_data['tracesInfo'][0]['startTmEpoch']
data_end_time = self.channel_data['tracesInfo'][-1]['endTmEpoch']
if (self.start_time > data_end_time
or self.end_time < data_start_time):
return []
good_start_indices = [index
for index, tr
in enumerate(self.channel_data['tracesInfo'])
if tr['startTmEpoch'] > self.start_time]
if good_start_indices:
start_idx = good_start_indices[0]
if start_idx > 0:
start_idx -= 1 # start_time in middle of trace
else:
start_idx = 0
good_end_indices = [idx
for idx, tr
in enumerate(self.channel_data['tracesInfo'])
if tr['endTmEpoch'] <= self.end_time]
if good_end_indices:
end_idx = good_end_indices[-1]
if end_idx < len(self.channel_data['tracesInfo']) - 1:
end_idx += 1 # end_time in middle of trace
else:
end_idx = 0
end_idx += 1 # a[x:y+1] = [a[x], ...a[y]]
good_indices = slice(start_idx, end_idx)
return self.channel_data['tracesInfo'][good_indices]
def run(self) -> Optional[bool]:
"""
Different from soh_data, where times and data are each in one np.array,
in waveform_data times and data are each kept in a list of np.memmap
files along with startTmEpoch and endTmEpoch.
trim_waveform_data() excludes the np.memmap files that aren't in the
zoom time range (start_time, end_time). Data in np.memmap is trimmed
according to times, then the time-power-squared value for each
5 minutes is calculated and saved in
self.channel_data['tps_data']: np.mean(np.square(5m data))
"""
trimmed_traces_list = self.trim_waveform_data()
# preset all 0 for all 5 minutes for each day
tps_data = np.zeros((len(self.each_day_5_mins_list), const.NO_5M_DAY))
spr = self.channel_data['samplerate']
self.channel_data['tps_data'] = []
start_tps_tm = 0
acc_data_list = []
for tr_idx, tr in enumerate(trimmed_traces_list):
self.stop_lock.lock()
if self.stop:
self.stop_lock.unlock()
return self.signals.stopped.emit('')
self.stop_lock.unlock()
times = np.memmap(tr['times_f'],
dtype='int64', mode='r',
shape=tr['size'])
data = np.memmap(tr['data_f'],
dtype='int64', mode='r',
shape=tr['size'])
start_index = 0
if tr_idx == 0:
# get index of times with the closest value to start_time
start_index = np.abs(times - self.start_time).argmin()
start_tps_tm = times[start_index]
# identify index in case of overlaps or gaps
index = np.where(
(self.each_day_5_mins_list <= times[start_index]) &
(self.each_day_5_mins_list + const.SEC_5M > times[start_index])
)
curr_row = index[0][0]
curr_col = index[1][0]
next_tps_tm = start_tps_tm + const.SEC_5M
while self.end_time >= next_tps_tm:
self.stop_lock.lock()
if self.stop:
self.stop_lock.unlock()
return self.signals.stopped.emit('')
self.stop_lock.unlock()
next_index = int(start_index + spr * const.SEC_5M)
if next_index >= tr['size']:
acc_data_list.append(data[start_index:tr['size']])
break
else:
acc_data_list.append(
np.square(data[start_index:next_index]))
acc_data = np.hstack(acc_data_list)
if acc_data.size == 0:
tps_data[curr_row, curr_col] = 0
else:
tps_data[curr_row, curr_col] = np.mean(acc_data)
start_index = next_index
curr_col += 1
acc_data_list = []
if curr_col == const.NO_5M_DAY:
curr_col = 0
curr_row += 1
next_tps_tm += const.SEC_5M
self.channel_data['tps_data'] = tps_data
self.signals.finished.emit(self.channel_id)
def request_stop(self):
"""Request that the processor stops by setting the stop flag."""
self.stop_lock.lock()
self.stop = True
self.stop_lock.unlock()
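The request_stop/stop-flag handshake above is the usual cooperative-cancellation pattern for a QRunnable; the flag is written and read only under the mutex. A self-contained sketch (demo worker, not the project's processor):

    from PySide2 import QtCore

    class Worker(QtCore.QRunnable):
        def __init__(self):
            super().__init__()
            self.stop = False
            self.stop_lock = QtCore.QMutex()

        def run(self):
            for step in range(10_000_000):
                # Check the flag under the lock, as the processor's loops do.
                self.stop_lock.lock()
                should_stop = self.stop
                self.stop_lock.unlock()
                if should_stop:
                    print('stopped cooperatively at step', step)
                    return
            print('ran to completion')

        def request_stop(self):
            self.stop_lock.lock()
            self.stop = True
            self.stop_lock.unlock()

    pool = QtCore.QThreadPool()
    worker = Worker()
    pool.start(worker)
    worker.request_stop()  # typically called from the GUI thread
    pool.waitForDone()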
......@@ -25,6 +25,7 @@ class WaveformWidget(plotting_widget.PlottingWidget):
"""
finished = QtCore.Signal()
stopped = QtCore.Signal()
def __init__(self, parent, tracking_box, name):
super().__init__(parent, tracking_box, name)
......@@ -42,6 +43,7 @@ class WaveformWidget(plotting_widget.PlottingWidget):
# Used to ensure that the user cannot read a new data set when we are
# zooming in on the waveform plot.
self.is_working = False
self.finished.connect(self.stopped)
def reset_widget(self):
"""
......@@ -181,7 +183,7 @@ class WaveformWidget(plotting_widget.PlottingWidget):
)
self.data_processors.append(channel_processor)
channel_processor.finished.connect(self.process_channel)
channel_processor.stopped.connect(self.stopped)
channel_processor.stopped.connect(self.has_stopped)
def plot_mass_pos_channels(self):
"""
......@@ -308,7 +310,7 @@ class WaveformWidget(plotting_widget.PlottingWidget):
self.data_processors = [self.data_processors[0]]
@QtCore.Slot()
def stopped(self):
def has_stopped(self):
"""
The slot that is called when the last channel processor has terminated
all running background threads.
......@@ -316,6 +318,7 @@ class WaveformWidget(plotting_widget.PlottingWidget):
display_tracking_info(self.tracking_box,
'Waveform plot stopped', 'info')
self.is_working = False
self.stopped.emit()
class WaveformDialog(QtWidgets.QWidget):
......
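Two details in the hunks above are easy to miss: renaming the old stopped slot to has_stopped frees the name for the new stopped Signal, and self.finished.connect(self.stopped) chains one signal into another, which Qt supports directly. A minimal demonstration with a stand-in class:

    from PySide2 import QtCore

    class Demo(QtCore.QObject):
        finished = QtCore.Signal()
        stopped = QtCore.Signal()

        def __init__(self):
            super().__init__()
            # Signal-to-signal connection: emitting finished re-emits stopped.
            self.finished.connect(self.stopped)

    demo = Demo()
    demo.stopped.connect(lambda: print('stopped emitted'))
    demo.finished.emit()  # prints 'stopped emitted'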
......@@ -606,19 +606,20 @@ class UIMainWindow(object):
menu.addAction(self.add_positions_to_et_action)
date_format_menu = QMenu('Date Format:', main_window)
date_format_action_group = QActionGroup(main_window)
menu.addMenu(date_format_menu)
self.yyyy_doy_action = QAction(
'YYYY:DOY', main_window)
self.yyyy_doy_action.setCheckable(True)
date_format_menu.addAction(self.yyyy_doy_action)
self.yyyy_mm_dd_action = QAction(
'YYYY-MM-DD', main_window)
self.yyyy_mm_dd_action = QAction('YYYY-MM-DD', main_window)
self.yyyy_mm_dd_action.setCheckable(True)
date_format_menu.addAction(self.yyyy_mm_dd_action)
self.yyyymmmdd_action = QAction(
'YYYYMMMDD', main_window)
date_format_action_group.addAction(self.yyyy_mm_dd_action)
self.yyyy_doy_action = QAction('YYYY:DOY', main_window)
self.yyyy_doy_action.setCheckable(True)
date_format_menu.addAction(self.yyyy_doy_action)
date_format_action_group.addAction(self.yyyy_doy_action)
self.yyyymmmdd_action = QAction('YYYYMMMDD', main_window)
self.yyyymmmdd_action.setCheckable(True)
date_format_menu.addAction(self.yyyymmmdd_action)
date_format_action_group.addAction(self.yyyymmmdd_action)
def create_database_menu(self, main_window, menu):
"""
......@@ -694,11 +695,12 @@ class UIMainWindow(object):
main_window, 'mass_pos_volt_range_opt', 'trillium'))
self.yyyy_mm_dd_action.triggered.connect(
lambda: main_window.set_date_format('yyyy-MM-dd'))
lambda: main_window.set_date_format('YYYY-MM-DD'))
self.yyyymmmdd_action.triggered.connect(
lambda: main_window.set_date_format('yyyyMMMdd'))
lambda: main_window.set_date_format('YYYYMMMDD'))
self.yyyy_doy_action.triggered.connect(
lambda: main_window.set_date_format('yyyyDOY'))
lambda: main_window.set_date_format('YYYY:DOY'))
self.yyyy_mm_dd_action.trigger()
# Database
self.add_edit_data_type_action.triggered.connect(
......
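The QActionGroup introduced above is what makes the three date-format actions mutually exclusive: an action group is exclusive by default, so checking one action unchecks the others. A standalone sketch of the mechanism:

    from PySide2.QtWidgets import QApplication, QAction, QActionGroup

    app = QApplication([])
    group = QActionGroup(None)
    actions = {}
    for fmt in ('YYYY-MM-DD', 'YYYYMMMDD', 'YYYY:DOY'):
        action = QAction(fmt)
        action.setCheckable(True)
        group.addAction(action)
        actions[fmt] = action

    actions['YYYY-MM-DD'].trigger()
    actions['YYYY:DOY'].trigger()
    print(actions['YYYY-MM-DD'].isChecked())  # False: unchecked by the group
    print(actions['YYYY:DOY'].isChecked())    # True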
......@@ -166,7 +166,7 @@ class TestGetTimeTicks(TestCase):
def setUp(self) -> None:
"""Set up text fixtures."""
self.label_cnt = 5
self.date_fmt = 'YYYYMMDD'
self.date_fmt = 'YYYY-MM-DD'
def test_expected_time_range(self):
"""
......@@ -180,8 +180,8 @@ class TestGetTimeTicks(TestCase):
expected = (
[1.0, 2.0, 3.0, 4.0],
[1.0, 2.0, 3.0, 4.0],
['19700101 00:00:01', '19700101 00:00:02',
'19700101 00:00:03', '19700101 00:00:04']
['1970-01-01 00:00:01', '1970-01-01 00:00:02',
'1970-01-01 00:00:03', '1970-01-01 00:00:04']
)
self.assertTupleEqual(
get_time_ticks(earliest, latest,
......@@ -195,8 +195,9 @@ class TestGetTimeTicks(TestCase):
expected = (
[60.0, 120.0, 180.0, 240.0, 300.0],
[60.0, 120.0, 180.0, 240.0, 300.0],
['19700101 00:01:00', '19700101 00:02:00', '19700101 00:03:00',
'19700101 00:04:00', '19700101 00:05:00']
['1970-01-01 00:01:00', '1970-01-01 00:02:00',
'1970-01-01 00:03:00', '1970-01-01 00:04:00',
'1970-01-01 00:05:00']
)
self.assertTupleEqual(
get_time_ticks(earliest, latest,
......@@ -210,8 +211,8 @@ class TestGetTimeTicks(TestCase):
expected = (
[3600.0, 7200.0, 10800.0, 14400.0, 18000.0, 21600.0, 25200.0],
[3600.0, 10800.0, 18000.0, 25200.0],
['19700101 01:00', '19700101 03:00', '19700101 05:00',
'19700101 07:00']
['1970-01-01 01:00', '1970-01-01 03:00', '1970-01-01 05:00',
'1970-01-01 07:00']
)
self.assertTupleEqual(
get_time_ticks(earliest, latest,
......@@ -227,7 +228,8 @@ class TestGetTimeTicks(TestCase):
[86400.0, 172800.0, 259200.0, 345600.0, 432000.0,
518400.0, 604800.0, 691200.0, 777600.0],
[86400.0, 259200.0, 432000.0, 604800.0, 777600.0],
['19700102', '19700104', '19700106', '19700108', '19700110']
['1970-01-02', '1970-01-04', '1970-01-06', '1970-01-08',
'1970-01-10']
)
self.assertTupleEqual(
get_time_ticks(earliest, latest,
......@@ -243,7 +245,8 @@ class TestGetTimeTicks(TestCase):
518400.0, 604800.0, 691200.0, 777600.0, 864000.0,
950400.0, 1036800.0, 1123200.0],
[86400.0, 345600.0, 604800.0, 864000.0, 1123200.0],
['19700102', '19700105', '19700108', '19700111', '19700114']
['1970-01-02', '1970-01-05', '1970-01-08', '1970-01-11',
'1970-01-14']
)
self.assertTupleEqual(
get_time_ticks(earliest, latest,
......@@ -257,7 +260,7 @@ class TestGetTimeTicks(TestCase):
latest = UTCDateTime(1970, 1, 31, 0, 0, 0).timestamp
expected = ([864000.0, 1728000.0],
[864000.0, 1728000.0],
['19700111', '19700121'])
['1970-01-11', '1970-01-21'])
self.assertTupleEqual(
get_time_ticks(earliest, latest,
self.date_fmt, self.label_cnt),
......@@ -271,7 +274,7 @@ class TestGetTimeTicks(TestCase):
[864000.0, 1728000.0, 2592000.0, 3456000.0, 4320000.0,
5184000.0, 6048000.0],
[864000.0, 2592000.0, 4320000.0, 6048000.0],
['19700111', '19700131', '19700220', '19700312']
['1970-01-11', '1970-01-31', '1970-02-20', '1970-03-12']
)
self.assertTupleEqual(
get_time_ticks(earliest, latest, self.date_fmt,
......@@ -349,7 +352,7 @@ class TestFormatTimeAndGetTitle(TestCase):
self.positive_epoch_time = 67567567
self.positive_formatted_dates = {
'YYYY-MM-DD': '1972-02-22',
'YYYYMMDD': '19720222',
'YYYYMMMDD': '1972FEB22',
'YYYY:DOY': '1972:053',
}
self.positive_formatted_time = '00:46:07'
......@@ -357,7 +360,7 @@ class TestFormatTimeAndGetTitle(TestCase):
self.negative_epoch_time = -67567567
self.negative_formatted_dates = {
'YYYY-MM-DD': '1967-11-10',
'YYYYMMDD': '19671110',
'YYYYMMMDD': '1967NOV10',
'YYYY:DOY': '1967:314',
}
self.negative_formatted_time = '23:13:53'
......@@ -371,12 +374,12 @@ class TestFormatTimeAndGetTitle(TestCase):
# formatter:off
test_name_to_date_mode_map = {
'test_year_month_day_format': 'YYYY-MM-DD',
'test_year_month_day_format_no_dash': 'YYYYMMDD',
'test_year_month_day_format_no_dash': 'YYYYMMMDD',
'test_day_of_year_format': 'YYYY:DOY'
}
zero_epoch_formatted = {
'test_year_month_day_format': '1970-01-01',
'test_year_month_day_format_no_dash': '19700101',
'test_year_month_day_format_no_dash': '1970JAN01',
'test_day_of_year_format': '1970:001',
}
# formatter:on
......@@ -401,12 +404,12 @@ class TestFormatTimeAndGetTitle(TestCase):
# formatter:off
test_name_to_date_mode_map = {
'test_year_month_day_format': 'YYYY-MM-DD',
'test_year_month_day_format_no_dash': 'YYYYMMDD',
'test_year_month_day_format_no_dash': 'YYYYMMMDD',
'test_day_of_year_format': 'YYYY:DOY',
}
zero_epoch_formatted = {
'test_year_month_day_format': '1970-01-01 00:00:00',
'test_year_month_day_format_no_dash': '19700101 00:00:00',
'test_year_month_day_format_no_dash': '1970JAN01 00:00:00',
'test_day_of_year_format': '1970:001 00:00:00',
}
# formatter:on
......@@ -441,7 +444,7 @@ class TestFormatTimeAndGetTitle(TestCase):
"""
test_name_to_date_mode_map = {
'test_year_month_day_format': 'YYYY-MM-DD',
'test_year_month_day_format_no_dash': 'YYYYMMDD',
'test_year_month_day_format_no_dash': 'YYYYMMMDD',
'test_day_of_year_format': 'YYYY:DOY',
}
utc_date_time = UTCDateTime(self.positive_epoch_time)
......@@ -458,7 +461,7 @@ class TestFormatTimeAndGetTitle(TestCase):
"""
test_name_to_date_mode_map = {
'test_year_month_day_format': 'YYYY-MM-DD',
'test_year_month_day_format_no_dash': 'YYYYMMDD',
'test_year_month_day_format_no_dash': 'YYYYMMMDD',
'test_day_of_year_format': 'YYYY:DOY',
}
test_time = UTCDateTime(self.positive_epoch_time)
......@@ -500,7 +503,7 @@ class TestFormatTimeAndGetTitle(TestCase):
supported.
"""
test_time = self.positive_epoch_time
date_format = 'YYYYMMDD'
date_format = 'YYYYMMMDD'
empty_format = ''
bad_format = 'bad_format'
......@@ -528,20 +531,20 @@ class TestFormatTimeAndGetTitle(TestCase):
def test_get_title(self):
"""Test basic functionality of get_title."""
date_mode = 'YYYYMMDD'
date_mode = 'YYYY-MM-DD'
min_time = 0
max_time = self.positive_epoch_time
formatted_max_time = (f'{self.positive_formatted_dates[date_mode]}'
f' {self.positive_formatted_time}')
with self.subTest('test_mseed'):
key = '3734'
expected = (f'3734 19700101 00:00:00 to '
expected = (f'3734 1970-01-01 00:00:00 to '
f'{formatted_max_time} (18768.77)')
self.assertEqual(get_title(key, min_time, max_time, date_mode),
expected)
with self.subTest('test_rt130'):
key = ('92EB', 25)
expected = (f"('92EB', 25) 19700101 00:00:00 to "
expected = (f"('92EB', 25) 1970-01-01 00:00:00 to "
f"{formatted_max_time} (18768.77)")
self.assertEqual(get_title(key, min_time, max_time, date_mode),
expected)
......@@ -551,7 +554,7 @@ class TestFormatTimeAndGetTitle(TestCase):
Test basic functionality of get_title - the given maximum time is
chronologically earlier than the given minimum time.
"""
date_mode = 'YYYYMMDD'
date_mode = 'YYYY-MM-DD'
min_time = self.positive_epoch_time
max_time = 0
formatted_max_time = (f'{self.positive_formatted_dates[date_mode]}'
......@@ -559,13 +562,13 @@ class TestFormatTimeAndGetTitle(TestCase):
with self.subTest('test_mseed'):
key = '3734'
expected = (f'3734 {formatted_max_time} to '
f'19700101 00:00:00 (-18768.77)')
f'1970-01-01 00:00:00 (-18768.77)')
self.assertEqual(get_title(key, min_time, max_time, date_mode),
expected)
with self.subTest('test_rt130'):
key = ('92EB', 25)
expected = (f"('92EB', 25) {formatted_max_time} to "
f"19700101 00:00:00 (-18768.77)")
f"1970-01-01 00:00:00 (-18768.77)")
self.assertEqual(get_title(key, min_time, max_time, date_mode),
expected)
......
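One number in the expected titles above is worth decoding: the (18768.77) suffix appears to be the plotted time span in hours, which checks out for these fixtures:

    print(round(67567567 / 3600, 2))  # 18768.77 hours between 0 and max_time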
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional, Dict, Union, List
from unittest import TestCase
from unittest.mock import patch
......@@ -7,18 +8,27 @@ from unittest.mock import patch
from obspy.core import UTCDateTime
import numpy as np
import sohstationviewer.view.plotting.time_power_squared_processor
from sohstationviewer.conf import constants as const
from sohstationviewer.model.handling_data import (
trim_downsample_soh_chan,
trim_downsample_wf_chan,
trim_waveform_data,
downsample_waveform_data
downsample_waveform_data,
get_each_day_5_min_list,
)
from sohstationviewer.view.plotting.time_power_squared_processor import (
TimePowerSquaredProcessor,
)
from sohstationviewer.model.downsampler import downsample, chunk_minmax
ORIGINAL_CHAN_SIZE_LIMIT = const.CHAN_SIZE_LIMIT
ORIGINAL_RECAL_SIZE_LIMIT = const.RECAL_SIZE_LIMIT
ZERO_EPOCH_TIME = UTCDateTime(1970, 1, 1, 0, 0, 0).timestamp
TraceInfo = Dict[str, Union[float, str]]
ChannelData = Dict[str,
Union[float, List[np.ndarray], List[TraceInfo], Dict,
np.ndarray]]
class TestTrimWfData(TestCase):
......@@ -596,3 +606,335 @@ class TestTrimDownsampleWfChan(TestCase):
self.end_time, False)
self.assertTrue(mock_trim.called)
self.assertTrue(mock_downsample.called)
class TestGetTrimTpsData(TestCase):
def no_file_memmap(self, file_path: Path, *args, **kwargs):
"""
A mock of numpy.memmap. Reduce test run time significantly by making
sure that data access happens in memory and not on disk.
This method does not actually load the data stored on disk. Instead, it
constructs the array of data using the name of the given file. To do
so, this method requires the file name to be in the format
<prefix>_<index>. This method then constructs an array of
self.trace_size consecutive integers starting at
<index> * self.trace_size.
:param file_path: the path to a file used to construct the data array.
:param args: dummy arguments to make the API similar to numpy.memmap.
:param kwargs: dummy arguments to make the API similar to numpy.memmap.
:return: a numpy array constructed using file_path's name.
"""
file_idx = int(file_path.name.split('_')[-1])
start = file_idx * self.trace_size
end = start + self.trace_size
return np.arange(start, end)
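A quick check of the mock's naming convention, using the trace_size of 1000 set in setUp below (the path is hypothetical and never opened):

    import numpy as np
    from pathlib import Path

    trace_size = 1000
    file_path = Path('/tmp/data_3')                # hypothetical file name
    file_idx = int(file_path.name.split('_')[-1])  # -> 3
    start = file_idx * trace_size                  # -> 3000
    print(np.array_equal(np.arange(start, start + trace_size),
                         np.arange(3000, 4000)))   # True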
def add_trace(self, start_time: float, idx: Optional[int] = None):
"""
Add a trace to the stored list of traces.
:param start_time: the start time of the trace to be added.
:param idx: the index to insert the trace into. If None, the new trace
will be appended to the list of traces
"""
trace = {}
trace['startTmEpoch'] = start_time
trace['endTmEpoch'] = start_time + self.trace_size - 1
trace['size'] = self.trace_size
file_idx = start_time // self.trace_size
times_file_name = Path(self.data_folder.name) / f'times_{file_idx}'
trace['times_f'] = times_file_name
data_file_name = Path(self.data_folder.name) / f'data_{file_idx}'
trace['data_f'] = data_file_name
if idx is not None:
self.traces_info.insert(idx, trace)
else:
self.traces_info.append(trace)
def setUp(self) -> None:
"""Set up text fixtures."""
memmap_patcher = patch.object(np, 'memmap',
side_effect=self.no_file_memmap)
self.addCleanup(memmap_patcher.stop)
memmap_patcher.start()
# Channel ID is only used when communicating with the main window.
# Seeing as we are testing the processing step here, we don't really
# need it.
channel_id = ''
self.channel_data: ChannelData = {'samplerate': 1}
self.traces_info = []
self.channel_data['tracesInfo'] = self.traces_info
self.data_folder = TemporaryDirectory()
self.trace_size = 1000
for i in range(100):
start_time = i * self.trace_size
self.add_trace(start_time)
self.start_time = 25000
self.end_time = 75000
self.each_day_5_mins_list = get_each_day_5_min_list(self.start_time,
self.end_time)
self.tps_processor = TimePowerSquaredProcessor(
channel_id, self.channel_data, self.start_time, self.end_time,
self.each_day_5_mins_list
)
local_TimePowerSquaredProcessor = (sohstationviewer.view.plotting.
time_power_squared_processor.
TimePowerSquaredProcessor)
# If object obj is an instance of class A, then the method call
# obj.method1() translates to A.method1(obj) in Python. So, in order
# to mock method1 for obj, we mock it on the class A.
@patch.object(local_TimePowerSquaredProcessor, 'trim_waveform_data')
def test_data_is_trimmed(self, mock_trim_waveform_data):
"""Test that the data is trimmed."""
self.tps_processor.run()
self.assertTrue(mock_trim_waveform_data.called)
def test_appropriate_amount_of_5_mins_skipped(self):
"""Test that the trimmed part of the data is skipped over."""
self.tps_processor.run()
with self.subTest('test_skip_before_start_time'):
first_unskipped_idx = 83
skipped_tps_arr = (
self.channel_data['tps_data'][0][:first_unskipped_idx]
)
self.assertTrue((skipped_tps_arr == 0).all())
with self.subTest('test_skip_after_end_time'):
last_unskipped_idx = 252
skipped_tps_arr = (
self.channel_data['tps_data'][0][last_unskipped_idx + 1:]
)
self.assertTrue((skipped_tps_arr == 0).all())
def test_result_is_stored(self):
"""Test that the result of the TPS calculation is stored."""
self.tps_processor.run()
self.assertTrue('tps_data' in self.channel_data)
def test_formula_is_correct(self):
"""Test that the TPS calculation uses the correct formula."""
self.tps_processor.start_time = 50000
self.tps_processor.end_time = 52000
self.tps_processor.run()
first_unskipped_idx = 166
last_unskipped_idx = 175
tps_data = self.channel_data['tps_data'][0]
unskipped_tps_arr = (
tps_data[first_unskipped_idx:last_unskipped_idx + 1]
)
expected = np.array([
2.51497985e+09, 2.54515955e+09, 2.57551925e+09, 0.00000000e+00,
1.96222188e+09, 2.64705855e+09, 2.67801825e+09, 2.03969638e+09,
2.75095755e+09, 2.78251725e+09
])
self.assertTrue(np.allclose(unskipped_tps_arr, expected))
def test_one_tps_array_for_each_day_one_day_of_data(self):
"""
Test that there is one TPS array for each day of data.
Test the case where there is only one day of data.
"""
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 1)
def test_one_tps_array_for_each_day_multiple_days_of_data(self):
"""
Test that there is one TPS array for each day of data.
Test the case where there are more than one day of data.
"""
# Currently, the data time goes from 0 to 100000, which is enough to
# cover two days (the start of the second positive day in epoch time is
# 86400). Thus, we only have to set the end time to the data end time
# to have two days of data.
self.tps_processor.end_time = 100000
self.tps_processor.each_day_5_mins_list = get_each_day_5_min_list(
self.tps_processor.start_time, self.tps_processor.end_time
)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 2)
def test_data_has_gap_to_the_right_data_same_day_before_gap(self):
"""
Test that gaps in the data are skipped in TPS calculation by checking
that the elements in the TPS array corresponding to the gaps are
0.
Test the case where there are gaps to the right of the data and the
traces directly next to the gaps are in the same day.
"""
# Remove traces that go from 1000 to 24999 (traces 2 to 25) in order to
# create a gap on the right side of the data.
self.traces_info = [trace
for i, trace in enumerate(self.traces_info)
if not 0 < i < 25]
self.channel_data['tracesInfo'] = self.traces_info
with self.subTest('test_start_time_in_gap'):
self.tps_processor.start_time = 15000
self.tps_processor.each_day_5_mins_list = get_each_day_5_min_list(
self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 1)
tps_gap = slice(0, 50)
tps_data_in_gap = self.channel_data['tps_data'][0][tps_gap]
tps_data_in_gap_contains_zero = np.allclose(
tps_data_in_gap, np.zeros(tps_data_in_gap.size)
)
self.assertTrue(tps_data_in_gap_contains_zero)
with self.subTest('test_start_time_cover_all_traces'):
self.tps_processor.start_time = 500
self.tps_processor.each_day_5_mins_list = get_each_day_5_min_list(
self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 1)
tps_gap = slice(2, 83)
tps_data_in_gap = self.channel_data['tps_data'][0][tps_gap]
tps_data_in_gap_contains_zero = np.allclose(
tps_data_in_gap, np.zeros(tps_data_in_gap.size)
)
self.assertTrue(tps_data_in_gap_contains_zero)
def test_data_has_gap_to_the_left_data_same_day_after_gap(self):
"""
Test that gaps in the data are skipped in TPS calculation by checking
that the elements in the TPS array corresponding to the gaps are
0.
Test the case where there are gaps to the left of the data and the
traces directly next to the gaps are in the same day.
"""
# Data end time is 100000, so we want a trace that starts after 100001
trace_start_time = 125000
self.add_trace(trace_start_time)
with self.subTest('test_end_time_in_gap'):
# Subject to change after Issue #37 is fixed
self.tps_processor.end_time = 110000
self.tps_processor.each_day_5_mins_list = get_each_day_5_min_list(
self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 2)
tps_gaps = (slice(45, 128), slice(131, None))
tps_data_in_gaps = np.concatenate(
[self.channel_data['tps_data'][1][gap] for gap in tps_gaps]
)
tps_data_in_gaps_contains_zero = np.allclose(
tps_data_in_gaps, np.zeros(tps_data_in_gaps.size)
)
self.assertTrue(tps_data_in_gaps_contains_zero)
with self.subTest('test_end_time_cover_all_traces'):
self.tps_processor.end_time = trace_start_time + 50
self.tps_processor.each_day_5_mins_list = get_each_day_5_min_list(
self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 2)
tps_gaps = (slice(45, 128), slice(131, None))
tps_data_in_gaps = np.concatenate(
[self.channel_data['tps_data'][1][gap] for gap in tps_gaps]
)
tps_data_in_gaps_contains_zero = np.allclose(
tps_data_in_gaps, np.zeros(tps_data_in_gaps.size)
)
self.assertTrue(tps_data_in_gaps_contains_zero)
def test_data_has_gap_to_the_right_data_different_day_before_gap(self):
"""
Test that gaps in the data are skipped in TPS calculation by checking
that the elements in the TPS array corresponding to the gaps are
0.
Test the case where there are gaps to the right of the data and the
traces directly next to the gaps are in different days.
"""
trace_start_time = -50000
self.add_trace(trace_start_time, idx=0)
with self.subTest('test_start_time_in_gap'):
self.tps_processor.start_time = -25000
self.tps_processor.each_day_5_mins_list = get_each_day_5_min_list(
self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 2)
tps_gap = slice(const.NO_5M_DAY)
tps_data_in_gap = self.channel_data['tps_data'][0][tps_gap]
tps_data_in_gap_contains_zero = np.allclose(
tps_data_in_gap, np.zeros(tps_data_in_gap.size)
)
self.assertTrue(tps_data_in_gap_contains_zero)
with self.subTest('test_start_time_cover_all_traces'):
self.tps_processor.start_time = -60000
self.tps_processor.each_day_5_mins_list = get_each_day_5_min_list(
self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 2)
tps_gaps = (slice(0, 121), slice(124, None))
tps_data_in_gaps = np.concatenate(
[self.channel_data['tps_data'][0][gap] for gap in tps_gaps]
)
tps_data_in_gaps_contains_zero = np.allclose(
tps_data_in_gaps, np.zeros(tps_data_in_gaps.size)
)
self.assertTrue(tps_data_in_gaps_contains_zero)
def test_data_has_gap_to_the_left_data_different_day_after_gap(self):
"""
Test that gaps in the data are skipped in TPS calculation by checking
that the elements in the TPS array corresponding to the gaps are
0.
Test the case where there are gaps to the left of the data and the
traces directly next to the gaps are in different days.
"""
# The start of the third positive day in epoch time is 172800, so we
# want a trace that starts after 172801 so that the trace after the gap
# is of a different day than the previous ones
trace_start_time = 173100
self.add_trace(trace_start_time)
with self.subTest('test_end_time_same_day_as_second_to_last_trace'):
# Subject to change after Issue #37 is fixed
self.tps_processor.end_time = 125000
self.tps_processor.each_day_5_mins_list = get_each_day_5_min_list(
self.tps_processor.start_time,
self.tps_processor.end_time)
with self.assertRaises(IndexError):
self.tps_processor.run()
with self.subTest('test_end_time_cover_all_traces'):
self.tps_processor.end_time = trace_start_time + 50
self.tps_processor.each_day_5_mins_list = get_each_day_5_min_list(
self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 3)
tps_gap_day_2 = slice(45, None)
tps_gap_day_3 = slice(3, None)
tps_data_in_gaps = np.hstack(
(
self.channel_data['tps_data'][1][tps_gap_day_2],
self.channel_data['tps_data'][2][tps_gap_day_3]
)
)
tps_data_in_gaps_contains_zero = np.allclose(
tps_data_in_gaps, np.zeros(tps_data_in_gaps.size)
)
self.assertTrue(tps_data_in_gaps_contains_zero)