Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: software_public/passoft/sohstationviewer
Commits on Source (4)
Showing 837 additions and 652 deletions
......@@ -44,10 +44,12 @@ checked, a warning will be created, "Checked data streams will be ignored
for RT130 data type."
## Displaying waveform channels
If neither the TPS nor the RAW checkbox is checked, meaning no data needs to
be displayed, all selected waveforms will be ignored.
To display waveform channels, the user needs to check:
TPS needs to be checked to display the Time-Power-Squared of the waveform.
RAW needs to be checked to display the actual signal of the waveform.
+ <img alt="TPS" src="images/select_waveform/select_TPS.png" height="30" />: check TPS to display the Time-Power-Squared of the selected waveform data.
+ <img alt="RAW" src="images/select_waveform/select_RAW.png" height="30" />: check RAW to display the actual signal of the selected waveform data.
<br />
\ No newline at end of file
<br />
If any waveform is checked but neither TPS nor RAW is checked,
+ For RT130, the program will read the events of the selected data streams.
+ For MSeed, the program will pop up a message asking the user to clear the waveform selection or select either TPS or RAW.
\ No newline at end of file
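A minimal sketch of this rule, assuming hypothetical flag names (the real checks live in `get_requested_wf_chans()` in `main_window.py` further down this diff):

```python
def validate_waveform_selection(wf_selected: bool, tps_checked: bool,
                                raw_checked: bool, data_type: str) -> None:
    # Sketch only: waveform selections without TPS/RAW are handled
    # differently per data type, as described above.
    if wf_selected and not (tps_checked or raw_checked):
        if data_type == 'RT130':
            return  # events of the selected data streams are still read
        raise ValueError(
            "Waveform channels are selected but neither TPS nor RAW is "
            "checked. Clear the waveform selection or check one of them.")
```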
......@@ -6,18 +6,22 @@ channels, datatype
import os
import json
import re
import traceback
from pathlib import Path
from typing import List, Set, Optional, Dict, Tuple
from PySide2.QtCore import QEventLoop, Qt
from PySide2.QtGui import QCursor
from PySide2.QtWidgets import QTextBrowser, QApplication
from obspy.core import read as read_ms
from obspy.io.reftek.core import Reftek130Exception
from obspy.io import reftek
from sohstationviewer.model.mseed_data.record_reader import RecordReader \
as MSeedRecordReader
from sohstationviewer.model.mseed_data.record_reader_helper import \
MSeedReadError
from sohstationviewer.model.mseed_data.mseed_reader import \
move_to_next_record
from sohstationviewer.database.extract_data import get_signature_channels
from sohstationviewer.model.data_type_model import DataTypeModel
from sohstationviewer.model.handling_data import (
read_mseed_chanids_from_headers)
......@@ -28,69 +32,6 @@ from sohstationviewer.controller.util import (
from sohstationviewer.view.util.enums import LogType
def load_data(data_type: str, tracking_box: QTextBrowser, dir_list: List[str],
list_of_rt130_paths: List[Path],
req_wf_chans: List[str] = [], req_soh_chans: List[str] = [],
read_start: Optional[float] = None,
read_end: Optional[float] = None) -> DataTypeModel:
"""
Load the data stored in list_of_dir and store it in a DataTypeModel object.
The concrete class of the data object is based on dataType. Run on the same
thread as its caller, and so will block the GUI if called on the main
thread. It is advisable to use model.data_loader.DataLoader to load data
unless it is necessary to load data in the main thread (e.g. if there is
a need to access the call stack).
:param data_type: type of data read
:param tracking_box: widget to display tracking info
:param dir_list: list of directories selected by users
:param list_of_rt130_paths: list of rt130 directories selected by users
:param req_wf_chans: requested waveform channel list
:param req_soh_chans: requested soh channel list
:param read_start: start time of read data
:param read_end: finish time of read data
:return data_object: object that keep the data read from
list_of_dir
"""
data_object = None
if list_of_rt130_paths == []:
for d in dir_list:
if data_object is None:
try:
data_object = DataTypeModel.create_data_object(
data_type, tracking_box, d, [],
req_wf_chans=req_wf_chans, req_soh_chans=req_soh_chans,
read_start=read_start, read_end=read_end)
except Exception:
fmt = traceback.format_exc()
msg = f"Dir {d} can't be read due to error: {str(fmt)}"
display_tracking_info(tracking_box, msg, LogType.WARNING)
# if data_object.has_data():
# continue
# If no data can be read from the first dir, throw exception
# raise Exception("No data can be read from ", d)
# TODO: will work with select more than one dir later
# else:
# data_object.readDir(d)
else:
try:
data_object = DataTypeModel.create_data_object(
data_type, tracking_box, [''], list_of_rt130_paths,
req_wf_chans=req_wf_chans, req_soh_chans=req_soh_chans,
read_start=read_start, read_end=read_end)
except Exception:
fmt = traceback.format_exc()
msg = f"RT130 selected can't be read due to error: {str(fmt)}"
display_tracking_info(tracking_box, msg, LogType.WARNING)
if data_object is None:
msg = "No data object created. Check with implementer"
display_tracking_info(tracking_box, msg, LogType.WARNING)
return data_object
def read_mseed_channels(tracking_box: QTextBrowser, list_of_dir: List[str],
on_unittest: bool = False
) -> Set[str]:
......@@ -157,6 +98,7 @@ def detect_data_type(list_of_dir: List[str]) -> Optional[str]:
sign_chan_data_type_dict = get_signature_channels()
dir_data_type_dict = {}
is_multiplex = None
for d in list_of_dir:
data_type = "Unknown"
for path, subdirs, files in os.walk(d):
......@@ -165,17 +107,24 @@ def detect_data_type(list_of_dir: List[str]) -> Optional[str]:
if not validate_file(path2file, file_name):
continue
ret = get_data_type_from_file(path2file,
sign_chan_data_type_dict)
sign_chan_data_type_dict,
is_multiplex)
if ret is not None:
data_type, chan = ret
break
d_type, is_multiplex = ret
if d_type is not None:
data_type = d_type
break
if data_type != "Unknown":
break
if is_multiplex is None:
raise Exception("No channel found for the data set")
if data_type == "Unknown":
dir_data_type_dict[d] = ("Unknown", '_')
dir_data_type_dict[d] = "Unknown"
else:
dir_data_type_dict[d] = (data_type, chan)
data_type_list = {d[0] for d in dir_data_type_dict.values()}
dir_data_type_dict[d] = data_type
data_type_list = list(set(dir_data_type_dict.values()))
if len(data_type_list) > 1:
dir_data_type_str = json.dumps(dir_data_type_dict)
dir_data_type_str = re.sub(r'\{|\}|"', '', dir_data_type_str)
......@@ -185,38 +134,78 @@ def detect_data_type(list_of_dir: List[str]) -> Optional[str]:
f"Please have only data that related to each other.")
raise Exception(msg)
elif data_type_list == {'Unknown'}:
msg = ("There are no known data detected.\n"
"Please select different folder(s).")
elif data_type_list == ['Unknown']:
msg = ("There are no known data detected.\n\n"
"Do you want to cancel to select different folder(s)\n"
"Or continue to read any available mseed file?")
raise Exception(msg)
return list(dir_data_type_dict.values())[0][0]
return data_type_list[0], is_multiplex
def get_data_type_from_file(
path2file: Path,
sign_chan_data_type_dict: Dict[str, str]
) -> Optional[Tuple[str, str]]:
sign_chan_data_type_dict: Dict[str, str],
is_multiplex: Optional[bool] = None
) -> Optional[Tuple[Optional[str], bool]]:
"""
+ Try to read the file as mseed, excluding waveform files to improve
performance.
+ Identify the data type for a file by checking whether a channel in
that file is a signature channel of a data type.
+ Loop through each record of the file:
If MSeedRecordReader raises an error, check whether the file is
RT130; if so, report the data type as RT130, otherwise return to
continue checking another file.
If there is more than one channel in a file, the file is multiplexed.
If a signature channel is found, report the data type of the file.
:param path2file: absolute path to the file being processed
:param sign_chan_data_type_dict: dict of signature channels by data
type
:param is_multiplex: whether the data set is multiplexed
:return: the detected data type (None if not detected) and the
multiplex flag
"""
try:
stream = read_ms(path2file)
except TypeError:
return
except Reftek130Exception:
return 'RT130', '_'
for trace in stream:
chan = trace.stats['channel']
wf_chan_possibilities = ['FH', 'FN', # ≥ 1000 to < 5000
'GH', 'GL', # ≥ 1000 to < 5000
'DH', 'DL', # ≥ 250 to < 1000
'CH', 'CN', # ≥ 250 to < 1000
'EH', 'EL', 'EP', # ≥ 80
'SH', 'SL', 'SP', # ≥ 10 to < 80
'HH', 'HN', # ≥ 80
'BH', 'BN', # ≥ 10 to < 80
'MH', 'MN', 'MP', 'ML',
'LH', 'LL', 'LP', 'LN',
'VP', 'VL', 'VH',
'UN', 'UP', 'UL', 'UH']
if any(x in path2file.name for x in wf_chan_possibilities):
# Skip waveform files, whose channels aren't signature channels
return None, False
file = open(path2file, 'rb')
chans_in_stream = set()
data_type = None
while True:
is_eof = (file.read(1) == b'')
if is_eof:
break
file.seek(-1, 1)
current_record_start = file.tell()
try:
record = MSeedRecordReader(file)
except MSeedReadError:
file.close()
if reftek.core._is_reftek130(path2file):
return 'RT130', False
return
chan = record.record_metadata.channel
if is_multiplex is None:
chans_in_stream.add(chan)
if len(chans_in_stream) > 1:
is_multiplex = True
if chan in sign_chan_data_type_dict.keys():
return sign_chan_data_type_dict[chan], chan
data_type = sign_chan_data_type_dict[chan]
if is_multiplex:
file.close()
return data_type, is_multiplex
move_to_next_record(file, current_record_start, record)
file.close()
is_multiplex = len(chans_in_stream) > 1
return data_type, is_multiplex
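For orientation, a hedged sketch of how `detect_data_type` above drives this helper; the directory path is illustrative, and `get_signature_channels` is the helper imported at the top of this module:

```python
from pathlib import Path

sign_chan_data_type_dict = get_signature_channels()  # {signature chan: type}
is_multiplex = None
for path2file in Path('/data/set1').rglob('*'):  # illustrative data set path
    if not path2file.is_file():
        continue
    ret = get_data_type_from_file(path2file, sign_chan_data_type_dict,
                                  is_multiplex)
    if ret is None:
        continue  # unreadable file: move on to the next one
    d_type, is_multiplex = ret
    if d_type is not None:
        print(f"{path2file.name}: {d_type}, multiplex={is_multiplex}")
        break
```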
No preview for this file type
......@@ -9,8 +9,8 @@ from PySide2 import QtCore, QtWidgets
from sohstationviewer.conf import constants
from sohstationviewer.controller.util import display_tracking_info
from sohstationviewer.model.data_type_model import (
DataTypeModel, ThreadStopped, ProcessingDataError)
from sohstationviewer.model.general_data.general_data import (
GeneralData, ThreadStopped, ProcessingDataError)
from sohstationviewer.view.util.enums import LogType
......@@ -18,7 +18,7 @@ class DataLoaderWorker(QtCore.QObject):
"""
The worker class that executes the code to load the data.
"""
finished = QtCore.Signal(DataTypeModel)
finished = QtCore.Signal(GeneralData)
failed = QtCore.Signal()
stopped = QtCore.Signal()
notification = QtCore.Signal(QtWidgets.QTextBrowser, str, str)
......@@ -26,19 +26,23 @@ class DataLoaderWorker(QtCore.QObject):
button_chosen = QtCore.Signal(int)
def __init__(self, data_type: str, tracking_box: QtWidgets.QTextBrowser,
is_multiplex: Optional[bool],
folder: str, list_of_rt130_paths: List[Path],
req_wf_chans: Union[List[str], List[int]] = [],
req_soh_chans: List[str] = [], read_start: float = 0,
gap_minimum: Optional[float] = None,
read_end: float = constants.HIGHEST_INT,
include_mp123: bool = False, include_mp456: bool = False,
rt130_waveform_data_req: bool = False, parent_thread=None):
super().__init__()
self.data_type = data_type
self.tracking_box = tracking_box
self.is_multiplex = is_multiplex
self.folder = folder
self.list_of_rt130_paths = list_of_rt130_paths
self.req_wf_chans = req_wf_chans
self.req_soh_chans = req_soh_chans
self.gap_minimum = gap_minimum
self.read_start = read_start
self.read_end = read_end
self.include_mp123 = include_mp123
......@@ -58,7 +62,7 @@ class DataLoaderWorker(QtCore.QObject):
from sohstationviewer.model.reftek.reftek import RT130
object_type = RT130
else:
from sohstationviewer.model.mseed.mseed import MSeed
from sohstationviewer.model.mseed_data.mseed import MSeed
object_type = MSeed
# Create data object without loading any data in order to connect
# its unpause slot to the loader's unpause signal
......@@ -66,9 +70,10 @@ class DataLoaderWorker(QtCore.QObject):
self.button_chosen.connect(data_object.receive_pause_response,
type=QtCore.Qt.DirectConnection)
data_object.__init__(
self.data_type, self.tracking_box, self.folder,
self.data_type, self.tracking_box,
self.is_multiplex, self.folder,
self.list_of_rt130_paths, req_wf_chans=self.req_wf_chans,
req_soh_chans=self.req_soh_chans,
req_soh_chans=self.req_soh_chans, gap_minimum=self.gap_minimum,
read_start=self.read_start, read_end=self.read_end,
include_mp123zne=self.include_mp123,
include_mp456uvw=self.include_mp456,
......@@ -109,11 +114,15 @@ class DataLoader(QtCore.QObject):
self.thread: Optional[QtCore.QThread] = None
self.worker: Optional[DataLoaderWorker] = None
def init_loader(self, data_type: str, tracking_box: QtWidgets.QTextBrowser,
def init_loader(self, data_type: str,
tracking_box: QtWidgets.QTextBrowser,
is_multiplex: bool,
list_of_dir: List[Union[str, Path]],
list_of_rt130_paths: List[Union[str, Path]],
req_wf_chans: Union[List[str], List[int]] = [],
req_soh_chans: List[str] = [], read_start: float = 0,
req_soh_chans: List[str] = [],
gap_minimum: Optional[float] = None,
read_start: float = 0,
read_end: float = constants.HIGHEST_INT,
include_mp123: bool = False,
include_mp456: bool = False,
......@@ -145,10 +154,12 @@ class DataLoader(QtCore.QObject):
self.worker = DataLoaderWorker(
data_type,
tracking_box,
is_multiplex,
list_of_dir[0], # Only work on one directory for now.
list_of_rt130_paths,
req_wf_chans=req_wf_chans,
req_soh_chans=req_soh_chans,
gap_minimum=gap_minimum,
read_start=read_start,
read_end=read_end,
include_mp123=include_mp123,
......
......@@ -15,7 +15,8 @@ from sohstationviewer.view.util.enums import LogType
from sohstationviewer.database.process_db import execute_db
from sohstationviewer.model.general_data.general_data_helper import \
retrieve_data_time_from_data_dict, retrieve_gaps_from_data_dict, \
combine_data, sort_data, squash_gaps, apply_convert_factor_to_data_dict
combine_data, sort_data, squash_gaps, apply_convert_factor_to_data_dict, \
reset_data
class ProcessingDataError(Exception):
......@@ -139,7 +140,6 @@ class GeneralData():
selected_key: str - key of the data set to be displayed
"""
self.selected_key: Optional[str] = None
"""
gaps: gaps info in dict:
"""
......@@ -215,13 +215,12 @@ class GeneralData():
self.sort_all_data()
self.combine_all_data()
self.apply_convert_factor_to_data_dicts()
self.check_not_found_soh_channels()
self.retrieve_gaps_from_data_dicts()
self.retrieve_data_time_from_data_dicts()
for key in self.keys:
if key not in self.data_time.keys():
self.data_time[key] = [self.read_start, self.read_end]
if self.selected_key not in self.data_time.keys():
self.data_time[self.selected_key] = \
[self.read_start, self.read_end]
def __del__(self):
# FROM data_type_model.Data_Type_Model.__del__
......@@ -352,10 +351,10 @@ class GeneralData():
pass
def combine_all_data(self):
combine_data(self.waveform_data[self.selected_key], self.gap_minimum)
combine_data(self.mass_pos_data[self.selected_key], self.gap_minimum)
combine_data(self.selected_key, self.waveform_data, self.gap_minimum)
combine_data(self.selected_key, self.mass_pos_data, self.gap_minimum)
try:
combine_data(self.soh_data[self.selected_key], self.gap_minimum)
combine_data(self.selected_key, self.soh_data, self.gap_minimum)
except KeyError:
# Reftek's SOH trace doesn't have startTmEpoch and
# actually soh_data consists of only one trace
......@@ -365,20 +364,28 @@ class GeneralData():
"""
Getting gaps from each data_dicts then squash all related gaps
"""
retrieve_gaps_from_data_dict(self.soh_data, self.gaps)
retrieve_gaps_from_data_dict(self.mass_pos_data, self.gaps)
retrieve_gaps_from_data_dict(self.waveform_data, self.gaps)
for sta_id in self.gaps:
self.gaps[sta_id] = squash_gaps(self.gaps[sta_id])
self.gaps[self.selected_key] = []
retrieve_gaps_from_data_dict(
self.selected_key, self.soh_data, self.gaps)
retrieve_gaps_from_data_dict(
self.selected_key, self.mass_pos_data, self.gaps)
retrieve_gaps_from_data_dict(
self.selected_key, self.waveform_data, self.gaps)
self.gaps[self.selected_key] = squash_gaps(
self.gaps[self.selected_key])
def retrieve_data_time_from_data_dicts(self):
"""
Going through each data_dict to update the data_time to be
[min of startTimeEpoch, max of endTimeEpoch] for each station.
"""
retrieve_data_time_from_data_dict(self.soh_data, self.data_time)
retrieve_data_time_from_data_dict(self.mass_pos_data, self.data_time)
retrieve_data_time_from_data_dict(self.waveform_data, self.data_time)
retrieve_data_time_from_data_dict(
self.selected_key, self.soh_data, self.data_time)
retrieve_data_time_from_data_dict(
self.selected_key, self.mass_pos_data, self.data_time)
retrieve_data_time_from_data_dict(
self.selected_key, self.waveform_data, self.data_time)
def fill_empty_data(self):
"""
......@@ -399,6 +406,19 @@ class GeneralData():
Applying convert_factor to avoid using flags to prevent double
applying convert factor when plotting
"""
apply_convert_factor_to_data_dict(self.soh_data, self.data_type)
apply_convert_factor_to_data_dict(self.mass_pos_data, self.data_type)
apply_convert_factor_to_data_dict(self.waveform_data, self.data_type)
apply_convert_factor_to_data_dict(
self.selected_key, self.soh_data, self.data_type)
apply_convert_factor_to_data_dict(
self.selected_key, self.mass_pos_data, self.data_type)
apply_convert_factor_to_data_dict(
self.selected_key, self.waveform_data, self.data_type)
def reset_all_selected_data(self):
"""
FROM data_type_model.reset_all_selected_data()
Remove all keys created in the plotting process, and change fullData
to False. This function is to replace deepcopy which uses more memory.
"""
reset_data(self.selected_key, self.soh_data)
reset_data(self.selected_key, self.waveform_data)
reset_data(self.selected_key, self.mass_pos_data)
from typing import List, Dict, Optional
from typing import List, Dict, Optional, Union, Tuple
import numpy as np
from sohstationviewer.database.extract_data import get_convert_factor
......@@ -91,68 +91,73 @@ def sort_data(sta_data_dict: Dict) -> None:
def retrieve_data_time_from_data_dict(
selected_key: Union[str, Tuple[str, str]],
data_dict: Dict, data_time: Dict[str, List[float]]) -> None:
"""
Going through each channel in each station to get data_time for each
station which is [min of startTimeEpoch, max of endTimeEpoch] among
the station's channels.
:param selected_key: the key of the selected data set
:param data_dict: the given data_dict
:param data_time: data by sta_id
"""
for sta_id in data_dict.keys():
for c in data_dict[sta_id]:
dtime = [data_dict[sta_id][c]['startTmEpoch'],
data_dict[sta_id][c]['endTmEpoch']]
if sta_id in data_time.keys():
data_time[sta_id][0] = min(data_time[sta_id][0], dtime[0])
data_time[sta_id][1] = max(data_time[sta_id][1], dtime[1])
else:
data_time[sta_id] = dtime
def retrieve_gaps_from_data_dict(data_dict: Dict,
selected_data_dict = data_dict[selected_key]
for c in selected_data_dict:
dtime = [selected_data_dict[c]['startTmEpoch'],
selected_data_dict[c]['endTmEpoch']]
if selected_key in data_time.keys():
data_time[selected_key][0] = min(data_time[selected_key][0],
dtime[0])
data_time[selected_key][1] = max(data_time[selected_key][1],
dtime[1])
else:
data_time[selected_key] = dtime
def retrieve_gaps_from_data_dict(selected_key: Union[str, Tuple[str, str]],
data_dict: Dict,
gaps: Dict[str, List[List[float]]]) -> None:
"""
Create each station's gaps by adding all gaps from all channels
:param selected_key: the key of the selected data set
:param data_dict: given stream
:param gaps: gaps list by key
"""
for key in data_dict.keys():
if key not in gaps:
gaps[key] = []
for c in data_dict[key].keys():
cgaps = data_dict[key][c]['gaps']
if cgaps != []:
gaps[key] += cgaps
selected_data_dict = data_dict[selected_key]
for c in selected_data_dict.keys():
cgaps = selected_data_dict[c]['gaps']
if cgaps != []:
gaps[selected_key] += cgaps
def combine_data(station_data_dict: Dict, gap_minimum: Optional[float]) \
-> None:
def combine_data(selected_key: Union[str, Tuple[str, str]],
data_dict: Dict, gap_minimum: Optional[float]) -> None:
"""
Traverse through traces in each channel; add to the gap list if
delta >= gap_minimum, where delta is the distance between
contiguous traces.
Combine sorted data using concatenate, which also changes data to
ndarray, and update startTmEpoch and endTmEpoch.
:param selected_key: the key of the selected data set
:param data_dict: data dict of the selected key
:param gap_minimum: minimum length of gaps to be detected
"""
for chan_id in station_data_dict:
channel = station_data_dict[chan_id]
selected_data_dict = data_dict[selected_key]
for chan_id in selected_data_dict:
channel = selected_data_dict[chan_id]
traces_info = channel['tracesInfo']
if 'gaps' in channel:
# gaps key is for mseed data only
for idx in range(len(traces_info) - 1):
curr_end_tm = traces_info[idx]['endTmEpoch']
next_start_tm = traces_info[idx + 1]['startTmEpoch']
delta = abs(curr_end_tm - next_start_tm)
if gap_minimum is not None and delta >= gap_minimum:
# add gap
gap = [curr_end_tm, next_start_tm]
selected_data_dict[chan_id]['gaps'].append(gap)
for idx in range(len(traces_info) - 1):
curr_end_tm = traces_info[idx]['endTmEpoch']
next_start_tm = traces_info[idx+1]['startTmEpoch']
delta = abs(curr_end_tm - next_start_tm)
if gap_minimum is not None and delta >= gap_minimum:
# add gap
gap = [curr_end_tm, next_start_tm]
station_data_dict[chan_id]['gaps'].append(gap)
channel['startTmEpoch'] = min([tr['startTmEpoch']
for tr in traces_info])
channel['endTmEpoch'] = max([tr['endTmEpoch'] for tr in traces_info])
......@@ -167,18 +172,38 @@ def combine_data(station_data_dict: Dict, gap_minimum: Optional[float]) \
}]
def apply_convert_factor_to_data_dict(data_dict: Dict, data_type: str):
def apply_convert_factor_to_data_dict(
selected_key: Union[str, Tuple[str, str]],
data_dict: Dict, data_type: str) -> None:
"""
Traverse through traces in each channel to convert data according to
convert_factor got from DB
:param selected_key: the key of the selected data set
:param data_dict: dict of data
:param data_type: type of data
"""
for key in data_dict:
for chan_id in data_dict[key]:
channel = data_dict[key][chan_id]
convert_factor = get_convert_factor(chan_id, data_type)
if convert_factor is not None and convert_factor != 1:
for tr in channel['tracesInfo']:
tr['data'] = convert_factor * tr['data']
selected_data_dict = data_dict[selected_key]
for chan_id in selected_data_dict:
channel = selected_data_dict[chan_id]
convert_factor = get_convert_factor(chan_id, data_type)
if convert_factor is not None and convert_factor != 1:
for tr in channel['tracesInfo']:
tr['data'] = convert_factor * tr['data']
def reset_data(selected_key: Union[str, Tuple[str, str]], data_dict: Dict):
"""
FROM data_type_model.reset_data()
Remove all keys created in the plotting process for the given data dict
:param selected_key: the key of the selected data set
:param data_dict: data of the selected key
"""
selected_data_dict = data_dict[selected_key]
for chan_id in selected_data_dict:
selected_data_dict[chan_id]['fullData'] = False
del_keys = ['chan_db_info', 'times', 'data', 'ax', 'ax_wf']
for k in del_keys:
try:
del selected_data_dict[chan_id][k]
except KeyError:
pass
......@@ -72,7 +72,6 @@ class MSeed(GeneralData):
self.track_info(str(e), LogType.WARNING)
continue
for file_name in files:
if self.creator_thread.isInterruptionRequested():
raise ThreadStopped()
......
......@@ -7,7 +7,27 @@ from sohstationviewer.model.mseed_data.record_reader import RecordReader
from sohstationviewer.model.mseed_data.record_reader_helper import \
RecordMetadata
from sohstationviewer.model.handling_data import check_chan
from sohstationviewer.model.mseed_data.mseed_reader_helper import check_chan
def move_to_next_record(file, current_record_start: int,
record: RecordReader):
"""
Move the current position of the file to the start of the next record.
:param file: the file being read
:param current_record_start: the start position of the current record
:param record: the record currently being read
"""
# MSEED stores the size of a data record as an exponent of a
# power of two, so we have to convert that to actual size before
# doing anything else.
record_length_exp = record.header_unpacker.unpack(
'B', record.blockette_1000.record_length
)[0]
record_size = 2 ** record_length_exp
file.seek(current_record_start)
file.seek(record_size, 1)
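A quick worked check of the exponent-to-size conversion above (512 and 4096 bytes are the record lengths most commonly seen in mseed files):

```python
# Blockette 1000 stores the record length as a power-of-two exponent.
assert 2 ** 9 == 512      # exponent 9  -> 512-byte records
assert 2 ** 12 == 4096    # exponent 12 -> 4096-byte records
```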
class MSeedReader:
......@@ -225,25 +245,6 @@ class MSeedReader:
channel['size'] += meta.sample_count
channel['nets'].add(meta.network)
def get_ready_for_next_read(self, current_record_start: int,
record: RecordReader):
"""
Move the current position of file to next record' start.
:param current_record_start: the start position of the current record
:param record: the record that is reading
"""
# MSEED stores the size of a data record as an exponent of a
# power of two, so we have to convert that to actual size before
# doing anything else.
record_length_exp = record.header_unpacker.unpack(
'B', record.blockette_1000.record_length
)[0]
record_size = 2 ** record_length_exp
self.file.seek(current_record_start)
self.file.seek(record_size, 1)
def read(self):
while True:
# We know that end of file is reached when read() returns an empty
......@@ -266,12 +267,14 @@ class MSeedReader:
if record.invalid_blockettes:
self.invalid_blockettes = True
if not self.check_time(record):
self.get_ready_for_next_read(current_record_start, record)
move_to_next_record(
self.file, current_record_start, record)
continue
data_dict = self.get_data_dict(record.record_metadata)
if data_dict is None:
if self.is_multiplex:
self.get_ready_for_next_read(current_record_start, record)
move_to_next_record(
self.file, current_record_start, record)
continue
else:
break
......@@ -279,5 +282,5 @@ class MSeedReader:
self.append_data(data_dict, record, first_data_point)
self.append_log(record)
self.get_ready_for_next_read(current_record_start, record)
move_to_next_record(self.file, current_record_start, record)
self.file.close()
# ALL FUNCTIONS IN THIS FILE ARE FROM HANDLING DATA. NO NEED TO REVIEW
import re
from typing import Tuple, List, Union
from sohstationviewer.conf.dbSettings import dbConf
def check_chan(chan_id: str, req_soh_chans: List[str], req_wf_chans: List[str],
include_mp123zne: bool, include_mp456uvw: bool) \
-> Union[str, bool]:
"""
Check if chanID is a requested channel.
:param chan_id: str - channel ID
:param req_soh_chans: list of str - requested SOH channels
:param req_wf_chans: list of str - requested waveform channels
:param include_mp123zne: if mass position channels 1,2,3 are requested
:param include_mp456uvw: if mass position channels 4,5,6 are requested
:return: str/bool -
'WF' if chanID is a requested waveform channel,
'SOH' if chanID is a requested SOH channel,
'MP' if chanID is a requested mass position channel
False otherwise.
"""
if chan_id.startswith('VM'):
if (not include_mp123zne and
chan_id[-1] in ['1', '2', '3', 'Z', 'N', 'E']):
return False
if (not include_mp456uvw
and chan_id[-1] in ['4', '5', '6', 'U', 'V', 'W']):
return False
return 'MP'
ret = check_wf_chan(chan_id, req_wf_chans)
if ret[0] == 'WF':
if ret[1]:
return "WF"
else:
return False
if check_soh_chan(chan_id, req_soh_chans):
return "SOH"
return False
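A few hedged examples of the return values (channel IDs are hypothetical, and 'BHZ' is assumed to match dbConf['seisRE']):

```python
print(check_chan('VM1', [], ['*'], include_mp123zne=True,
                 include_mp456uvw=False))          # 'MP'
print(check_chan('BHZ', [], ['BH*'], True, True))  # 'WF'
print(check_chan('VKI', ['VKI'], [], True, True))  # 'SOH'
print(check_chan('VM4', [], [], True, False))      # False: MP4-6 not requested
```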
def check_soh_chan(chan_id: str, req_soh_chans: List[str]) -> bool:
"""
Check if chan_id is a requested SOH channel.
Mass position is always included.
This function is used for mseed only, so mass-position channels start
with 'VM'.
If req_soh_chans is empty, all SOH channels are requested.
:param chan_id: str - channel ID
:param req_soh_chans: list of str - requested SOH channels
:return: bool - True if chan_id is a requested SOH channel. False otherwise
"""
if req_soh_chans == []:
return True
if chan_id in req_soh_chans:
return True
if 'EX?' in req_soh_chans and chan_id.startswith('EX'):
if chan_id[2] in ['1', '2', '3']:
return True
# TODO: remove mass position channels from SOH
if chan_id.startswith('VM'):
if chan_id[2] in ['0', '1', '2', '3', '4', '5', '6']:
return True
return False
def check_wf_chan(chan_id: str, req_wf_chans: List[str]) -> Tuple[str, bool]:
"""
Check if chanID is a waveform channel and is requested by user
:param chan_id: str - channel ID
:param req_wf_chans: list of str - requested waveform channels
:return wf: str - '' if chan_id is not a waveform channel.
'WF' if chan_id is a waveform channel.
:return has_chan: bool - True if chan_id is a requested waveform channel.
"""
if not dbConf['seisRE'].match(chan_id):
return '', False
for req in req_wf_chans:
if len(req) == 1:
req = req.replace('*', '...')
elif len(req) == 2:
req = req.replace('*', '..')
elif len(req) == 3:
req = req.replace('*', '.')
if re.compile(f'^{req}$').match(chan_id):
return 'WF', True
return 'WF', False
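The wildcard expansion above pads '*' with '.' so that every pattern stays three characters long; a hedged illustration:

```python
# 'BH*' -> 'BH.' : matches 'BHZ' and 'BHE' but not 'BNZ'
# 'B*'  -> 'B..' : matches any three-character code starting with 'B'
# '*'   -> '...' : matches every waveform channel
print(check_wf_chan('BHZ', ['BH*']))  # ('WF', True), assuming 'BHZ'
                                      # matches dbConf['seisRE']
print(check_wf_chan('BNZ', ['BH*']))  # ('WF', False)
```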
from typing import Union, Dict, List, Set, Tuple
from sohstationviewer.controller.plotting_data import format_time
from sohstationviewer.model.data_type_model import DataTypeModel
from sohstationviewer.model.mseed.mseed import MSeed
from sohstationviewer.model.general_data.general_data import GeneralData
from sohstationviewer.model.mseed_data.mseed import MSeed
from sohstationviewer.model.reftek.reftek import RT130
from sohstationviewer.view.util.functions import extract_netcodes
def extract_data_set_info(data_obj: Union[DataTypeModel, RT130, MSeed],
def extract_data_set_info(data_obj: Union[GeneralData, RT130, MSeed],
date_format: str
) -> Dict[str, Union[str, List[str]]]:
"""
......@@ -45,7 +45,7 @@ def extract_data_set_info(data_obj: Union[DataTypeModel, RT130, MSeed],
f"\n\t\tTo: {end_time_str}")
data_set_info['Time ranges'] = '\n\t'.join(time_range_info_list)
key_sets = data_obj.stream_header_by_key_chan.keys()
key_sets = data_obj.keys
if data_type == 'RT130':
das_serials = list({key[0] for key in key_sets})
experiment_numbers = list({key[1] for key in key_sets})
......
......@@ -10,9 +10,9 @@ from PySide2.QtCore import QSize
from PySide2.QtGui import QFont, QPalette, QColor
from PySide2.QtWidgets import QFrame, QListWidgetItem, QMessageBox
from sohstationviewer.conf import constants
from sohstationviewer.model.data_loader import DataLoader
from sohstationviewer.model.data_type_model import DataTypeModel
from sohstationviewer.model.general_data.general_data import \
GeneralData
from sohstationviewer.view.calendar.calendar_dialog import CalendarDialog
from sohstationviewer.view.db_config.channel_dialog import ChannelDialog
......@@ -41,8 +41,7 @@ from sohstationviewer.view.channel_prefer_dialog import ChannelPreferDialog
from sohstationviewer.controller.processing import detect_data_type
from sohstationviewer.controller.util import (
display_tracking_info, rt130_find_cf_dass, check_data_sdata,
get_dir_size
display_tracking_info, rt130_find_cf_dass, check_data_sdata
)
from sohstationviewer.database.process_db import execute_db_dict, execute_db
......@@ -89,6 +88,11 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
"""
self.data_type: str = 'Unknown'
"""
is_multiplex: flag showing whether the data set is multiplexed (more
than one channel in a file)
"""
self.is_multiplex = None
"""
color_mode: str - the current color mode of the plot; can be either 'B'
or 'W'
"""
......@@ -125,11 +129,11 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
"""
data_object: Object that keep data read from data set for plotting
"""
self.data_object: Union[DataTypeModel, None] = None
self.data_object: Union[GeneralData, None] = None
"""
min_gap: minimum minutes of gap length to be display on gap bar
gap_minimum: minimum gap length, in seconds, to be displayed on the gap bar
"""
self.min_gap: Union[float, None] = None
self.gap_minimum: Union[float, None] = None
"""
pref_soh_list_name: name of selected preferred channels list
"""
......@@ -398,31 +402,41 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
:rtype: List[str, int]
"""
req_wf_chans = []
if self.all_wf_chans_check_box.isChecked():
req_mseed_wildcards = ['*']
req_dss = ['*'] # all data stream
else:
req_dss = []
req_mseed_wildcards = []
for idx, ds_checkbox in enumerate(self.ds_check_boxes):
if ds_checkbox.isChecked():
req_dss.append(idx + 1)
if self.mseed_wildcard_edit.text().strip() != "":
req_mseed_wildcards = self.mseed_wildcard_edit.text(
).split(",")
if self.data_type == 'RT130':
req_wf_chans = req_dss
if req_dss != ['*'] and req_mseed_wildcards != []:
msg = 'MSeed Wildcards will be ignored for RT130.'
self.processing_log.append((msg, LogType.WARNING))
else:
req_wf_chans = req_mseed_wildcards
if req_mseed_wildcards != ['*'] and req_dss != []:
msg = ('Checked data streams will be ignored for '
'non-RT130 data types.')
self.processing_log.append((msg, LogType.WARNING))
if (self.data_type != 'RT130' and
(self.all_wf_chans_check_box.isChecked()
or self.mseed_wildcard_edit.text().strip() != "")
and not self.tps_check_box.isChecked()
and not self.raw_check_box.isChecked()):
raise Exception(
"Waveform channels have been selected, but neither the TPS nor "
"the RAW checkbox is checked.\nPlease clear the waveform "
"selection if you don't want to display the data.")
if self.tps_check_box.isChecked() or self.raw_check_box.isChecked():
if self.all_wf_chans_check_box.isChecked():
req_mseed_wildcards = ['*']
req_dss = ['*'] # all data stream
else:
req_dss = []
req_mseed_wildcards = []
for idx, ds_checkbox in enumerate(self.ds_check_boxes):
if ds_checkbox.isChecked():
req_dss.append(idx + 1)
if self.mseed_wildcard_edit.text().strip() != "":
req_mseed_wildcards = self.mseed_wildcard_edit.text(
).split(",")
if self.data_type == 'RT130':
req_wf_chans = req_dss
if req_dss != ['*'] and req_mseed_wildcards != []:
msg = 'MSeed Wildcards will be ignored for RT130.'
self.processing_log.append((msg, LogType.WARNING))
else:
req_wf_chans = req_mseed_wildcards
if req_mseed_wildcards != ['*'] and req_dss != []:
msg = ('Checked data streams will be ignored for '
'non-RT130 data types.')
self.processing_log.append((msg, LogType.WARNING))
return req_wf_chans
def get_requested_soh_chan(self):
......@@ -494,10 +508,8 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
raise Exception(msg)
if self.rt130_das_dict == {}:
try:
self.data_type = detect_data_type(self.dir_names)
except Exception as e:
raise e
self.data_type, self.is_multiplex = detect_data_type(
self.dir_names)
def clear_plots(self):
self.plotting_widget.clear()
......@@ -540,23 +552,23 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
if self.gap_len_line_edit.text().strip() != '':
try:
self.min_gap = float(
self.gap_len_line_edit.text())
# convert from minute to second
self.gap_minimum = float(
self.gap_len_line_edit.text()) * 60
except ValueError:
msg = "Minimum Gap must be a number."
QtWidgets.QMessageBox.warning(
self, "Invalid Minimum Gap request", msg)
self.cancel_loading()
return
if self.gap_minimum < 0.1 * 60:  # 0.1 minute = 6 seconds
msg = "Minimum Gap must be greater than 0.1 minute to be " \
"detected."
QtWidgets.QMessageBox.warning(
self, "Invalid Minimum Gap request", msg)
return
else:
self.min_gap = None
# if waveform channels are selected, Event DS will be read from EH/ET
# header
# rt130_waveform_data_req is to read data for wave form data
rt130_waveform_data_req = False
if self.raw_check_box.isChecked() or self.tps_check_box.isChecked():
rt130_waveform_data_req = True
self.gap_minimum = None
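A small arithmetic check of the minute-to-second conversion above (the entered value is illustrative):

```python
gap_minimum = float('0.5') * 60   # user enters minutes; the model uses seconds
assert gap_minimum == 30.0
assert gap_minimum >= 0.1 * 60    # clears the 0.1-minute (6 s) floor
```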
if self.mseed_wildcard_edit.text().strip() != '':
try:
......@@ -574,17 +586,29 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
except AttributeError:
pass
self.req_soh_chans = (self.pref_soh_list
if not self.all_soh_chans_check_box.isChecked()
else [])
try:
self.read_from_file_list()
except Exception as e:
QtWidgets.QMessageBox.warning(self, "Select directory", str(e))
self.cancel_loading()
return
if 'no known data detected' in str(e):
msgbox = QtWidgets.QMessageBox()
msgbox.setWindowTitle('Do you want to continue?')
msgbox.setText(str(e))
msgbox.addButton(QtWidgets.QMessageBox.Cancel)
msgbox.addButton('Continue', QtWidgets.QMessageBox.YesRole)
result = msgbox.exec_()
if result == QtWidgets.QMessageBox.Cancel:
self.cancel_loading()
return
self.data_type = 'Unknown'
else:
fmt = traceback.format_exc()
QtWidgets.QMessageBox.warning(
self, "Select directory", str(fmt))
self.cancel_loading()
return
"""
Temporarily skip check_size because it takes too long:
dir_size = sum(get_dir_size(str(dir))[0] for dir in self.dir_names)
if dir_size > constants.BIG_FILE_SIZE:
data_too_big_dialog = QMessageBox()
......@@ -600,7 +624,7 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
if ret == QMessageBox.Abort:
self.cancel_loading()
return
"""
self.req_soh_chans = self.get_requested_soh_chan()
try:
self.req_wf_chans = self.get_requested_wf_chans()
......@@ -623,15 +647,16 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.data_loader.init_loader(
self.data_type,
self.tracking_info_text_browser,
self.is_multiplex,
self.dir_names,
self.selected_rt130_paths,
req_wf_chans=self.req_wf_chans,
req_soh_chans=self.req_soh_chans,
gap_minimum=self.gap_minimum,
read_start=self.start_tm,
read_end=self.end_tm,
include_mp123=self.mass_pos_123zne_check_box.isChecked(),
include_mp456=self.mass_pos_456uvw_check_box.isChecked(),
rt130_waveform_data_req=rt130_waveform_data_req
include_mp456=self.mass_pos_456uvw_check_box.isChecked()
)
self.data_loader.worker.finished.connect(self.data_loaded)
......@@ -702,13 +727,19 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.is_stopping = False
@QtCore.Slot()
def data_loaded(self, data_obj: DataTypeModel):
def data_loaded(self, data_obj: GeneralData):
"""
Process the loaded data.
:param data_obj: the data object that contains the loaded data.
"""
self.is_loading_data = False
self.data_object = data_obj
if (self.data_type == 'Q330' and
'LOG' not in data_obj.log_data[data_obj.selected_key]):
log_message = ("Channel 'LOG' is required to get file info and "
"gps info for Q330", LogType.WARNING)
self.processing_log.append(log_message)
return
try:
self.gps_dialog.gps_points = extract_gps_data(data_obj)
except ValueError as e:
......@@ -743,7 +774,6 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
"""
if self.has_problem:
return
self.clear_plots()
self.is_plotting_soh = True
self.plotting_widget.set_colors(self.color_mode)
self.waveform_dlg.plotting_widget.set_colors(self.color_mode)
......@@ -756,7 +786,6 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
sel_key = d_obj.selected_key
d_obj.reset_all_selected_data()
d_obj.reset_need_process_for_mass_pos()
try:
check_masspos(d_obj.mass_pos_data[sel_key], sel_key,
self.mass_pos_123zne_check_box.isChecked(),
......
......@@ -6,8 +6,7 @@ from typing import List, Optional, Dict, NoReturn
import numpy as np
from obspy import UTCDateTime
from sohstationviewer.controller.processing import detect_data_type
from sohstationviewer.model.mseed.mseed import MSeed
from sohstationviewer.model.mseed_data.mseed import MSeed
from sohstationviewer.model.reftek.reftek import RT130
from sohstationviewer.view.plotting.gps_plot.gps_point import GPSPoint
from sohstationviewer.view.util.enums import LogType
......@@ -184,9 +183,10 @@ def get_gps_channel_prefix(data_obj: MSeed, data_type: str) -> Optional[str]:
# Determine the GPS channels by checking if the current data set
# has all the GPS channels of a data type.
if pegasus_gps_channels & data_obj.channels == pegasus_gps_channels:
channels = set(data_obj.soh_data[data_obj.selected_key].keys())
if pegasus_gps_channels & channels == pegasus_gps_channels:
gps_prefix = 'V'
elif centaur_gps_channels & data_obj.channels == centaur_gps_channels:
elif centaur_gps_channels & channels == centaur_gps_channels:
gps_prefix = 'G'
else:
msg = "Can't detect GPS channels."
......@@ -234,7 +234,9 @@ def extract_gps_data_pegasus_centaur(data_obj: MSeed, data_type: str
gps_prefix = get_gps_channel_prefix(data_obj, data_type)
gps_chans = {gps_prefix + 'NS', gps_prefix + 'LA', gps_prefix + 'LO',
gps_prefix + 'EL'}
channels = data_obj.stream_header_by_key_chan[data_obj.selected_key].keys()
if data_obj.selected_key is None:
return []
channels = data_obj.soh_data[data_obj.selected_key].keys()
if not gps_chans.issubset(channels):
missing_gps_chans = gps_chans - channels
missing_gps_chans_string = ', '.join(missing_gps_chans)
......@@ -434,8 +436,23 @@ def gps_data_rt130(data_obj: RT130) -> List[GPSPoint]:
@extract_gps_data.register(MSeed)
def gps_data_mseed(data_obj: MSeed) -> List[GPSPoint]:
data_type = detect_data_type([data_obj.dir])
try:
data_type = data_obj.data_type
except Exception:
data_type = 'Unknown'
if data_type == 'Q330':
return extract_gps_data_q330(data_obj)
elif data_type == 'Centaur' or data_type == 'Pegasus':
return extract_gps_data_pegasus_centaur(data_obj, data_type)
else:
# data_type = "Unknown"
try:
gps_data = extract_gps_data_q330(data_obj)
except KeyError:
try:
gps_data = extract_gps_data_pegasus_centaur(
data_obj, data_type)
except AttributeError:
return []
return gps_data
......@@ -9,7 +9,7 @@ from matplotlib.backends.backend_qt5agg import (
FigureCanvasQTAgg as Canvas)
from sohstationviewer.controller.plotting_data import (
get_gaps, get_time_ticks, get_unit_bitweight)
get_time_ticks, get_unit_bitweight)
from sohstationviewer.conf import constants
from sohstationviewer.view.util.color import clr
......@@ -325,15 +325,15 @@ class PlottingAxes:
:param gaps: [[float, float], ] - list of [min, max] of gaps
"""
if self.main_window.min_gap is None:
if self.main_window.gap_minimum is None:
return
self.gaps = gaps = get_gaps(gaps, self.main_window.min_gap)
self.gaps = gaps
self.parent.plotting_bot -= 0.003
self.parent.gap_bar = self.create_axes(self.parent.plotting_bot,
0.001,
has_min_max_lines=False)
gap_label = f"GAP({self.main_window.min_gap}min)"
gap_label = f"GAP({self.main_window.gap_minimum}sec)"
h = 0.001 # height of rectangle represent gap
self.set_axes_info(self.parent.gap_bar, [len(gaps)],
label=gap_label)
......
from typing import List, Dict
from PySide2 import QtCore
from obspy import UTCDateTime
from obspy.core import Trace
from sohstationviewer.conf import constants as const
import numpy as np
# from sohstationviewer.model.decimator import Decimator
from sohstationviewer.model.downsampler import Downsampler
from sohstationviewer.model.handling_data import \
trim_downsample_chan_with_spr_less_or_equal_1
from sohstationviewer.view.plotting.plotting_widget.plotting_processor_helper\
import downsample
class PlottingChannelProcessorSignals(QtCore.QObject):
......@@ -33,10 +25,6 @@ class PlottingChannelProcessor(QtCore.QRunnable):
self.stop_requested = False
self.downsampler = Downsampler()
# self.downsampler = Decimator()
self.decimator = self.downsampler
self.channel_data: dict = channel_data
self.channel_id = channel_id
......@@ -44,288 +32,27 @@ class PlottingChannelProcessor(QtCore.QRunnable):
self.end_time = end_time
self.first_time = first_time
self.trimmed_trace_list = None
self.downsampled_times_list = []
self.downsampled_data_list = []
self.downsampled_list_lock = QtCore.QMutex()
def trim_plotting_data(self) -> List[Dict]:
"""
Trim off plotting traces whose times do not intersect the closed
interval [self.start_time, self.end_time]. Store the traces that are
not removed in self.trimmed_trace_list.
"""
data_start_time = self.channel_data['tracesInfo'][0]['startTmEpoch']
data_end_time = self.channel_data['tracesInfo'][-1]['endTmEpoch']
if (self.start_time > data_end_time
or self.end_time < data_start_time):
return []
good_start_indices = [index
for index, tr
in enumerate(self.channel_data['tracesInfo'])
if tr['startTmEpoch'] > self.start_time]
if good_start_indices:
start_idx = good_start_indices[0]
if start_idx > 0:
start_idx -= 1 # start_time in middle of trace
else:
start_idx = 0
good_end_indices = [idx
for idx, tr
in enumerate(self.channel_data['tracesInfo'])
if tr['endTmEpoch'] <= self.end_time]
if good_end_indices:
end_idx = good_end_indices[-1]
if end_idx < len(self.channel_data['tracesInfo']) - 1:
end_idx += 1 # end_time in middle of trace
else:
end_idx = 0
end_idx += 1 # a[x:y+1] = [a[x], ...a[y]]
good_indices = slice(start_idx, end_idx)
self.trimmed_trace_list = self.channel_data['tracesInfo'][good_indices]
def init_downsampler_(self):
"""
Initialize the downsampler by loading the memmapped traces' data into
Obsby Trace and creating a downsampler worker for each loaded trace
which use Obspy's decimate for downsampling
Currently, using obspy's decimate is slower than using downsample.
Besides, decimate samples at a constant rate, while downsample, which
uses chunk_minmax to take the min and max of each part, is better at
detecting spikes in the signal.
We decided not to use this function but leave it here as a reference
to compare against the results of the other method.
"""
decimate_factor = int(self.channel_size / const.CHAN_SIZE_LIMIT)
if decimate_factor > 16:
decimate_factor = 16
do_decimate = decimate_factor > 1
for tr in self.trimmed_trace_list:
if not self.stop_requested:
trace = Trace(data=np.memmap(tr['data_f'], dtype='int64',
mode='r', shape=tr['size']))
trace.stats.starttime = UTCDateTime(tr['startTmEpoch'])
trace.stats.sampling_rate = tr['samplerate']
worker = self.decimator.add_worker(
trace, decimate_factor, do_decimate
)
# We need these connections to run in the background thread.
# However, their owner (the channel processor) is in the main
# thread, so the default connection type would make them
# run in the main thread. Instead, we have to use a direct
# connection to make these slots run in the background thread.
worker.signals.finished.connect(
self.decimator_trace_processed,
type=QtCore.Qt.DirectConnection
)
worker.signals.stopped.connect(
self.stopped,
type=QtCore.Qt.DirectConnection
)
def init_downsampler(self):
"""
Initialize the downsampler by loading the memmapped traces' data and
creating a downsampler worker for each loaded trace.
"""
# Calculate the number of requested_points
total_size = sum([tr['size'] for tr in self.trimmed_trace_list])
requested_points = 0
if total_size > const.CHAN_SIZE_LIMIT:
requested_points = int(
const.CHAN_SIZE_LIMIT / len(self.trimmed_trace_list)
)
# Downsample the data
for tr_idx, tr in enumerate(self.trimmed_trace_list):
if not self.stop_requested:
times = np.linspace(tr['startTmEpoch'], tr['endTmEpoch'],
tr['size'])
data = np.memmap(tr['data_f'],
dtype='int64', mode='r',
shape=tr['size'])
indexes = np.where((self.start_time <= times) &
(times <= self.end_time))
times = times[indexes]
data = data[indexes]
do_downsample = (requested_points != 0)
worker = self.downsampler.add_worker(
times, data, rq_points=requested_points,
do_downsample=do_downsample
)
# We need these connections to run in the background thread.
# However, their owner (the channel processor) is in the main
# thread, so the default connection type would make them
# run in the main thread. Instead, we have to use a direct
# connection to make these slots run in the background thread.
worker.signals.finished.connect(
self.trace_processed, type=QtCore.Qt.DirectConnection
)
worker.signals.stopped.connect(
self.stopped, type=QtCore.Qt.DirectConnection
)
@QtCore.Slot()
def trace_processed(self, times, data):
"""
The slot called when the downsampler worker of a plotting trace
finishes its job. Add the downsampled data to the appropriate list.
If the worker that emitted the signal is the last one, combine and
store the processed data in self.channel_data but not combine when
there is an overlap and then emit the finished signal of this class.
:param times: the downsampled array of time data.
:param data: the downsampled array of plotting data.
"""
self.downsampled_list_lock.lock()
self.downsampled_times_list.append(times)
self.downsampled_data_list.append(data)
self.downsampled_list_lock.unlock()
if len(self.downsampled_times_list) == len(self.trimmed_trace_list):
times_list = []
data_list = []
last_end_time = 0
current_times = []
current_data = []
for idx, tr in enumerate(self.trimmed_trace_list):
# combine traces together but split at overlap
if tr['startTmEpoch'] > last_end_time:
current_times.append(self.downsampled_times_list[idx])
current_data.append(self.downsampled_data_list[idx])
else:
if len(current_times) > 0:
times_list.append(np.hstack(current_times))
data_list.append(np.hstack(current_data))
current_times = [self.downsampled_times_list[idx]]
current_data = [self.downsampled_data_list[idx]]
last_end_time = tr['endTmEpoch']
times_list.append(np.hstack(current_times))
data_list.append(np.hstack(current_data))
self.channel_data['times'] = times_list
self.channel_data['data'] = data_list
self.signals.finished.emit(self.channel_data, self.channel_id)
@QtCore.Slot()
def decimator_trace_processed(self, trace: Trace):
"""
The slot called when the decimator worker of a plotting trace
finishes its job. Add the decimated trace.data to the appropriate list,
construct time using np.linspace and add to the appropriate list.
If the worker that emitted the signal is the last one, combine and
store the processed data in self.channel_data but not combine when
there is an overlap and then emit the finished signal of this class.
:param trace: the decimated trace.
"""
self.downsampled_list_lock.lock()
self.downsampled_times_list.append(
np.linspace(trace.stats.starttime.timestamp,
trace.stats.endtime.timestamp,
trace.stats.npts)
)
self.downsampled_data_list.append(trace.data)
self.downsampled_list_lock.unlock()
if len(self.downsampled_times_list) == len(self.trimmed_trace_list):
times_list = []
data_list = []
last_end_time = 0
current_times = []
current_data = []
for idx, tr in enumerate(self.trimmed_trace_list):
# combine traces together but split at overlap
if tr['startTmEpoch'] > last_end_time:
current_times.append(self.downsampled_times_list[idx])
current_data.append(self.downsampled_data_list[idx])
else:
if len(current_times) > 0:
times_list.append(np.hstack(current_times))
data_list.append(np.hstack(current_data))
current_times = [self.downsampled_times_list[idx]]
current_data = [self.downsampled_data_list[idx]]
last_end_time = tr['endTmEpoch']
times_list.append(np.hstack(current_times))
data_list.append(np.hstack(current_data))
self.channel_data['times'] = times_list
self.channel_data['data'] = data_list
self.signals.finished.emit(self.channel_data, self.channel_id)
def run(self):
"""
The main method of this class. First check that the channel is not
already small enough after the first trim that there is no need for
further processing. Then, trim the plotting data based on
self.start_time and self.end_time. Afterwards, do some checks to
determine if there is a need to downsample the data. If yes, initialize
and start the downsampler.
Because of changes that read less data instead of all the data in the
files, the data now has only one trace. We can assign the times and
data of that trace to the times and data of the channel. Trimming is
no longer necessary.
"""
if 'needProcess' in self.channel_data:
# refer to DataTypeModel.reset_need_process_for_mass_pos
# for needProcess
if not self.channel_data['needProcess']:
self.finished.emit(self.channel_data, self.channel_id)
return
else:
# put needProcess flag down
self.channel_data['needProcess'] = False
if self.channel_data['fullData']:
# Data is small, already has full in the first trim.
self.finished.emit(self.channel_data, self.channel_id)
return
self.trim_plotting_data()
if not self.trimmed_trace_list:
self.channel_data['fullData'] = True
self.channel_data['times'] = np.array([])
self.channel_data['data'] = np.array([])
self.finished.emit(self.channel_data, self.channel_id)
return False
if self.channel_data['samplerate'] <= 1:
self.channel_data['needConvert'] = True
self.channel_data['times'] = [
tr['times'] for tr in self.trimmed_trace_list]
self.channel_data['data'] = [
tr['data'] for tr in self.trimmed_trace_list]
trim_downsample_chan_with_spr_less_or_equal_1(
self.channel_data, self.start_time, self.end_time)
self.finished.emit(self.channel_data, self.channel_id)
return
self.channel_size = sum(
[tr['size'] for tr in self.trimmed_trace_list])
total_size = sum([tr['size'] for tr in self.trimmed_trace_list])
if not self.first_time and total_size > const.RECAL_SIZE_LIMIT:
# The data is so big that processing it would not make it any
# easier to understand the result plot.
self.finished.emit(self.channel_data, self.channel_id)
return
if total_size <= const.CHAN_SIZE_LIMIT and self.first_time:
self.channel_data['fullData'] = True
try:
del self.channel_data['times']
del self.channel_data['data']
except Exception:
pass
tr = self.channel_data['tracesInfo'][0]
if 'logIdx' in tr.keys():
tr_times, tr_data, tr_logidx = downsample(
tr['times'], tr['data'], tr['logIdx'],
rq_points=const.CHAN_SIZE_LIMIT)
self.channel_data['logIdx'] = [tr_logidx]
else:
tr_times, tr_data, _ = downsample(
tr['times'], tr['data'], rq_points=const.CHAN_SIZE_LIMIT)
self.channel_data['times'] = [tr_times]
self.channel_data['data'] = [tr_data]
self.channel_data['needConvert'] = True
self.init_downsampler()
self.downsampler.start()
self.finished.emit(self.channel_data, self.channel_id)
def request_stop(self):
"""
......@@ -333,4 +60,3 @@ class PlottingChannelProcessor(QtCore.QRunnable):
running.
"""
self.stop_requested = True
self.downsampler.request_stop()
import numpy as np
import math
from sohstationviewer.conf import constants as const
def downsample(times, data, log_indexes=None, rq_points=0):
"""
Reduce the sample rate of times and data so that the returned times and
data have a size of around rq_points.
Since the functions used for downsampling (chunk_minmax()/constant_rate)
are very slow, values of data within CUT_FROM_MEAN_FACTOR of the mean
will be removed first. If the size still doesn't meet rq_points,
continue to downsample.
:param times: numpy array - of a waveform channel's times
:param data: numpy array - of a waveform channel's data
:param log_indexes: numpy array - of a waveform channel's soh message line
index
:param rq_points: int - requested size to return.
:return np.array, np.array,(np.array) - new times and new data (and new
log_indexes) with the requested size
"""
# Create a dummy array for log_indexes. However, this may slow down
# waveform downsampling because waveform channels are large and have no
# log_indexes.
if times.size <= rq_points:
return times, data, log_indexes
if log_indexes is None:
log_indexes = np.empty_like(times)
data_max = max(abs(data.max()), abs(data.min()))
data_mean = abs(data.mean())
indexes = np.where(
abs(data - data.mean()) >
(data_max - data_mean) * const.CUT_FROM_MEAN_FACTOR)
times = times[indexes]
data = data[indexes]
log_indexes = log_indexes[indexes]
if times.size <= rq_points:
return times, data, log_indexes
return chunk_minmax(times, data, log_indexes, rq_points)
def chunk_minmax(times, data, log_indexes, rq_points):
"""
Split data into different chunks, take the min, max of each chunk to add
to the data return
:param times: numpy array - of a channel's times
:param data: numpy array - of a channel's data
:param log_indexes: numpy array - of a channel's log_indexes
:param rq_points: int - requested size to return.
:return times, data: np.array, np.array - new times and new data with the
requested size
"""
if times.size <= rq_points:
return times, data, log_indexes
if rq_points < 2:
return np.empty((1, 0)), np.empty((1, 0)), np.empty((1, 0))
# Since grabbing the min and max from each
# chunk, need to div the requested number of points
# by 2.
chunk_size = rq_points // 2
chunk_count = math.ceil(times.size / chunk_size)
if chunk_count * chunk_size > times.size:
chunk_count -= 1
# Length of the trace is not divisible by the number of requested
# points. So split into an array that is divisible by the requested
# size, and an array that contains the excess. Downsample both,
# and combine. This case gives slightly more samples than
# the requested sample size, but not by much.
times_0 = times[:chunk_count * chunk_size]
data_0 = data[:chunk_count * chunk_size]
log_indexes_0 = log_indexes[:chunk_count * chunk_size]
excess_times = times[chunk_count * chunk_size:]
excess_data = data[chunk_count * chunk_size:]
excess_log_indexes = log_indexes[chunk_count * chunk_size:]
new_times_0, new_data_0, new_log_indexes_0 = downsample(
times_0, data_0, log_indexes_0, rq_points=rq_points
)
# right-most subarray is always smaller than
# the initially requested number of points.
excess_times, excess_data, excess_log_indexes = downsample(
excess_times, excess_data, excess_log_indexes,
rq_points=chunk_count
)
new_times = np.zeros(new_times_0.size + excess_times.size)
new_data = np.zeros(new_data_0.size + excess_data.size)
new_log_indexes = np.zeros(
new_log_indexes_0.size + excess_log_indexes.size
)
new_times[:new_times_0.size] = new_times_0
new_data[:new_data_0.size] = new_data_0
new_log_indexes[:new_log_indexes_0.size] = new_log_indexes_0
new_times[new_times_0.size:] = excess_times
new_data[new_data_0.size:] = excess_data
new_log_indexes[new_log_indexes_0.size:] = excess_log_indexes
return new_times, new_data, new_log_indexes
new_times = times.reshape(chunk_size, chunk_count)
new_data = data.reshape(chunk_size, chunk_count)
new_log_indexes = log_indexes.reshape(chunk_size, chunk_count)
min_data_idx = np.argmin(new_data, axis=1)
max_data_idx = np.argmax(new_data, axis=1)
rows = np.arange(chunk_size)
mask = np.zeros(shape=(chunk_size, chunk_count), dtype=bool)
mask[rows, min_data_idx] = True
mask[rows, max_data_idx] = True
new_times = new_times[mask]
new_data = new_data[mask]
new_log_indexes = new_log_indexes[mask]
return new_times, new_data, new_log_indexes
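A runnable sketch of the pipeline above on synthetic data (sizes are illustrative; the output is only roughly rq_points because each chunk contributes its min and max):

```python
import numpy as np

times = np.linspace(0, 86400, 1_000_000)
data = np.sin(times / 300.0)
new_times, new_data, _ = downsample(times, data, rq_points=2000)
# About 2000 points remain, slightly more when chunks don't divide evenly.
print(new_times.size, new_data.size)
```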
@@ -369,12 +369,6 @@ class PlottingWidget(QtWidgets.QScrollArea):
# tps_t was assigned in TPS Widget
xdata = self.tps_t
else:
if (modifiers == QtCore.Qt.ShiftModifier and
self.zoom_marker1_shown):
# When zooming starts, reset the mass-position flag so the processor
# can decide whether the mass-position channels need to be recalculated
self.data_object.reset_need_process_for_mass_pos()
xdata = self.get_timestamp(event)
# We only want to remove the text on the ruler when we start zooming in
......
@@ -4,8 +4,6 @@ from typing import Tuple, Union, Dict
from sohstationviewer.view.util.plot_func_names import plot_functions
from sohstationviewer.controller.util import apply_convert_factor
from sohstationviewer.model.data_type_model import DataTypeModel
from sohstationviewer.view.util.enums import LogType
@@ -33,10 +31,10 @@ class SOHWidget(MultiThreadedPlottingWidget):
:param time_ticks_total: max number of ticks to show on the time bar
"""
self.data_object = d_obj
self.plotting_data1 = d_obj.soh_data[key]
self.plotting_data2 = d_obj.mass_pos_data[key]
channel_list = d_obj.soh_data[key].keys()
data_time = d_obj.data_time[key]
self.plotting_data1 = d_obj.soh_data[key] if key else {}
self.plotting_data2 = d_obj.mass_pos_data[key] if key else {}
channel_list = d_obj.soh_data[key].keys() if key else []
data_time = d_obj.data_time[key] if key else [0, 1]
ret = super().init_plot(d_obj, data_time, key, start_tm, end_tm,
time_ticks_total, is_waveform=False)
if not ret:
@@ -64,7 +62,6 @@ class SOHWidget(MultiThreadedPlottingWidget):
return
chan_db_info = c_data['chan_db_info']
plot_type = chan_db_info['plotType']
apply_convert_factor(c_data, chan_db_info['convertFactor'])
linked_ax = None
if chan_db_info['linkedChan'] not in [None, 'None', '']:
......
import numpy as np
from typing import Dict, Tuple, List
from sohstationviewer.conf import constants as const
def get_start_5mins_of_diff_days(start_tm: float, end_tm: float) -> np.ndarray:
"""
FROM handling_data.get_start_5mins_of_diff_days()
Get the list of the start time of all five minutes for each day start from
the day of startTm and end at the day of endTm.
:param start_tm: float - start time
:param end_tm: float - end time
:return start_5mins_of_diff_days: [[288 of floats], ] - the list of
start of all five minutes of days specified by start_tm and end_tm in
which each day has 288 of 5 minutes.
"""
exact_day_tm = (start_tm // const.SEC_DAY) * const.SEC_DAY
exact_day_tm_list = []
if start_tm < exact_day_tm:
exact_day_tm_list = [exact_day_tm - const.SEC_DAY]
while exact_day_tm < end_tm:
exact_day_tm_list.append(exact_day_tm)
exact_day_tm += const.SEC_DAY
# build start_5mins_of_diff_days: one row of 5-minute start times per day
for idx, start_day_tm in enumerate(exact_day_tm_list):
start_5mins_of_day = np.arange(start_day_tm,
start_day_tm + const.SEC_DAY,
const.SEC_5M)
if idx == 0:
start_5mins_of_diff_days = np.array([start_5mins_of_day])
else:
start_5mins_of_diff_days = np.vstack(
(start_5mins_of_diff_days, start_5mins_of_day))
return start_5mins_of_diff_days
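# A short sketch (hypothetical) of the expected output, assuming the usual
# constants const.SEC_DAY == 86400 and const.SEC_5M == 300: a range touching
# two calendar days yields a (2, 288) array of 5-minute start times.
def _demo_start_5mins():
    start_tm = 10 * 86400.0 + 100.0  # shortly after midnight of day 10
    end_tm = 11 * 86400.0 + 100.0    # shortly after midnight of day 11
    arr = get_start_5mins_of_diff_days(start_tm, end_tm)
    print(arr.shape)              # (2, 288)
    print(arr[0, 1] - arr[0, 0])  # 300.0 seconds between block starts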
def find_tps_tm_idx(
given_tm: float, start_5mins_of_diff_days: List[List[float]]) \
-> Tuple[int, int]:
"""
FROM handling_data.find_tps_tm_idx()
Find the position of the given time (given_tm) in time-power-squared plot
:param given_tm: float - given time
:param start_5mins_of_diff_days: [[288 of floats], ] - the list of
start of all five minutes of some specific days in which each day has
288 of 5 minutes.
:return x_idx: int - index of 5m section
:return y_idx: int - index of the day the given time belong to in plotting
"""
x_idx = None
y_idx = None
for day_idx, a_day_5mins in enumerate(start_5mins_of_diff_days):
for start_5m_idx, start_5m in enumerate(a_day_5mins):
if start_5m > given_tm:
# day indexes run from 0 downward (negative) because days are
# plotted from top to bottom
y_idx = - day_idx
x_idx = start_5m_idx - 1
if start_5m_idx == 0:
# if start_5m_idx == 0, the given time belongs to the
# last 5m block of the previous day
y_idx = -(day_idx - 1)
x_idx = const.NO_5M_DAY - 1
break
if x_idx is not None:
break
if x_idx is None:
# x_idx is None when the given time falls into the last 5m block of
# the last day. Although time 24:00 of the last day would normally
# belong to the next day, there are no more days to plot, so it is
# harmless to place it in the last 5m block of the last day.
x_idx = const.NO_5M_DAY - 1
y_idx = - (len(start_5mins_of_diff_days) - 1)
return x_idx, y_idx
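# Hypothetical usage sketch: a time 7.5 minutes after midnight of the first
# plotted day falls in the second 5-minute block (x_idx == 1) of the top row
# (y_idx == 0). Assumes const.SEC_DAY == 86400.
def _demo_find_tps_tm_idx():
    day0 = 10 * 86400.0
    days = get_start_5mins_of_diff_days(day0, day0 + 86500.0)
    x_idx, y_idx = find_tps_tm_idx(day0 + 450.0, days)
    print(x_idx, y_idx)  # 1 0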
def get_tps_for_discontinuous_data(
channel_data: Dict,
start_5mins_of_diff_days: List[List[float]]) -> np.ndarray:
"""
First loop: find the indexes into times for each 5-minute block of each
day. Because the data is discontinuous, some blocks may have no data
points.
Second loop: for each 5-minute block, calculate the mean of the squares
of all data in that block (mean_square). For a block with no data points,
use the data points of the previous and next blocks if both have data;
otherwise its mean_square is zero.
:param channel_data: dictionary that keeps the data of a waveform channel
:param start_5mins_of_diff_days: the start times of all five-minute
    blocks of the days, in which each day has 288 five-minute blocks.
:return: array of the mean squares of the five-minute blocks, separated
    into days
"""
times = channel_data['tracesInfo'][0]['times']
data = channel_data['tracesInfo'][0]['data']
# create the list of indexes of the data points in each 5-minute block,
# separated into different days
tps_idxs = []
for start5m_of_a_day in start_5mins_of_diff_days:
tps_idxs.append([])
for start5m in start5m_of_a_day:
end5m = start5m + const.SEC_5M
indexes = np.where((start5m <= times) & (times < end5m))[0]
tps_idxs[-1].append(indexes)
# based on tps_idxs, calculate the mean square of each 5-minute block,
# separated into different days
tps_data = []
for day_idx in range(len(tps_idxs)):
tps_data.append([])
for idx_5m in range(len(tps_idxs[day_idx])):
try:
indexes = tps_idxs[day_idx][idx_5m]
if len(indexes) == 0:
# No data points: check both sides; if both have data points,
# calculate the mean square of their combined data points
prev_indexes = tps_idxs[day_idx][idx_5m - 1]
if idx_5m < len(tps_idxs[day_idx]) - 1:
next_indexes = tps_idxs[day_idx][idx_5m + 1]
else:
# the current 5m block is the last one of the day, so the
# right-hand neighbor is the first 5m block of the next day
next_indexes = tps_idxs[day_idx + 1][0]
if len(prev_indexes) != 0 and len(next_indexes) != 0:
indexes = np.hstack((prev_indexes, next_indexes))
if len(indexes) == 0:
mean_square = 0
else:
data5m = data[indexes]
mean_square = np.mean(np.square(data5m))
except IndexError:
mean_square = 0
tps_data[-1].append(mean_square)
return np.array(tps_data)
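# Hypothetical sketch: one hour of 1 Hz data starting at midnight. The
# twelve covered 5-minute blocks get the mean square of their samples (all
# 1.0 for this constant trace); blocks far from any data stay 0.
def _demo_tps_discontinuous():
    day0 = 10 * 86400.0
    times = np.arange(day0, day0 + 3600.0)  # one hour at 1 sample/second
    channel_data = {'tracesInfo': [{'times': times,
                                    'data': np.ones(times.size)}]}
    days = get_start_5mins_of_diff_days(day0, day0 + 3600.0)
    tps = get_tps_for_discontinuous_data(channel_data, days)
    print(tps.shape)    # (1, 288)
    print(tps[0, :12])  # twelve blocks of 1.0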
def get_tps_for_continuous_data(channel_data: Dict,
start_5mins_of_diff_days: List[List[float]],
start_time: float, end_time: float):
"""
Different from soh_data, where the times and data of a channel are each
in one np.array, in waveform_data the times and data are each kept in a
list of np.memmap files along with startTmEpoch and endTmEpoch.
channel_data['startIdx'] and channel_data['endIdx'] are used to exclude
np.memmap files that aren't in the zoom time range (start_time, end_time).
The data in each np.memmap is trimmed according to its times, then the
time-power-squared value of each 5 minutes is calculated and saved in
channel_data['tps_data']: np.mean(np.square(5m data))
:param channel_data: dictionary that keeps the data of a waveform channel
:param start_5mins_of_diff_days: the start times of all five-minute
    blocks of the days, in which each day has 288 five-minute blocks.
:param start_time: float - start of the zoom time range
:param end_time: float - end of the zoom time range
:return: array of the mean squares of the five-minute blocks, separated
    into days
"""
# preset 0 for all 5-minute blocks of all days
tps_data = np.zeros((len(start_5mins_of_diff_days),
const.NO_5M_DAY))
spr = channel_data['samplerate']
channel_data['tps_data'] = []
start_tps_tm = 0
acc_data_list = []
for tr_idx, tr in enumerate(channel_data['tracesInfo']):
if 'data_f' in tr:
times = np.linspace(tr['startTmEpoch'], tr['endTmEpoch'],
tr['size'])
data = np.memmap(tr['data_f'],
dtype='int64', mode='r',
shape=tr['size'])
else:
times = tr['times']
data = tr['data']
start_index = 0
if tr_idx == 0:
# get the index of times with the closest value to start_time
start_index = np.abs(times - start_time).argmin()
start_tps_tm = times[start_index]
# identify index in case of overlaps or gaps
index = np.where(
(start_5mins_of_diff_days <= times[start_index]) &
(start_5mins_of_diff_days + const.SEC_5M > times[start_index])
)
curr_row = index[0][0]
curr_col = index[1][0]
next_tps_tm = start_tps_tm + const.SEC_5M
while end_time >= next_tps_tm:
next_index = int(start_index + spr * const.SEC_5M)
if next_index >= tr['size']:
acc_data_list.append(np.square(data[start_index:tr['size']]))
break
else:
acc_data_list.append(
np.square(data[start_index:next_index]))
acc_data = np.hstack(acc_data_list)
if acc_data.size == 0:
tps_data[curr_row, curr_col] = 0
else:
tps_data[curr_row, curr_col] = np.mean(acc_data)
start_index = next_index
curr_col += 1
acc_data_list = []
if curr_col == const.NO_5M_DAY:
curr_col = 0
curr_row += 1
next_tps_tm += const.SEC_5M
return tps_data
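# Hypothetical sketch for the continuous variant; assumes a trace dict with
# 'times', 'data' and 'size' keys and a 1 Hz sample rate. Covered 5-minute
# blocks get np.mean(np.square(block data)) == 1.0 for this constant trace.
def _demo_tps_continuous():
    day0 = 10 * 86400.0
    times = np.arange(day0, day0 + 3600.0)
    channel_data = {'samplerate': 1,
                    'tracesInfo': [{'times': times,
                                    'data': np.ones(times.size),
                                    'size': times.size}]}
    days = get_start_5mins_of_diff_days(day0, day0 + 3600.0)
    tps = get_tps_for_continuous_data(channel_data, days, day0, day0 + 3600.0)
    print(tps.shape)   # (1, 288)
    print(tps[0, :3])  # [1. 1. 1.]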
@@ -3,7 +3,8 @@ from typing import Dict, Optional, List
import numpy as np
from PySide2 import QtCore
from sohstationviewer.conf import constants as const
from sohstationviewer.view.plotting.time_power_squared_helper import \
get_tps_for_discontinuous_data
class TimePowerSquaredProcessorSignal(QtCore.QObject):
@@ -76,75 +77,9 @@ class TimePowerSquaredProcessor(QtCore.QRunnable):
saved in channel_data['tps_data']: np.mean(np.square(5m data))
"""
trimmed_traces_list = self.trim_waveform_data()
self.channel_data['tps_data'] = get_tps_for_discontinuous_data(
self.channel_data, self.start_5mins_of_diff_days)
# preset 0 for all 5-minute blocks of all days
tps_data = np.zeros((len(self.start_5mins_of_diff_days),
const.NO_5M_DAY))
spr = self.channel_data['samplerate']
self.channel_data['tps_data'] = []
start_tps_tm = 0
acc_data_list = []
for tr_idx, tr in enumerate(trimmed_traces_list):
self.stop_lock.lock()
if self.stop:
self.stop_lock.unlock()
return self.signals.stopped.emit('')
self.stop_lock.unlock()
if 'data_f' in tr:
times = np.linspace(tr['startTmEpoch'], tr['endTmEpoch'],
tr['size'])
data = np.memmap(tr['data_f'],
dtype='int64', mode='r',
shape=tr['size'])
else:
times = tr['times']
data = tr['data']
start_index = 0
if tr_idx == 0:
# get the index of times with the closest value to self.start_time
start_index = np.abs(times - self.start_time).argmin()
start_tps_tm = times[start_index]
# identify index in case of overlaps or gaps
index = np.where(
(self.start_5mins_of_diff_days <= times[start_index]) &
(self.start_5mins_of_diff_days + const.SEC_5M > times[start_index]) # noqa: E501
)
curr_row = index[0][0]
curr_col = index[1][0]
next_tps_tm = start_tps_tm + const.SEC_5M
while self.end_time >= next_tps_tm:
self.stop_lock.lock()
if self.stop:
self.stop_lock.unlock()
return self.signals.stopped.emit('')
self.stop_lock.unlock()
next_index = int(start_index + spr * const.SEC_5M)
if next_index >= tr['size']:
acc_data_list.append(np.square(data[start_index:tr['size']]))
break
else:
acc_data_list.append(
np.square(data[start_index:next_index]))
acc_data = np.hstack(acc_data_list)
if acc_data.size == 0:
tps_data[curr_row, curr_col] = 0
else:
tps_data[curr_row, curr_col] = np.mean(acc_data)
start_index = next_index
curr_col += 1
acc_data_list = []
if curr_col == const.NO_5M_DAY:
curr_col = 0
curr_row += 1
next_tps_tm += const.SEC_5M
self.channel_data['tps_data'] = tps_data
self.signals.finished.emit(self.channel_id)
def request_stop(self):
......
@@ -9,8 +9,6 @@ from sohstationviewer.view.util.plot_func_names import plot_functions
from sohstationviewer.view.plotting.plotting_widget.\
multi_threaded_plotting_widget import MultiThreadedPlottingWidget
from sohstationviewer.controller.util import apply_convert_factor
class WaveformWidget(MultiThreadedPlottingWidget):
"""
@@ -31,9 +29,9 @@ class WaveformWidget(MultiThreadedPlottingWidget):
:param time_ticks_total: max number of ticks to show on the time bar
"""
self.data_object = d_obj
self.plotting_data1 = d_obj.waveform_data[key]
self.plotting_data2 = d_obj.mass_pos_data[key]
data_time = d_obj.data_time[key]
self.plotting_data1 = d_obj.waveform_data[key] if key else {}
self.plotting_data2 = d_obj.mass_pos_data[key] if key else {}
data_time = d_obj.data_time[key] if key else [0, 1]
return super().init_plot(d_obj, data_time, key, start_tm, end_tm,
time_ticks_total, is_waveform=True)
@@ -51,7 +49,7 @@ class WaveformWidget(MultiThreadedPlottingWidget):
return
chan_db_info = c_data['chan_db_info']
plot_type = chan_db_info['plotType']
apply_convert_factor(c_data, chan_db_info['convertFactor'])
# refer to the docstring of mass_pos_data for the reason for 'ax_wf'
if 'ax_wf' not in c_data:
ax = getattr(self.plotting, plot_functions[plot_type][1])(
@@ -87,7 +85,7 @@ class WaveformDialog(QtWidgets.QWidget):
data_type: str - type of data being plotted
"""
self.data_type = None
self.setGeometry(300, 300, 1200, 700)
self.setGeometry(50, 10, 1600, 700)
self.setWindowTitle("Raw Data Plot")
main_layout = QtWidgets.QVBoxLayout()
......