Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
from typing import List, Dict
from PySide2 import QtCore
from sohstationviewer.conf import constants as const
import numpy as np
from sohstationviewer.model.downsampler import Downsampler
class WaveformChannelProcessorSignals(QtCore.QObject):
finished = QtCore.Signal(dict, str)
stopped = QtCore.Signal()
class WaveformChannelProcessor(QtCore.QRunnable):
"""
The class that handles trimming excess data and interfacing with a
downsampler for a waveform channel.
"""
def __init__(self, channel_data, channel_id, start_time, end_time,
first_time):
super().__init__()
self.signals = WaveformChannelProcessorSignals()
# Aliasing the signals for ease of use
self.finished = self.signals.finished
self.stopped = self.signals.stopped
self.stop_requested = False
self.downsampler = Downsampler()
self.channel_data: dict = channel_data
self.channel_id = channel_id
self.start_time = start_time
self.end_time = end_time
self.first_time = first_time
self.trimmed_trace_list = None
self.downsampled_times_list = []
self.downsampled_data_list = []
self.downsampled_list_lock = QtCore.QMutex()
def trim_waveform_data(self) -> List[Dict]:
"""
Trim off waveform traces whose times do not intersect the closed
interval [self.start_time, self.end_time]. Store the traces that are
not removed in self.trimmed_trace_list.
"""
data_start_time = self.channel_data['tracesInfo'][0]['startTmEpoch']
data_end_time = self.channel_data['tracesInfo'][-1]['endTmEpoch']
if (self.start_time > data_end_time
or self.end_time < data_start_time):
return []
good_start_indices = [index
for index, tr
in enumerate(self.channel_data['tracesInfo'])
if tr['startTmEpoch'] > self.start_time]
if good_start_indices:
start_idx = good_start_indices[0]
if start_idx > 0:
start_idx -= 1 # start_time in middle of trace
else:
start_idx = 0
good_end_indices = [idx
for idx, tr
in enumerate(self.channel_data['tracesInfo'])
if tr['endTmEpoch'] <= self.end_time]
if good_end_indices:
end_idx = good_end_indices[-1]
if end_idx < len(self.channel_data['tracesInfo']) - 1:
end_idx += 1 # end_time in middle of trace
else:
end_idx = 0
end_idx += 1 # a[x:y+1] = [a[x], ...a[y]]
good_indices = slice(start_idx, end_idx)
self.trimmed_trace_list = self.channel_data['tracesInfo'][good_indices]
def init_downsampler(self):
"""
Initialize the downsampler by loading the memmapped traces' data and
creating a downsampler worker for each loaded trace.
"""
# Calculate the number of requested_points
total_size = sum([tr['size'] for tr in self.trimmed_trace_list])
requested_points = 0
if total_size > const.CHAN_SIZE_LIMIT:
requested_points = int(
const.CHAN_SIZE_LIMIT / len(self.trimmed_trace_list)
)
# Downsample the data
for tr_idx, tr in enumerate(self.trimmed_trace_list):
if not self.stop_requested:
times = np.memmap(tr['times_f'],
dtype='int64', mode='r',
shape=tr['size'])
data = np.memmap(tr['data_f'],
dtype='int64', mode='r',
shape=tr['size'])
indexes = np.where((self.start_time <= times) &
(times <= self.end_time))
times = times[indexes]
data = data[indexes]
do_downsample = (requested_points != 0)
worker = self.downsampler.add_worker(
times, data, rq_points=requested_points,
do_downsample=do_downsample
)
# We need these connections to run in the background thread.
# However, their owner (the channel processor) is in the main
# thread, so the default connection type would make them
# run in the main thread. Instead, we have to use a direct
# connection to make these slots run in the background thread.
worker.signals.finished.connect(
self.trace_processed, type=QtCore.Qt.DirectConnection
)
worker.signals.stopped.connect(
self.stopped, type=QtCore.Qt.DirectConnection
)
@QtCore.Slot()
def trace_processed(self, times, data):
"""
The slot called when the downsampler worker of a waveform trace
finishes its job. Add the downsampled data to the appropriate list.
If the worker that emitted the signal is the last one, combine and
store the processed data in self.channel_data and emit the finished
signal of this class.
:param times: the downsampled array of time data.
:param data: the downsampled array of waveform data.
"""
self.downsampled_list_lock.lock()
self.downsampled_times_list.append(times)
self.downsampled_data_list.append(data)
self.downsampled_list_lock.unlock()
if len(self.downsampled_times_list) == len(self.trimmed_trace_list):
self.channel_data['times'] = np.hstack(self.downsampled_times_list)
self.channel_data['data'] = np.hstack(self.downsampled_data_list)
self.signals.finished.emit(self.channel_data, self.channel_id)
def run(self):
"""
The main method of this class. First check that the channel is not
already small enough after the first trim that there is no need for
further processing. Then, trim the waveform data based on
self.start_time and self.end_time. Afterwards, do some checks to
determine if there is a need to downsample the data. If yes, initialize
and start the downsampler.
"""
if 'fulldata' in self.channel_data:
self.finished.emit(self.channel_data, self.channel_id)
# data is small, already has full in the first trim
return
self.trim_waveform_data()
if not self.trimmed_trace_list:
self.channel_data['times'] = np.array([])
self.channel_data['data'] = np.array([])
self.finished.emit(self.channel_data, self.channel_id)
return False
total_size = sum([tr['size'] for tr in self.trimmed_trace_list])
if not self.first_time and total_size > const.RECAL_SIZE_LIMIT:
# The data is so big that processing it would not make it any
# easier to understand resulting plot.
return
if total_size <= const.CHAN_SIZE_LIMIT and self.first_time:
self.channel_data['fulldata'] = True
try:
del self.channel_data['times']
del self.channel_data['data']
except Exception:
pass
self.init_downsampler()
self.downsampler.start()
def request_stop(self):
"""
Stop processing the data by requesting the downsampler to stop
running.
"""
self.stop_requested = True
self.downsampler.request_stop()
...@@ -713,4 +713,5 @@ class UIMainWindow(object): ...@@ -713,4 +713,5 @@ class UIMainWindow(object):
self.prefer_soh_chan_button.clicked.connect( self.prefer_soh_chan_button.clicked.connect(
main_window.open_channel_preferences) main_window.open_channel_preferences)
self.read_button.clicked.connect(main_window.read_selected_files) self.read_button.clicked.connect(main_window.read_selected_files)
self.stop_button.clicked.connect(main_window.stop_load_data)
self.stop_button.clicked.connect(main_window.stop)
This diff is collapsed.
This diff is collapsed.
...@@ -79,7 +79,7 @@ class TestExtractData(unittest.TestCase): ...@@ -79,7 +79,7 @@ class TestExtractData(unittest.TestCase):
'valueColors': None} 'valueColors': None}
# Data type has None value. None value comes from # Data type has None value. None value comes from
# controller.processing.detectDataType. # controller.processing.detect_data_type.
expected_result['label'] = 'DEFAULT-SOH/Data Def' expected_result['label'] = 'DEFAULT-SOH/Data Def'
self.assertDictEqual(get_chan_plot_info('SOH/Data Def', None), self.assertDictEqual(get_chan_plot_info('SOH/Data Def', None),
expected_result) expected_result)
......