Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
Commits on Source (10)
Showing
with 676 additions and 85 deletions
<img alt="Whole Section" src="images/select_waveform/whole_section.png" width="250" />
<br />
# Selecting Waveforms
---------------------------
---------------------------
## Selecting all waveform channels
Check the checkbox "All Waveform Channels"
<br />
<img alt="Select all waveform channels" src="images/select_waveform/check_all_waveform_channels.png" height="40" />
<br />
## Selecting no waveform channels
Unchecked the checkbox "All Waveform Channels", unchecked all "DSs" checkboxes, clear textbox "MSeed Wildcards".
<br />
<img alt="Select no waveform channels" src="images/select_waveform/check_none_waveform_channels.png" width="250" />
<br />
## Selecting data streams for Reftek
Check some of "DSs" checkboxes with the index corresponding to the data streams you want to select.
<br />
<img alt="Select data streams" src="images/select_waveform/select_data_stream.png" width="250" />
<br />
If some "DSs" are checked, but data type isn't Reftek and either TPS or RAW are checked, a warning will be created, "Checked data streams will be ignored for none-RT130 data type."
## Selecting mseed's waveform
User can add different wildcards separated with commas to MSeed Wildcard textbox.
For example: \*, LH*, L*1.
<br />
<img alt="Select data streams" src="images/select_waveform/mseed_wildcards.png" width="250" />
<br />
If some wildcard are added, but data type is Reftek and either TPS or RAW are checked, a warning will be created, "Checked data streams will be ignored for RT130 data type."
## Displaying waveform channels
If one of TPS or RAW checkboxes aren't checked which means no data need to be displayed, all the waveform selected will be ignored.
To display waveform channels, user need to check:
+ <img alt="TPS" src="images/select_waveform/select_TPS.png" height="30" />: to diplay Time-Power-Squared of the selected waveform data
+ <img alt="RAW" src="images/select_waveform/select_RAW.png" height="30" />: and check RAW to display the actual selected waveform data.
<br />
\ No newline at end of file
documentation/images/select_waveform/check_all_waveform_channels.png

8.62 KiB

documentation/images/select_waveform/check_none_waveform_channels.png

15.6 KiB

documentation/images/select_waveform/mseed_wildcards.png

9.06 KiB

documentation/images/select_waveform/select_RAW.png

3.29 KiB

documentation/images/select_waveform/select_TPS.png

3.14 KiB

documentation/images/select_waveform/select_data_stream.png

11.1 KiB

documentation/images/select_waveform/warning_select_DSs.png

24.4 KiB

documentation/images/select_waveform/whole_section.png

30.8 KiB

# waveform pattern
WF_1ST = 'A-HLM-V'
WF_2ND = 'HLN'
WF_3RD = 'ZNE123'
# to calc min()
HIGHEST_INT = 1E100
......
import re
from sohstationviewer.conf.constants import (WF_1ST, WF_2ND, WF_3RD)
"""
seisRE: Seismic data channels' regex:
First letter(Band Code): ABCDEFGHLMOPQRSTUV
......@@ -10,8 +11,7 @@ Third letter (Orientation Code): ZNE123
dbConf = {
'dbpath': 'sohstationviewer/database/soh.db',
'seisRE': re.compile('[A-HLM-V][HLN][ZNE123]'),
'wfReq': re.compile('^[A-HLM-V\*]([HLN\*][ZNE123\*]?)?$'), # noqa: W605
'seisRE': re.compile(f'[{WF_1ST}][{WF_2ND}][{WF_3RD}]'),
# key is last char of chan
'seisLabel': {'1': 'NS', '2': 'EW', 'N': 'NS', 'E': 'EW', 'Z': 'V'},
# +0.2:Y
......
......@@ -43,8 +43,6 @@ def display_tracking_info(tracking_box: QTextBrowser, text: str,
:param type: (info/warning/error) type of info to be displayed in
different color
"""
print(text)
return
if tracking_box is None:
print(f"{type.name}: {text}")
return
......
......@@ -3,12 +3,13 @@ import os
from tempfile import mkdtemp
import shutil
from typing import List, Tuple, Dict, Optional, Union
from typing import Optional, Union, List, Tuple, Dict
from PySide2 import QtCore
from sohstationviewer.controller.util import display_tracking_info
from sohstationviewer.conf import constants
from sohstationviewer.model.gps_point import GPSPoint
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.database.process_db import execute_db
......@@ -109,7 +110,7 @@ class DataTypeModel():
'read': file has been read or not - bool
}]
}
'read_data': {
'readData': {
chan_id - str: {
'samplerate': Sample rate of the data - float
'tracesInfo': [{
......@@ -234,6 +235,8 @@ class DataTypeModel():
self._pauser = QtCore.QSemaphore()
self.pause_response = None
self.gps_points: List[GPSPoint] = []
def __del__(self):
print("delete dataType Object")
try:
......
from typing import NamedTuple
class GPSPoint(NamedTuple):
"""
The metadata and location data of a GPS data point.
"""
last_timemark: str
fix_type: str
num_satellite_used: int
latitude: float
longitude: float
height: float
height_unit: str
def is_bad_point(self) -> bool:
"""
Return True if this point is a bad point and False otherwise. A point
is bad if all its location data and number of satellites used are 0.
"""
return (self.num_satellite_used == 0 and self.latitude == 0 and
self.longitude == 0 and self.height == 0)
......@@ -3,6 +3,7 @@ Functions that help processing model data
"""
from struct import unpack
from pathlib import Path
import re
from typing import Dict, Callable, Tuple, List, Union, IO, Optional
import numpy as np
......@@ -381,15 +382,21 @@ def check_wf_chan(chan_id: str, req_wf_chans: List[str]) -> Tuple[str, bool]:
'WF' if chan_id is a waveform channel.
:return has_chan: bool - True if chan_id is a requested waveform channel.
"""
wf = ''
has_chan = False
if dbConf['seisRE'].match(chan_id):
wf = 'WF'
if req_wf_chans == ['*']:
has_chan = True
if chan_id in req_wf_chans:
has_chan = True
return wf, has_chan
if not dbConf['seisRE'].match(chan_id):
return '', False
for req in req_wf_chans:
if len(req) == 1:
req = req.replace('*', '...')
elif len(req) == 2:
req = req.replace('*', '..')
elif len(req) == 3:
req = req.replace('*', '.')
if re.compile(f'^{req}$').match(chan_id):
return 'WF', True
return 'WF', False
def sort_data(data_dict: Dict) -> None:
......@@ -397,11 +404,10 @@ def sort_data(data_dict: Dict) -> None:
Sort data in 'traces_info' in 'startTmEpoch' order
:param data_dict: DataTypeModel.__init__.waveform_data
"""
for sta_id in data_dict:
for chan_id in data_dict[sta_id]['readData']:
traces_info = data_dict[sta_id]['readData'][chan_id]['tracesInfo']
traces_info = sorted(
traces_info, key=lambda i: i['startTmEpoch'])
for chan_id in data_dict:
traces_info = data_dict[chan_id]['tracesInfo']
data_dict[chan_id]['tracesInfo'] = sorted(
traces_info, key=lambda i: i['startTmEpoch'])
def squash_gaps(gaps: List[List[float]]) -> List[List[float]]:
......
"""
MSeed object to hold and process MSeed data
"""
import functools
import operator
import os
from pathlib import Path
from typing import Dict, Tuple, List, Set
......@@ -11,8 +12,9 @@ from obspy.core import Stream
from sohstationviewer.conf import constants
from sohstationviewer.controller.util import validate_file
from sohstationviewer.model.data_type_model import DataTypeModel, ThreadStopped
from sohstationviewer.model.gps_point import GPSPoint
from sohstationviewer.model.handling_data import (
read_waveform_mseed, squash_gaps, check_wf_chan, sort_data, read_soh_trace,
read_waveform_mseed, squash_gaps, sort_data, read_soh_trace,
)
from sohstationviewer.model.mseed.from_mseedpeek.mseed_header import read_hdrs
from sohstationviewer.view.util.enums import LogType
......@@ -56,6 +58,8 @@ class MSeed(DataTypeModel):
if len(self.req_wf_chans) != 0:
self.read_wf_files(self.selected_key)
self.get_gps_data_q330()
def read_soh_and_index_waveform(self, folder: str):
"""
+ read waveform data for filename associate with time range
......@@ -103,11 +107,11 @@ class MSeed(DataTypeModel):
if count % 50 == 0:
self.track_info(
f'Read {count} file headers/ SOH files', LogType.INFO)
ret = read_hdrs(
path2file, file_name, soh_streams, self.log_data,
self.req_soh_chans, self.req_wf_chans,
self.nets_in_file, self.track_info)
if ret is None:
continue
......@@ -277,10 +281,6 @@ class MSeed(DataTypeModel):
count = 0
for chan_id in self.waveform_data[sta_id]['filesInfo']:
# check chan_id
has_chan = check_wf_chan(chan_id, self.req_wf_chans)
if not has_chan:
continue
traces_info = self.waveform_data[sta_id][
'readData'][chan_id]['tracesInfo']
......@@ -293,10 +293,11 @@ class MSeed(DataTypeModel):
# check time
has_data = False
# formatter:off
if ((self.read_start <= file_info['startEpoch'] <= self.read_end) or # noqa: E501
(self.read_start <= file_info['endEpoch'] <= self.read_end)): # noqa: E501
has_data = True
# formatter:on
if not has_data:
continue
read_waveform_mseed(file_info['path2file'],
......@@ -309,4 +310,147 @@ class MSeed(DataTypeModel):
self.track_info(
f'Read {count} waveform files', LogType.INFO)
sort_data(self.waveform_data)
sort_data(self.waveform_data[sta_id]['readData'])
def get_gps_data_q330(self):
"""
Read LOG channel and extract GPS data stored in Q330 data set.
"""
for station in self.log_data:
if station == 'TEXT':
continue
# Q330 log data is composed of a list of string, so we combine them
# into one big string for ease of processing.
log_str = functools.reduce(
operator.iconcat, self.log_data[station]['LOG'], ''
)
log_lines = [line for line in log_str.splitlines() if line != '']
for idx, line in enumerate(log_lines):
if line == "GPS Status":
try:
# We are assuming that a GPS status report is 12 lines
# long, has a specific format, and is followed by a PLL
# status report. This method call checks if these
# preconditions hold and raise an error if not.
self.check_gps_status_format_q330(
log_lines[idx:idx + 13]
)
point = self.extract_gps_point_q330(
log_lines[idx:idx + 12]
)
self.gps_points.append(point)
except ValueError as e:
self.processing_log.append((e.args[0], LogType.ERROR))
@staticmethod
def extract_gps_point_q330(gps_status_lines: List[str]) -> GPSPoint:
"""
Extract the data from a set of log lines that encode a GPS data point.
:param gps_status_lines: a list of log lines that encodes a GPS data
point. Must have a length of 12.
:return a GPSPoint object that contains data encoded in
gps_status_lines.
"""
if len(gps_status_lines) != 12:
raise ValueError('The number of GPS log lines must be 12.')
# Last timemark and fix type are always available, so we have to get
# them before doing anything else.
last_timemark = gps_status_lines[11][19:]
fix_type = gps_status_lines[3][10:]
# If location data is missing, we set them to 0.
if gps_status_lines[5] == 'Latitude: ':
return GPSPoint(last_timemark, fix_type, 0, 0, 0, 0, '')
num_sats_used = int(gps_status_lines[8].split(': ')[1])
# Height is encoded as a float followed by the unit. We
# don't know how many characters the unit is composed of,
# so we have to loop through the height string backward
# until we can detect the end of the height value.
height_str: str = gps_status_lines[4].split(': ')[1]
# Start pass the end of the string and look backward one
# index every iteration so we don't have to add 1 to the
# final index.
current_idx = len(height_str)
current_char = height_str[current_idx - 1]
while current_char != '.' and not current_char.isnumeric():
current_idx -= 1
current_char = height_str[current_idx - 1]
height = float(height_str[:current_idx])
height_unit = height_str[current_idx:]
# Latitude and longitude are encoded in the format
# <degree><decimal minute><cardinal direction>. For
# latitude, <degree> has two characters, while for longitude, <degree>
# has three.
# To make the GPS points easier to plot, we convert the latitude and
# longitude to decimal degree at the cost of possible precision loss
# due to rounding error.
raw_latitude = gps_status_lines[5].split(': ')[1]
lat_degree = int(raw_latitude[:2])
lat_minute = float(raw_latitude[2:-1]) / 60
latitude = lat_degree + lat_minute
if raw_latitude[-1].lower() == 's':
latitude = -latitude
raw_longitude = gps_status_lines[6].split(': ')[1]
long_degree = int(raw_longitude[:3])
long_minute = float(raw_longitude[3:-1]) / 60
longitude = long_degree + long_minute
if raw_longitude[-1].lower() == 'w':
longitude = -longitude
gps_point = GPSPoint(last_timemark, fix_type, num_sats_used,
latitude, longitude, height, height_unit)
return gps_point
@staticmethod
def check_gps_status_format_q330(status_lines: List[str]):
"""
Check if the given set of log lines encode a GPS data point by
determining if they follow a specific format.
:param status_lines: a list of log lines. Must have a length of 13.
"""
if len(status_lines) != 13:
raise ValueError('The number of possible GPS log lines to check'
'must be 13.')
if status_lines[12].lower() != 'pll status':
raise ValueError(
'Q330 log data is malformed: '
'PLL status does not follow GPS status.'
)
if 'fix type' not in status_lines[3].lower():
raise ValueError(
'Q330 log data is malformed: '
'Fix type is not at expected position.'
)
if 'height' not in status_lines[4].lower():
raise ValueError(
'Q330 log data is malformed: '
'Height is not at expected position.'
)
if 'latitude' not in status_lines[5].lower():
raise ValueError(
'Q330 log data is malformed: '
'Latitude is not at expected position.'
)
if 'longitude' not in status_lines[6].lower():
raise ValueError(
'Q330 log data is malformed: '
'Longitude is not at expected position.'
)
if 'sat. used' not in status_lines[8].lower():
raise ValueError(
'Q330 log data is malformed: '
'Sat. Used is not at expected position.'
)
if 'last gps timemark' not in status_lines[11].lower():
raise ValueError(
'Q330 log data is malformed: '
'Last GPS timemark is not at expected position.'
)
......@@ -4,7 +4,7 @@ RT130 object to hold and process RefTek data
import os
from pathlib import Path
from typing import Tuple, List
from typing import Tuple, List, Union
import numpy as np
......@@ -33,8 +33,9 @@ class RT130(DataTypeModel):
self.EH = {}
super().__init__(*args, **kwarg)
self.keys = set()
self.req_data_streams: List[int] = self.req_wf_chans
self.req_data_streams: List[Union[int, str]] = self.req_wf_chans
self.mass_pos_stream = {}
self.found_data_streams = []
if self.creator_thread.isInterruptionRequested():
raise ThreadStopped()
......@@ -50,6 +51,13 @@ class RT130(DataTypeModel):
raise ThreadStopped()
if len(self.req_wf_chans) != 0:
self.read_wf_files(self.selected_key)
if self.req_data_streams != ['*']:
not_found_data_streams = [ds for ds in self.req_data_streams
if ds not in self.found_data_streams]
if not_found_data_streams != []:
msg = (f"No data found for data streams: "
f"{', '.join(map(str, not_found_data_streams))}")
self.processing_log.append((msg, LogType.WARNING))
def read_soh_index_waveform(self, folder: str) -> None:
"""
......@@ -71,7 +79,6 @@ class RT130(DataTypeModel):
if count % 50 == 0:
self.track_info(
f'Read {count} file headers/ SOH files', LogType.INFO)
self.combine_data()
def select_key(self) -> Tuple[str, str]:
......@@ -135,7 +142,7 @@ class RT130(DataTypeModel):
if count % 50 == 0:
self.track_info(
f'Read {count} waveform files', LogType.INFO)
sort_data(self.waveform_data)
sort_data(self.waveform_data[key]['readData'])
def add_log(self, chan_pkt, log_info):
if chan_pkt not in self.log_data[self.cur_key].keys():
......@@ -194,9 +201,10 @@ class RT130(DataTypeModel):
be processed later
"""
data_stream = rt130._data['data_stream_number'][0] + 1
if data_stream not in self.req_data_streams + [9]:
if (self.req_data_streams != ['*'] and
data_stream not in self.req_data_streams + [9]):
return
self.found_data_streams.append(data_stream)
ind_ehet = [ind for ind, val in
enumerate(rt130._data["packet_type"])
if val in [b"EH"]] # only need event header
......@@ -221,7 +229,7 @@ class RT130(DataTypeModel):
if data_stream == 9:
self.read_mass_pos(rt130)
else:
self.index_wave_form(rt130, data_stream)
self.index_waveform(rt130, data_stream)
def read_mass_pos(self, rt130: core.Reftek130) -> None:
"""
......@@ -249,7 +257,7 @@ class RT130(DataTypeModel):
self.data_time[self.cur_key][1]
)
def index_wave_form(self, rt130: core.Reftek130, data_stream: int) -> None:
def index_waveform(self, rt130: core.Reftek130, data_stream: int) -> None:
"""
Indexing by adding rt130 object along with time range to
self.waveform_data[self.currKey]['filesInfo']
......
......@@ -3,9 +3,10 @@ import pathlib
import shutil
import traceback
from datetime import datetime
from typing import List
from typing import Union
from copy import deepcopy
from pathlib import Path
from typing import List, Tuple
from PySide2 import QtCore, QtWidgets, QtGui
......@@ -28,6 +29,7 @@ from sohstationviewer.view.search_message.search_message_dialog import (
from sohstationviewer.view.help_view import HelpBrowser
from sohstationviewer.view.ui.main_ui import UIMainWindow
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.view.util.functions import check_chan_wildcards_format
from sohstationviewer.view.channel_prefer_dialog import ChannelPreferDialog
from sohstationviewer.controller.processing import detect_data_type
......@@ -56,6 +58,10 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.data_loader = DataLoader()
self.data_loader.finished.connect(self.replot_loaded_data)
"""
processing_log: [(message, type)] - record the progress of processing
"""
self.processing_log: List[Tuple[str, LogType]] = []
"""
forms_in_forms_menu: List of forms in forms_menu
"""
......@@ -210,16 +216,38 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
win.show()
@QtCore.Slot()
def all_chan_clicked(self):
def all_wf_chans_clicked(self):
if self.all_wf_chans_check_box.isChecked():
self.mseed_wildcard_edit.clear()
for cb in self.ds_check_boxes:
cb.setChecked(False)
@QtCore.Slot()
def data_stream_clicked(self):
ds_click = False
for cb in self.ds_check_boxes:
if cb.isChecked():
ds_click = True
break
if ds_click:
self.all_wf_chans_check_box.setChecked(False)
@QtCore.Slot()
def mseed_wildcard_changed(self):
if self.mseed_wildcard_edit.text().strip() != "":
self.all_wf_chans_check_box.setChecked(False)
@QtCore.Slot()
def all_soh_chans_clicked(self):
"""
When "All SOH" is checked,
+ If checked, clear current IDs textbox
+ If unchecked, set current IDs textbox if there is a preferred
channels list selected. If no list selected, re-check "All SOH"
"""
if not self.all_soh_chan_check_box.isChecked():
if not self.all_soh_chans_check_box.isChecked():
if self.ids == []:
self.all_soh_chan_check_box.setChecked(True)
self.all_soh_chans_check_box.setChecked(True)
else:
self.curr_soh_ids_name_line_edit.setText(self.ids_name)
else:
......@@ -264,6 +292,65 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
new_path = fd.selectedFiles()[0]
self.set_current_directory(new_path)
def get_requested_wf_chans(self) -> List[Union[str, int]]:
"""
Getting requested data stream for RT130 data or mseed wildcards for
non-RT130 data
:return req_wf_chans: list of data streams or list of mseed wildcards
:rtype: List[str, int]
"""
req_wf_chans = []
if self.tps_check_box.isChecked() or self.raw_check_box.isChecked():
if self.all_wf_chans_check_box.isChecked():
req_mseed_wildcards = ['*']
req_dss = ['*'] # all data stream
else:
req_dss = []
req_mseed_wildcards = []
for idx, ds_checkbox in enumerate(self.ds_check_boxes):
if ds_checkbox.isChecked():
req_dss.append(idx + 1)
if self.mseed_wildcard_edit.text().strip() != "":
req_mseed_wildcards = self.mseed_wildcard_edit.text(
).split(",")
if self.data_type == 'RT130':
req_wf_chans = req_dss
if req_dss != ['*'] and req_mseed_wildcards != []:
msg = 'MSeed Wildcards will be ignored for RT130.'
self.processing_log.append((msg, LogType.WARNING))
else:
req_wf_chans = req_mseed_wildcards
if req_mseed_wildcards != ['*'] and req_dss != []:
msg = ('Checked data streams will be ignored for '
'none-RT130 data type.')
self.processing_log.append((msg, LogType.WARNING))
return req_wf_chans
def get_requested_soh_chan(self):
"""
Getting requested soh channels from preferred channel dialog
:return list of requested soh channels
:rtype: List[str]
"""
if (not self.all_soh_chans_check_box.isChecked() and
self.data_type != self.ids_data_type):
msg = (f"DataType detected for the selected data set is "
f"{self.data_type} which is different to IDs' "
f"{self.ids_data_type}.\n"
f"SOHStationViewer will read all data available.\n"
f"Do you want to continue?")
result = QtWidgets.QMessageBox.question(
self, "Confirmation", msg,
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No)
if result == QtWidgets.QMessageBox.No:
return
self.all_soh_chans_check_box.setChecked(True)
self.curr_soh_ids_name_line_edit.setText('')
return []
return (self.ids
if not self.all_soh_chans_check_box.isChecked() else [])
@QtCore.Slot()
def read_selected_files(self):
"""
......@@ -278,16 +365,18 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
return
self.has_problem = False
try:
check_chan_wildcards_format(self.mseed_wildcard_edit.text())
except Exception as e:
QtWidgets.QMessageBox.warning(self, "Incorrect Wildcard", str(e))
return
try:
del self.data_object
self.processing_log = []
self.clear_actions_from_forms_menu()
except AttributeError:
pass
self.req_soh_chans = (self.ids
if not self.all_soh_chan_check_box.isChecked()
else [])
self.dir_names = [
Path(self.curr_dir_line_edit.text()).joinpath(item.text())
for item in self.open_files_list.selectedItems()]
......@@ -300,34 +389,8 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
if self.data_type is None:
return
# get req_soh_chans
if (not self.all_soh_chan_check_box.isChecked() and
self.data_type != self.ids_data_type):
msg = (f"DataType detected for the selected data set is "
f"{self.data_type} which is different to IDs' "
f"{self.ids_data_type}.\n"
f"SOHStationViewer will read all data available.\n"
f"Do you want to continue?")
result = QtWidgets.QMessageBox.question(
self, "Confirmation", msg,
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No)
if result == QtWidgets.QMessageBox.No:
return
self.all_soh_chan_check_box.setChecked(True)
self.curr_soh_ids_name_line_edit.setText('')
self.req_soh_chans = []
self.req_wf_chans = []
if self.data_type == 'RT130':
req_dss = []
for idx, ds_checkbox in enumerate(self.ds_check_boxes):
if ds_checkbox.isChecked():
req_dss.append(idx + 1)
self.req_wf_chans = req_dss
else:
if self.wf_all_check_box.isChecked():
self.req_wf_chans = ['*']
elif self.wf_chans_line_edit.text().strip() != "":
self.req_wf_chans = self.wf_chans_line_edit.text().split(",")
self.req_soh_chans = self.get_requested_soh_chan()
self.req_wf_chans = self.get_requested_wf_chans()
start_tm_str = self.time_from_date_edit.date().toString(
QtCore.Qt.ISODate)
end_tm_str = self.time_to_date_edit.date().toString(QtCore.Qt.ISODate)
......@@ -428,14 +491,13 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
if len(self.req_wf_chans) != 0:
wf_data = deepcopy(do.waveform_data[sel_key]['readData'])
else:
wf_data = []
wf_data = {}
try:
self.plotting_widget.plot_channels(
self.start_tm, self.end_tm, sel_key,
do.data_time[sel_key], soh_chans, time_tick_total,
soh_data, mp_data, do.gaps[sel_key])
except Exception:
print("Failed ")
fmt = traceback.format_exc()
msg = f"Can't plot SOH data due to error: {str(fmt)}"
display_tracking_info(self.tracking_info_text_browser, msg,
......@@ -459,7 +521,7 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
else:
self.tps_dlg.hide()
if self.req_wf_chans != []:
if self.raw_check_box.isChecked():
self.is_plotting_waveform = True
# waveformPlot
peer_plotting_widgets.append(self.waveform_dlg.plotting_widget)
......@@ -480,9 +542,9 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.tps_dlg.plotting_widget.set_peer_plotting_widgets(
peer_plotting_widgets)
processing_log = (
do.processing_log + self.plotting_widget.processing_log
)
processing_log = (self.processing_log +
do.processing_log +
self.plotting_widget.processing_log)
self.search_message_dialog.setup_logview(
sel_key, do.log_data, processing_log)
self.search_message_dialog.show()
......
import math
import sys
import traceback
from pathlib import Path
from typing import List, Optional
from sohstationviewer.controller.util import display_tracking_info
from sohstationviewer.model.gps_point import GPSPoint
from sohstationviewer.model.mseed.mseed import MSeed
from PySide2 import QtWidgets, QtCore
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as Canvas
from matplotlib.figure import Figure
from matplotlib.axes import Axes
from sohstationviewer.view.util.enums import LogType
class GPSWidget(QtWidgets.QWidget):
def __init__(self, parent: QtWidgets.QWidget,
tracking_box: QtWidgets.QTextBrowser):
super().__init__(parent)
self.tracking_box = tracking_box
self.gps_points: Optional[List[GPSPoint]] = None
self.fig = Figure(figsize=(6, 6), dpi=100)
self.canvas = Canvas(self.fig)
self.canvas.mpl_connect('pick_event', self.on_pick_event)
self.ax: Axes = self.fig.add_axes((0, 0, 1, 1))
self.ax.set_facecolor("dimgray")
# Force the plot to be square.
self.ax.set_box_aspect(1)
# Remove all traces of the axes, leaving only the plot.
self.ax.set_yticks([])
self.ax.set_xticks([])
self.ax.spines['right'].set_visible(False)
self.ax.spines['left'].set_visible(False)
self.ax.spines['top'].set_visible(False)
self.ax.spines['bottom'].set_visible(False)
@QtCore.Slot()
def plot_gps(self):
"""
Plot the GPS points onto the canvas. Display the latitude and longitude
range in meters and adjust the x and y limits of the plot so that the
axis is square.
"""
all_latitudes = [point.latitude for point in self.gps_points]
all_longitudes = [point.longitude for point in self.gps_points]
max_latitude = max(all_latitudes)
min_latitude = min(all_latitudes)
max_longitude = max(all_longitudes)
min_longitude = min(all_longitudes)
latitude_range = convert_latitude_degree_to_meter(
max_latitude - min_latitude
)
# qpeek uses the average latitude in this conversion, so we copy it.
longitude_range = convert_longitude_degree_to_meter(
max_longitude - min_longitude,
(max_latitude + min_latitude) / 2
)
self.ax.plot(all_longitudes, all_latitudes, 'ws', markersize=4,
markeredgecolor='black', picker=True, pickradius=4)
range_text = (f'Lat range: {latitude_range:.2f}m '
f'Long range: {longitude_range:.2f}m')
self.ax.text(0.01, 0.96, range_text, color='white', size=10,
transform=self.ax.transAxes
)
# Stretch the shorter axis so that it is as long as the longer axis.
latitude_range = max_latitude - min_latitude
longitude_range = max_longitude - min_longitude
if latitude_range > longitude_range:
longitude_average = (min_longitude + max_longitude) / 2
min_longitude = longitude_average - latitude_range / 2
max_longitude = longitude_average + latitude_range / 2
self.ax.set_xlim(min_longitude, max_longitude)
else:
latitude_average = (min_latitude + max_latitude) / 2
min_latitude = latitude_average - longitude_range / 2
max_latitude = latitude_average + longitude_range / 2
self.ax.set_ylim(min_latitude, max_latitude)
self.canvas.draw()
self.repaint()
msg = f'Plotted {len(self.gps_points)} points.'
display_tracking_info(self.tracking_box, msg)
def on_pick_event(self, event):
"""
On a GPS point being picked, display the data of that point onto the
tracking box.
:param event: the pick event
"""
# index of the clicked point on the plot
click_plot_index = event.ind[0]
picked_point = self.gps_points[click_plot_index]
lat_dir = 'N' if picked_point.latitude > 0 else 'S'
long_dir = 'E' if picked_point.longitude > 0 else 'W'
meta_separator = '&nbsp;' * 22
loc_separator = '&nbsp;' * 10
msg = (
f'Mark: {picked_point.last_timemark}{meta_separator}'
f'Fix: {picked_point.fix_type}{meta_separator}'
f'Sats: {picked_point.num_satellite_used}<br>'
f'Lat: {lat_dir}{abs(picked_point.latitude):.6f}{loc_separator}'
f'Long: {long_dir}{abs(picked_point.longitude):.6f}{loc_separator}'
f'Elev: {picked_point.height}{picked_point.height_unit}'
)
display_tracking_info(self.tracking_box, msg)
class GPSDialog(QtWidgets.QWidget):
def __init__(self, parent: QtWidgets.QWidget = None):
super().__init__()
self.parent = parent
self.data_path: Path = None
self.info_text_browser = QtWidgets.QTextBrowser(self)
self.plotting_widget = GPSWidget(self, self.info_text_browser)
self.read_button = QtWidgets.QPushButton('Read/Plot', self)
self.export_button = QtWidgets.QPushButton('Export', self)
self.close_button = QtWidgets.QPushButton('Close', self)
self.setup_ui()
def setup_ui(self):
"""
Set up the user interface of the dialog.
"""
# The height of the dialog is chosen so that the GPS plot is flush with
# the edges of the dialog.
dialog_height = 518
self.setGeometry(300, 300, 400, dialog_height)
self.setWindowTitle("GPS Plot")
main_layout = QtWidgets.QVBoxLayout()
self.setLayout(main_layout)
main_layout.setContentsMargins(0, 0, 0, 0)
main_layout.setSpacing(0)
main_layout.addWidget(self.plotting_widget.canvas)
# Add a 1-pixel high widget to the main layout so that there is a
# visible border between the GPS plot and the button. QTextBrowser
# is used because it is known to have a visible border.
invisible_widget = QtWidgets.QTextBrowser()
invisible_widget.setFixedHeight(1)
main_layout.addWidget(invisible_widget)
button_layout = QtWidgets.QHBoxLayout()
button_layout.setContentsMargins(100, 0, 100, 0)
button_layout.setStretch(1, 3)
button_layout.setSpacing(10)
self.read_button.clicked.connect(self.plotting_widget.plot_gps)
button_layout.addWidget(self.read_button)
self.export_button.clicked.connect(self.export_gps_points)
button_layout.addWidget(self.export_button)
self.close_button.setStyleSheet('QPushButton {color: red;}')
self.close_button.clicked.connect(self.close)
button_layout.addWidget(self.close_button)
bottom_layout = QtWidgets.QVBoxLayout()
bottom_layout.addLayout(button_layout)
self.info_text_browser.setFixedHeight(42)
self.info_text_browser.setHorizontalScrollBarPolicy(
QtCore.Qt.ScrollBarAlwaysOff)
self.info_text_browser.setVerticalScrollBarPolicy(
QtCore.Qt.ScrollBarAlwaysOff)
self.info_text_browser.verticalScrollBar().setDisabled(True)
self.info_text_browser.horizontalScrollBar().setDisabled(True)
bottom_layout.addWidget(self.info_text_browser)
main_layout.addLayout(bottom_layout)
@property
def gps_points(self):
return self.plotting_widget.gps_points
@gps_points.setter
def gps_points(self, points):
self.plotting_widget.gps_points = [point
for point in points
if not point.is_bad_point()]
def export_gps_points(self):
"""
Export the data of the GPS points to a file. The file will be in the
same directory as the data folder.
The first line in the export file contains information about the data
set. Each subsequent line has the form
<YYYY-MM-DD HH:MM:SS> <fix type> <no. sats. used> <latitude:.6f> <longitude:.6f> <height:.6f> # noqa
and is tab-separated.
"""
self.info_text_browser.setHorizontalScrollBarPolicy(
QtCore.Qt.ScrollBarAsNeeded)
self.info_text_browser.setVerticalScrollBarPolicy(
QtCore.Qt.ScrollBarAsNeeded)
self.info_text_browser.verticalScrollBar().setDisabled(False)
self.info_text_browser.horizontalScrollBar().setDisabled(False)
folder_name = self.data_path.name
outside_folder = self.data_path.parent
export_file_path = outside_folder / f'{folder_name}.gps.dat'
try:
with open(export_file_path, 'w+') as outfile:
outfile.write(f'# GPS data points for {folder_name}\n')
for point in self.gps_points:
if point.is_bad_point():
continue
line = (
f'{point.last_timemark}\t{point.fix_type}\t'
f'{point.num_satellite_used}\t{point.latitude:+.6f}\t'
f'{point.longitude:+.6f}\t{point.height:.6f}\n'
)
outfile.write(line)
except OSError:
err_msg = traceback.format_exc()
msg = f"Can't export to file due to error: {err_msg}"
display_tracking_info(self.info_text_browser, msg,
LogType.ERROR)
else:
msg = f'Successfully exported to {export_file_path}'
display_tracking_info(self.info_text_browser, msg, LogType.INFO)
# We use the WGS-84's version of the Earth ellipsoid as a reference point for
# converting latitude and longitude differences from degree to meter.
# The constants below are obtained from here
# https://en.wikipedia.org/wiki/Earth_ellipsoid#Historical_Earth_ellipsoids.
# In order to use other models of the Earth ellipsoid, simply change these
# constants.
POLAR_CIRCUMFERENCE = 40007863
EQUATORIAL_CIRCUMFERENCE = 40075017
def convert_latitude_degree_to_meter(lat: float) -> float:
"""
Convert the given latitude from degree to meter.
:param lat: latitude given in degree
:return: the given latitude converted to meter
"""
# A whole circumference is 360 degrees, so we can get the length of one
# degree by dividing the circumference by 360.
lat_degree_length_in_meter = POLAR_CIRCUMFERENCE / 360
return lat * lat_degree_length_in_meter
def convert_longitude_degree_to_meter(long: float, lat: float) -> float:
"""
Convert the given longitude from degree to meter. Need to adjust for
latitude because the length of a longitude degree changes with the
latitude.
:param long: longitude given in degree
:param lat: the latitude to adjust for
:return: the given longitude converted to meter
"""
# A whole circumference is 360 degrees, so we can get the length of one
# degree by dividing the circumference by 360.
long_degree_length_in_meter = EQUATORIAL_CIRCUMFERENCE / 360
adjustment_for_latitude = math.cos(math.radians(lat))
return long * long_degree_length_in_meter * adjustment_for_latitude
if __name__ == '__main__':
import os
os.chdir('/Users/kle/PycharmProjects/sohstationviewer')
app = QtWidgets.QApplication(sys.argv)
data_path = '/Users/kle/PycharmProjects/sohstationviewer/tests/test_data/Q330-sample' # noqa: E501
# data_path = '/Users/kle/Documents/SOHView data/Q330/Q330_5281.sdr'
data = MSeed(QtWidgets.QTextBrowser(), data_path)
wnd = GPSDialog()
wnd.gps_points = data.gps_points
wnd.data_path = Path(data_path)
wnd.show()
sys.exit(app.exec_())
......@@ -22,7 +22,7 @@ class PlottingWidget(QtWidgets.QScrollArea):
events to serve user's purpose.
"""
def __init__(self, parent, tracking_box, name=''):
def __init__(self, parent, tracking_box, name):
"""
:param parent: QWidget/QMainWindow - widget that contains this plotting
widget
......@@ -309,19 +309,27 @@ class PlottingWidget(QtWidgets.QScrollArea):
When click mouse on the current plottingWidget, SOHView will loop
through different plottingWidgets to do the same task for
interaction:
+ shift+click: call on_shift_click() to do zooming
+ shift+click: call on_shift_click() to do zooming. This is
disregarded in TimePowerSquareWidget because it isn't subjected
to be zoomed in.
+ ctrl+click or cmd+click in mac: call on_ctrl_cmd_click() to show
ruler
:param event: button_press_event - mouse click event
"""
modifiers = event.guiEvent.modifiers()
try:
# on_pick() take place after on_pick_on_artist where
# tps_t was assigned in TimePowerSquareWidget
if self.plotting_data1 == {}:
return
if self.name == 'TPSWidget':
if modifiers == QtCore.Qt.ShiftModifier:
# not start zooming from TPSWidget
return
# on_button_press_event() take place after on_pick_event where
# tps_t was assigned in TPSWidget
xdata = self.tps_t
except AttributeError:
else:
xdata = self.get_timestamp(event)
for w in self.peer_plotting_widgets:
if modifiers == QtCore.Qt.ShiftModifier:
w.on_shift_click(xdata)
......