Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
Showing
with 1461 additions and 84 deletions
from typing import Optional
from sohstationviewer.view.util.color import clr
from sohstationviewer.view.util.plot_func_names import plot_functions
def convert_value_color_str(
plot_type: str, old_value_color_str: Optional[str]) -> str:
"""
Convert value_color str to new format. This will be removed after
value_colors in database changed
linesDots: L:G|D:G => Line:#00FF00|Dot:#00FF00
upDownDots: 0:R|1:G => Down:#FF0000|Up:#00FF00
multiColorDotsEqualOnUpperBound:
0:_|1:Y|2:G|+2:M => <=0:not plot|<=1:#FFFF00|<=2:#00FF00|2<:#FF00FF
multiColorDotsEqualOnLowerBound:
3:R|3.3:Y|=3.3:G => <3:#FF0000|<3.3:#FFFF00|=3.3:#00FF00
triColorLines:
-1:M|0:R|1:G => -1:#FF00FF|0:#FF0000|1:#00FF00
:param plot_type: type of channel's plot
:param old_value_color_str: value_color_str in old format
:return: value_color_str in new format
"""
if old_value_color_str is None:
return ""
value_color_list = []
if old_value_color_str == '' and plot_type == 'linesDots':
old_value_color_str = "L:G"
value_color_parts = old_value_color_str.split('|')
for c_str in value_color_parts:
val, color = c_str.split(':')
val = convert_value(plot_type, val)
if color == '_':
color = "not plot"
else:
if color in clr.keys():
color = clr[color]
value_color_list.append(f"{val}:{color}")
return '|'.join(value_color_list)
def convert_value(plot_type: str, old_value: str):
"""
Convert value part in value color str to new format
:param plot_type: type of channel's plot
:param old_value: value in old format
:return: value in new format
"""
if not plot_functions[plot_type]['value_pattern'].match(old_value):
return "unrecognized:" + old_value
if old_value in ['L', 'Line'] and plot_type == 'linesDots':
new_value = 'Line'
elif old_value in ['D', 'Dot'] and plot_type == 'linesDots':
new_value = 'Dot'
elif old_value in ['1', 'Up'] and plot_type == 'upDownDots':
new_value = 'Up'
elif old_value in ['0', 'Down'] and plot_type == 'upDownDots':
new_value = 'Down'
elif plot_type == "multiColorDotsEqualOnUpperBound":
if old_value.startswith('+'):
new_value = old_value[1:] + '<'
elif old_value.startswith('<=') or old_value.endswith('<'):
new_value = old_value
else:
new_value = '<=' + old_value
elif plot_type == "multiColorDotsEqualOnLowerBound":
if old_value.startswith('=') or old_value.startswith('<'):
new_value = old_value
else:
new_value = '<' + old_value
elif plot_type == 'triColorLines':
new_value = old_value
else:
new_value = "Sth wrong:" + old_value
return new_value
def prepare_value_color_html(value_colors: Optional[str]) -> str:
"""
Change value_color with hex color to html to square with actual color from
hex color.
:param value_colors: string for value color to be saved in DB.
Possible formats
Line:color|Dot:color
Up:color|Down:color
<=value:color|value<:color
(color can be hex color '#00FFF' or 'not plot')
:return: value color in html
Ex: <p>Line:<span style='#00FFFF; font-size:25px;'>&#8718;</span>|
Dot:<span style='#FF0000; font-size:25px;'>&#8718;</span></p>
"""
if value_colors in [None, ""]:
return ""
html_color_parts = []
color_parts = value_colors.split('|')
for c_str in color_parts:
value, color = c_str.split(':')
value = value.replace('<=', '&le;').replace('<', '&lt;')
if color == 'not plot':
c_html = f"{value}:not plot"
else:
c_html = (
f"{value}:"
f"<span style='color: {color}; font-size:25px;'>&#8718;"
f"</span>")
html_color_parts.append(c_html)
value_color_html = f"<p>{'|'.join(html_color_parts)}</p>"
return value_color_html
import sys
import platform
import os
from pathlib import Path
from typing import Optional
from PySide2 import QtWidgets, QtGui, QtCore
from PySide2.QtCore import Qt
from PySide2.QtWidgets import QWidget, QDialog, QTextEdit
from sohstationviewer.view.db_config.value_color_helper.functions import \
convert_value_color_str, prepare_value_color_html
class ValueColorWidget(QTextEdit):
def __init__(self, parent: Optional[QWidget], background: str):
"""
Widget to display valueColors and call a dialog to edit tha value
:param parent: the parent widget
:param background: 'B'/'W': flag indicating background color
"""
QtWidgets.QTextEdit.__init__(self, parent)
self.set_background(background)
# string for value color to be saved in DB
self.value_color_str: str = ''
# dialog that pop up when clicking on edit_button to help edit color
# and value
self.edit_value_color_dialog: Optional[QWidget] = None
# type of channel's plot
self.plot_type: str = ''
self.setReadOnly(True)
# change cursor to Arrow so user know they can't edit directly
self.viewport().setCursor(QtGui.QCursor(QtCore.Qt.ArrowCursor))
# to see all info
self.setFixedHeight(28)
self.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
self.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
self.verticalScrollBar().setDisabled(True)
self.edit_button = QtWidgets.QToolButton(self)
self.edit_button.setCursor(Qt.PointingHandCursor)
current_file_path = os.path.abspath(__file__)
root_path = Path(current_file_path).parent.parent.parent.parent
if background == 'B':
img_file = f"{root_path}/images/edit_icon_black_background.png"
else:
img_file = f"{root_path}/images/edit_icon_white_background.png"
self.edit_button.setIcon(QtGui.QIcon(img_file))
self.edit_button.setStyleSheet(
"background: transparent; border: none;")
self.edit_button.clicked.connect(self.on_edit)
layout = QtWidgets.QHBoxLayout(self)
layout.addWidget(self.edit_button, 0, Qt.AlignRight)
layout.setSpacing(0)
layout.setMargin(5)
def set_background(self, background: str):
"""
Set black background for user to have better feeling how the colors
displayed on black background. Text and PlaceholderText's colors
have to be changed to be readable on the black background too.
:param background: 'B'/'W': sign for background color
"""
palette = self.palette()
if background == 'B':
palette.setColor(QtGui.QPalette.Text, Qt.white)
palette.setColor(QtGui.QPalette.Base, Qt.black)
palette.setColor(QtGui.QPalette.PlaceholderText, Qt.lightGray)
self.setPalette(palette)
def set_value_color(self, plot_type: str, value_color_str: str) \
-> None:
"""
Set value_color_str, value_color_edit_dialog and display value color
string in html to show color for user to have the feeling.
:param plot_type: type of channel's plot
:param value_color_str: string for value color to be saved in DB
"""
self.plot_type = plot_type
# Won't need to convert after database's valueColors are changed
self.value_color_str = convert_value_color_str(
plot_type, value_color_str)
value_color_html = prepare_value_color_html(self.value_color_str)
self.setHtml(value_color_html)
def on_edit(self):
print('edit value color')
class TestDialog(QDialog):
def __init__(self):
super(TestDialog, self).__init__(None)
main_layout = QtWidgets.QVBoxLayout()
self.setLayout(main_layout)
self.value_colorb_widget = ValueColorWidget(self, 'B')
main_layout.addWidget(self.value_colorb_widget)
self.value_colorw_widget = ValueColorWidget(self, 'W')
main_layout.addWidget(self.value_colorw_widget)
# linesDots
self.value_colorb_widget.set_value_color('linesDots', 'L:R|D:G')
self.value_colorw_widget.set_value_color('linesDots', 'L:R|D:G')
# triColorLines
# self.value_colorb_widget.set_value_color(
# 'triColorLines', '-1:M|0:R|1:G')
# self.value_colorw_widget.set_value_color(
# 'triColorLines', '-1:M|0:R|1:G')
# multiColorDotsEqualOnUpperBound
# self.value_colorb_widget.set_value_color(
# 'multiColorDotsEqualOnUpperBound', '0:R|1:Y|2:G|+2:M')
# self.value_colorw_widget.set_value_color(
# 'multiColorDotsEqualOnUpperBound', '0:R|1:Y|2:G|+2:M')
# multiColorDotsEqualOnLowerBound
# self.value_colorb_widget.set_value_color('multiColorDotsEqualOnLowerBound',
# '3:R|3.3:Y|=3.3:G')
# self.value_colorw_widget.set_value_color('multiColorDotsEqualOnLowerBound',
# '3:R|3.3:Y|=3.3:G')
if __name__ == '__main__':
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
test = TestDialog()
test.exec_()
sys.exit(app.exec_())
...@@ -36,8 +36,9 @@ from sohstationviewer.view.help_view import HelpBrowser ...@@ -36,8 +36,9 @@ from sohstationviewer.view.help_view import HelpBrowser
from sohstationviewer.view.ui.main_ui import UIMainWindow from sohstationviewer.view.ui.main_ui import UIMainWindow
from sohstationviewer.view.util.enums import LogType from sohstationviewer.view.util.enums import LogType
from sohstationviewer.view.util.functions import ( from sohstationviewer.view.util.functions import (
check_chan_wildcards_format, check_masspos, check_chan_wildcards_format, check_masspos)
) from sohstationviewer.view.util.check_file_size import check_folders_size
from sohstationviewer.view.channel_prefer_dialog import ChannelPreferDialog from sohstationviewer.view.channel_prefer_dialog import ChannelPreferDialog
from sohstationviewer.view.create_muti_buttons_dialog import ( from sohstationviewer.view.create_muti_buttons_dialog import (
create_multi_buttons_dialog create_multi_buttons_dialog
...@@ -47,7 +48,6 @@ from sohstationviewer.controller.processing import detect_data_type ...@@ -47,7 +48,6 @@ from sohstationviewer.controller.processing import detect_data_type
from sohstationviewer.controller.util import ( from sohstationviewer.controller.util import (
display_tracking_info, rt130_find_cf_dass, check_data_sdata display_tracking_info, rt130_find_cf_dass, check_data_sdata
) )
from sohstationviewer.database.process_db import execute_db_dict, execute_db from sohstationviewer.database.process_db import execute_db_dict, execute_db
from sohstationviewer.conf.constants import TM_FORMAT, ColorMode, CONFIG_PATH from sohstationviewer.conf.constants import TM_FORMAT, ColorMode, CONFIG_PATH
...@@ -574,7 +574,12 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow): ...@@ -574,7 +574,12 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
if self.list_of_dir == []: if self.list_of_dir == []:
msg = "No directories have been selected." msg = "No directories have been selected."
raise Exception(msg) raise Exception(msg)
if self.warn_big_file_sizes.isChecked():
# call check_folder_size() here b/c it requires list_of_dir and it
# is before the called for detect_data_type() which sometimes take
# quite a long time.
if not check_folders_size(self.list_of_dir, self.req_wf_chans):
raise Exception("Big size")
# Log files don't have a data type that can be detected, so we don't # Log files don't have a data type that can be detected, so we don't
# detect the data type if we are reading them. # detect the data type if we are reading them.
if self.rt130_das_dict == {} and not self.log_checkbox.isChecked(): if self.rt130_das_dict == {} and not self.log_checkbox.isChecked():
...@@ -624,22 +629,16 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow): ...@@ -624,22 +629,16 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.has_problem = False self.has_problem = False
if self.gap_len_line_edit.text().strip() != '': if self.gap_len_line_edit.text().strip() != '':
try: # convert from minute to second
# convert from minute to second minimum_gap_in_minutes = float(self.gap_len_line_edit.text())
self.gap_minimum = float( if minimum_gap_in_minutes < 0.1:
self.gap_len_line_edit.text()) * 60
except ValueError:
msg = "Minimum Gap must be a number."
QtWidgets.QMessageBox.warning(
self, "Invalid Minimum Gap request", msg)
self.cancel_loading()
return
if self.gap_minimum < 0.1:
msg = "Minimum Gap must be greater than 0.1 minute to be " \ msg = "Minimum Gap must be greater than 0.1 minute to be " \
"detected." "detected."
QtWidgets.QMessageBox.warning( QtWidgets.QMessageBox.warning(
self, "Invalid Minimum Gap request", msg) self, "Invalid Minimum Gap request", msg)
self.cancel_loading()
return return
self.gap_minimum = minimum_gap_in_minutes * 60
else: else:
self.gap_minimum = None self.gap_minimum = None
...@@ -680,6 +679,9 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow): ...@@ -680,6 +679,9 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.cancel_loading() self.cancel_loading()
return return
self.data_type == 'Unknown' self.data_type == 'Unknown'
elif str(e) == "Big size":
self.cancel_loading()
return
else: else:
fmt = traceback.format_exc() fmt = traceback.format_exc()
QtWidgets.QMessageBox.warning( QtWidgets.QMessageBox.warning(
......
...@@ -3,7 +3,7 @@ import sys ...@@ -3,7 +3,7 @@ import sys
import os import os
import traceback import traceback
from pathlib import Path from pathlib import Path
from typing import List, Optional, Literal, Iterable from typing import List, Optional, Literal, Iterable, Any
from PySide2 import QtWidgets, QtCore from PySide2 import QtWidgets, QtCore
from matplotlib.axes import Axes from matplotlib.axes import Axes
...@@ -31,7 +31,7 @@ class GPSWidget(QtWidgets.QWidget): ...@@ -31,7 +31,7 @@ class GPSWidget(QtWidgets.QWidget):
# one point for each coordinate. # one point for each coordinate.
self.unique_gps_points: Optional[List[GPSPoint]] = None self.unique_gps_points: Optional[List[GPSPoint]] = None
self.fig = Figure(figsize=(6, 6), dpi=100) self.fig = Figure(figsize=(6, 6), dpi=100, facecolor='#ECECEC')
self.canvas = Canvas(self.fig) self.canvas = Canvas(self.fig)
self.canvas.mpl_connect('pick_event', self.on_pick_event) self.canvas.mpl_connect('pick_event', self.on_pick_event)
...@@ -156,7 +156,7 @@ class GPSWidget(QtWidgets.QWidget): ...@@ -156,7 +156,7 @@ class GPSWidget(QtWidgets.QWidget):
self.repaint() self.repaint()
self.tracking_box.clear() self.tracking_box.clear()
def on_pick_event(self, event) -> None: def on_pick_event(self, event) -> Any:
""" """
On a GPS point being picked, display the data of that point on the On a GPS point being picked, display the data of that point on the
tracking box. tracking box.
...@@ -172,13 +172,13 @@ class GPSWidget(QtWidgets.QWidget): ...@@ -172,13 +172,13 @@ class GPSWidget(QtWidgets.QWidget):
# and longitude follows correspondingly. # and longitude follows correspondingly.
lat_dir = 'N' if picked_point.latitude > 0 else 'S' lat_dir = 'N' if picked_point.latitude > 0 else 'S'
long_dir = 'E' if picked_point.longitude > 0 else 'W' long_dir = 'E' if picked_point.longitude > 0 else 'W'
meta_separator = '&nbsp;' * 22 meta_separator = '&nbsp;' * 7
loc_separator = '&nbsp;' * 10 loc_separator = '&nbsp;' * 5
msg = ( msg = (
f'Mark: {picked_point.last_timemark}{meta_separator}' f' Mark: {picked_point.last_timemark}{meta_separator}'
f'Fix: {picked_point.fix_type}{meta_separator}' f'Fix: {picked_point.fix_type}{meta_separator}'
f'Sats: {picked_point.num_satellite_used}<br>' f'Sats: {picked_point.num_satellite_used}<br>'
f'Lat: {lat_dir}{abs(picked_point.latitude):.6f}{loc_separator}' f' Lat: {lat_dir}{abs(picked_point.latitude):.6f}{loc_separator}'
f'Long: {long_dir}{abs(picked_point.longitude):.6f}{loc_separator}' f'Long: {long_dir}{abs(picked_point.longitude):.6f}{loc_separator}'
f'Elev: {picked_point.height}{picked_point.height_unit}' f'Elev: {picked_point.height}{picked_point.height_unit}'
) )
...@@ -254,7 +254,7 @@ class GPSDialog(QtWidgets.QWidget): ...@@ -254,7 +254,7 @@ class GPSDialog(QtWidgets.QWidget):
bottom_layout = QtWidgets.QVBoxLayout() bottom_layout = QtWidgets.QVBoxLayout()
bottom_layout.addLayout(button_layout) bottom_layout.addLayout(button_layout)
self.info_text_browser.setFixedHeight(42) self.info_text_browser.setFixedHeight(45)
bottom_layout.addWidget(self.info_text_browser) bottom_layout.addWidget(self.info_text_browser)
main_layout.addLayout(bottom_layout) main_layout.addLayout(bottom_layout)
......
...@@ -68,7 +68,8 @@ class SOHWidget(MultiThreadedPlottingWidget): ...@@ -68,7 +68,8 @@ class SOHWidget(MultiThreadedPlottingWidget):
return return
chan_db_info = c_data['chan_db_info'] chan_db_info = c_data['chan_db_info']
plot_type = chan_db_info['plotType'] plot_type = chan_db_info['plotType']
ax = getattr(self.plotting, plot_functions[plot_type][1])( ax = getattr(
self.plotting, plot_functions[plot_type]['plot_function'])(
c_data, chan_db_info, chan_id) c_data, chan_db_info, chan_id)
c_data['ax'] = ax c_data['ax'] = ax
ax.chan = chan_id ax.chan = chan_id
......
...@@ -40,7 +40,6 @@ class TimePowerSquaredDialog(QtWidgets.QWidget): ...@@ -40,7 +40,6 @@ class TimePowerSquaredDialog(QtWidgets.QWidget):
""" """
self.date_format: str = 'YYYY-MM-DD' self.date_format: str = 'YYYY-MM-DD'
self.setGeometry(50, 50, 1200, 800)
self.setWindowTitle("TPS Plot") self.setWindowTitle("TPS Plot")
main_layout = QtWidgets.QVBoxLayout() main_layout = QtWidgets.QVBoxLayout()
......
...@@ -231,7 +231,7 @@ class TimePowerSquaredWidget(plotting_widget.PlottingWidget): ...@@ -231,7 +231,7 @@ class TimePowerSquaredWidget(plotting_widget.PlottingWidget):
ax = self.create_axes(self.plotting_bot, plot_h) ax = self.create_axes(self.plotting_bot, plot_h)
ax.spines[['right', 'left', 'top', 'bottom']].set_visible(False) ax.spines[['right', 'left', 'top', 'bottom']].set_visible(False)
ax.text( ax.text(
-0.12, 1, -0.15, 1,
f"{get_seismic_chan_label(chan_id)} {c_data['samplerate']}sps", f"{get_seismic_chan_label(chan_id)} {c_data['samplerate']}sps",
horizontalalignment='left', horizontalalignment='left',
verticalalignment='top', verticalalignment='top',
......
...@@ -49,7 +49,8 @@ class WaveformWidget(MultiThreadedPlottingWidget): ...@@ -49,7 +49,8 @@ class WaveformWidget(MultiThreadedPlottingWidget):
plot_type = chan_db_info['plotType'] plot_type = chan_db_info['plotType']
# refer to doc string for mass_pos_data to know the reason for 'ax_wf' # refer to doc string for mass_pos_data to know the reason for 'ax_wf'
ax = getattr(self.plotting, plot_functions[plot_type][1])( ax = getattr(
self.plotting, plot_functions[plot_type]['plot_function'])(
c_data, chan_db_info, chan_id) c_data, chan_db_info, chan_id)
c_data['ax_wf'] = ax c_data['ax_wf'] = ax
ax.chan = chan_id ax.chan = chan_id
...@@ -76,7 +77,6 @@ class WaveformDialog(QtWidgets.QWidget): ...@@ -76,7 +77,6 @@ class WaveformDialog(QtWidgets.QWidget):
date_format: format for date date_format: format for date
""" """
self.date_format: str = 'YYYY-MM-DD' self.date_format: str = 'YYYY-MM-DD'
self.setGeometry(50, 10, 1600, 700)
self.setWindowTitle("Raw Data Plot") self.setWindowTitle("Raw Data Plot")
main_layout = QtWidgets.QVBoxLayout() main_layout = QtWidgets.QVBoxLayout()
......
...@@ -7,6 +7,7 @@ from PySide2.QtWidgets import ( ...@@ -7,6 +7,7 @@ from PySide2.QtWidgets import (
QMainWindow, QWidget, QTextBrowser, QPushButton, QLineEdit, QDateEdit, QMainWindow, QWidget, QTextBrowser, QPushButton, QLineEdit, QDateEdit,
QListWidget, QCheckBox, QRadioButton, QMenu, QLabel, QFrame, QListWidget, QCheckBox, QRadioButton, QMenu, QLabel, QFrame,
QVBoxLayout, QHBoxLayout, QGridLayout, QAbstractItemView, QButtonGroup, QVBoxLayout, QHBoxLayout, QGridLayout, QAbstractItemView, QButtonGroup,
QSplitter,
) )
from PySide2.QtWidgets import ( from PySide2.QtWidgets import (
QAction, QActionGroup, QShortcut QAction, QActionGroup, QShortcut
...@@ -218,6 +219,11 @@ class UIMainWindow(object): ...@@ -218,6 +219,11 @@ class UIMainWindow(object):
# ========================== Option Menu ======================= # ========================== Option Menu =======================
""" """
warn_big_file_sizes: option to check file sizes and give warning if
total file sizes are greater than constant.BIG_FILE_SIZE
"""
self.warn_big_file_sizes: Optional[QAction] = None
"""
mp_regular_color_action: set self.mass_pos_volt_range_opt to 'regular' mp_regular_color_action: set self.mass_pos_volt_range_opt to 'regular'
mp_trillium_color_action: set self.mass_pos_volt_range_opt to mp_trillium_color_action: set self.mass_pos_volt_range_opt to
'trillium' 'trillium'
...@@ -279,7 +285,6 @@ class UIMainWindow(object): ...@@ -279,7 +285,6 @@ class UIMainWindow(object):
:param main_window: QMainWindow - main GUI for user to interact with :param main_window: QMainWindow - main GUI for user to interact with
""" """
self.main_window = main_window self.main_window = main_window
main_window.resize(1798, 1110)
main_window.setWindowTitle("SOH Station Viewer") main_window.setWindowTitle("SOH Station Viewer")
self.central_widget = QWidget(main_window) self.central_widget = QWidget(main_window)
main_window.setCentralWidget(self.central_widget) main_window.setCentralWidget(self.central_widget)
...@@ -294,8 +299,6 @@ class UIMainWindow(object): ...@@ -294,8 +299,6 @@ class UIMainWindow(object):
self.set_first_row(main_layout) self.set_first_row(main_layout)
self.set_second_row(main_layout) self.set_second_row(main_layout)
self.tracking_info_text_browser.setFixedHeight(80)
main_layout.addWidget(self.tracking_info_text_browser)
self.create_menu_bar(main_window) self.create_menu_bar(main_window)
self.connect_signals(main_window) self.connect_signals(main_window)
self.create_shortcuts(main_window) self.create_shortcuts(main_window)
...@@ -352,7 +355,7 @@ class UIMainWindow(object): ...@@ -352,7 +355,7 @@ class UIMainWindow(object):
left_widget = QWidget(self.central_widget) left_widget = QWidget(self.central_widget)
h_layout.addWidget(left_widget) h_layout.addWidget(left_widget)
left_widget.setFixedWidth(240) left_widget.setFixedWidth(240)
left_widget.setMinimumHeight(650) # left_widget.setMinimumHeight(650)
left_layout = QVBoxLayout() left_layout = QVBoxLayout()
left_layout.setContentsMargins(0, 0, 0, 0) left_layout.setContentsMargins(0, 0, 0, 0)
left_layout.setSpacing(0) left_layout.setSpacing(0)
...@@ -360,12 +363,22 @@ class UIMainWindow(object): ...@@ -360,12 +363,22 @@ class UIMainWindow(object):
self.set_control_column(left_layout) self.set_control_column(left_layout)
plot_splitter = QSplitter(QtCore.Qt.Orientation.Vertical)
h_layout.addWidget(plot_splitter, 2)
self.plotting_widget = SOHWidget(self.main_window, self.plotting_widget = SOHWidget(self.main_window,
self.tracking_info_text_browser, self.tracking_info_text_browser,
'SOH', 'SOH',
self.main_window) self.main_window)
plot_splitter.addWidget(self.plotting_widget)
h_layout.addWidget(self.plotting_widget, 2) self.tracking_info_text_browser.setMinimumHeight(60)
self.tracking_info_text_browser.setMaximumHeight(80)
plot_splitter.addWidget(self.tracking_info_text_browser)
tracking_browser_idx = plot_splitter.indexOf(
self.tracking_info_text_browser
)
plot_splitter.setCollapsible(tracking_browser_idx, False)
def set_control_column(self, left_layout): def set_control_column(self, left_layout):
""" """
...@@ -465,6 +478,10 @@ class UIMainWindow(object): ...@@ -465,6 +478,10 @@ class UIMainWindow(object):
gap_layout.addWidget(QLabel("Minimum Gap Length ")) gap_layout.addWidget(QLabel("Minimum Gap Length "))
self.gap_len_line_edit = QLineEdit(self.central_widget) self.gap_len_line_edit = QLineEdit(self.central_widget)
gap_validator = QtGui.QDoubleValidator()
gap_validator.setDecimals(2)
gap_validator.setNotation(QtGui.QDoubleValidator.StandardNotation)
self.gap_len_line_edit.setValidator(gap_validator)
gap_layout.addWidget(self.gap_len_line_edit) gap_layout.addWidget(self.gap_len_line_edit)
gap_layout.addWidget(QLabel(' m')) gap_layout.addWidget(QLabel(' m'))
...@@ -621,6 +638,13 @@ class UIMainWindow(object): ...@@ -621,6 +638,13 @@ class UIMainWindow(object):
:param main_window: QMainWindow - main GUI for user to interact with :param main_window: QMainWindow - main GUI for user to interact with
:param menu: QMenu - Options Menu :param menu: QMenu - Options Menu
""" """
self.warn_big_file_sizes = QAction(
'Warn big file sizes', main_window
)
self.warn_big_file_sizes.setCheckable(True)
menu.addAction(self.warn_big_file_sizes)
menu.addSeparator()
mp_coloring_menu = QMenu('MP Coloring:', main_window) mp_coloring_menu = QMenu('MP Coloring:', main_window)
menu.addMenu(mp_coloring_menu) menu.addMenu(mp_coloring_menu)
mp_coloring_group = QActionGroup(main_window) mp_coloring_group = QActionGroup(main_window)
......
from typing import List, Union, Optional, Tuple, Dict
import sys
import os
from PySide2.QtWidgets import QMessageBox, QApplication
from obspy.io.reftek.core import _is_reftek130
from sohstationviewer.conf.constants import BIG_FILE_SIZE
from sohstationviewer.controller.util import validate_file
from sohstationviewer.controller.processing import (
get_next_channel_from_mseed_file)
def _get_file_type(path2file: str) -> str:
"""
Get type of the given file:
+ TEXT if strip() can be used for the first 64 bytes
+ MSEED if channel name of the first record can be read
+ RT130 is decided using obspy method _is_reftek130()
+ BINARY if none of the above types are detected.
:param path2file: absolute path to file
:return: file's type
"""
with open(path2file, 'r') as file:
try:
file.read(64).strip()
return 'TEXT'
except UnicodeDecodeError:
pass
with open(path2file, 'rb') as file:
try:
# read header of the first record to decide MSEED file
get_next_channel_from_mseed_file(file)
return 'MSEED'
except ValueError:
if _is_reftek130(path2file):
return 'RT130'
else:
return 'BINARY'
def _get_multiplex_and_chan_id(path2file: str,
is_multiplex: bool,
non_multiplexed_reach_total_limit: bool
) -> Tuple[bool, Optional[str]]:
"""
Recursively reading a part of header of each data record in the given file
to get the channel name.
+ If more than one channel names are detected in a file, the data set is
definitely multiplexed.
+ If only one channel name is detected in a file, keep checking other
files until the number of files checked goes up to 10 then decide the
data set is non multiplexed which means using the first record to
decide the channel of the whole file.
:param path2file: absolute path to file
:param is_multiplex: multiplex status of the data set so far
:param non_multiplexed_reach_total_limit: the total of multiplexed
files reach limit
:return is_multiplex: multiplex status of the data set after the file is
read
:return chan: the last channel name read from this file.
+ If the file is multiplexed, which channel name return isn't matter.
+ If the file is non-multiplexed, all records have the same channel name.
+ chan can be None if the file isn't mseed, e.g. TEXT.
"""
chan = None
file = open(path2file, 'rb')
chans_in_stream = set()
while 1:
# read all records in a file until it is detected as multiplexed
is_eof = (file.read(1) == b'')
if is_eof:
break
file.seek(-1, 1)
try:
chan = get_next_channel_from_mseed_file(file)
if non_multiplexed_reach_total_limit:
# If total of non_multiplexed files reach limit, don't need
# to check all records anymore but treat the file as
# non-multiplexed which is all records have the same channel id
# => Use the first record to decide the channel of the file.
break
except ValueError:
file.close()
break
chans_in_stream.add(chan)
if len(chans_in_stream) > 1:
# a file is multiplexed if it has more than one channel id
is_multiplex = True
break
file.close()
return is_multiplex, chan
def _get_size_of_non_multiplex_waveform_file(
dir_path: str, req_wf_chans: List[Union[str, int]]):
"""
Estimate size of directory by collecting sizes of non-multiplexed waveform
files. This way, we can skip reading sizes of small files which makes the
speed of reading sizes improved especially for the case that the number of
small files is big.
:param dir_path: absolute path to directory
:param req_wf_chans: waveform request which can be list of data streams or
list of mseed wildcards
:return total_size:
+ 0 if this method won't be used to estimate the size of the directory
+ Estimated total size of the directory up to where it is greater than
BIG_FILE_SIZE
"""
# list of prefix of high sampling rate channels of which files' sizes are
# significantly larger than the ones of lower sampling rate channels.
wf_high_spr_prefix = ['FH', 'FN', # ≥ 1000 to < 5000
'GH', 'GL', # ≥ 1000 to < 5000
'DH', 'DL', # ≥ 250 to < 1000
'CH', 'CN', # ≥ 250 to < 1000
'EH', 'EL', 'EP', # ≥ 80
'SH', 'SL', 'SP', # ≥ 10 to < 80
'HH', 'HN', # ≥ 80
'BH', 'BN', # ≥ 10 to < 80
'MH', 'MN', 'MP', 'ML'] # >1 to < 10
"""
'LH','LL', 'LP', 'LN' =1
'VP', 'VL', 'VL', 'VH' = 0.1
'UN', 'UP', 'UL', 'UH' <=0.01
Skip channels with sampling rate <=1 because there are less data in the
files, which can result many files with small sizes in compare with sizes
of high sample rate files. (For case that the data set has only low
sampling rates, collecting data sizes won't be conducted in this method.)
"""
wf_chan_possibilities = set()
for request in req_wf_chans:
if request == '*':
wf_chan_possibilities.update(wf_high_spr_prefix)
elif request[0] == '*':
wf_chan_possibilities.update(
[prefix for prefix in wf_high_spr_prefix
if prefix[1] == request[1]])
elif request[1] == '*':
wf_chan_possibilities.update(
[prefix for prefix in wf_high_spr_prefix
if prefix[0] == request[0]])
if len(wf_chan_possibilities) == 0:
# no waveform channels available to pick, this method is not available
# for this data set.
return 0
total_size = 0
is_multiplex = False
count = 0
total_non_multiplexed_limit = 10
for path, subdirs, files in os.walk(dir_path):
total_of_mseed_files = 0
for file_name in files:
if not validate_file(os.path.join(path, file_name), file_name):
continue
fp = os.path.join(path, file_name)
is_multiplex, chan = _get_multiplex_and_chan_id(
fp, is_multiplex, count >= total_non_multiplexed_limit)
if chan is None:
continue
count += 1
total_of_mseed_files += 1
if is_multiplex:
# Don't use this method for multiplexed data set to elevate
# the speed
return 0
# not multiplex
if (chan is not None and
chan.startswith(tuple(wf_chan_possibilities))):
# ------ high sample rate mseed ------
# to help skip get size of too many small files,
# only read the big files which are in the list wf_chan_pos
file_size = os.path.getsize(fp)
total_size += file_size
if total_size > BIG_FILE_SIZE:
return total_size
else:
# ------ low sample rate mseed ------
if total_of_mseed_files == 50:
# When there are more than 50 low sampling rate mseed files
# in a folder, break the for loop to move to a different
# folder.
break
return total_size
def _get_size_rt130(dir_path: str, req_ds: List[int]):
"""
Get size of RT130's requested datas treams which is inside folder that has
data stream number as name.
:param dir_path: absolute path to directory
:param req_ds: list of requested data streams
:return total_size: total size of requested data streams up to where it
greater than BIG_FILE_SIZE
"""
if req_ds == ['*']:
req_ds = ['1', '2', '3', '4', '5', '6', '7', '8']
else:
req_ds = [str(req) for req in req_ds]
total_size = 0
for path, subdirs, files in os.walk(dir_path):
path_parts = path.split(os.sep)
ds = path_parts[-1]
if ds in req_ds:
# the direct folder of rt130 file must be named after data stream
for file_name in files:
fp = os.path.join(path, file_name)
file_size = os.path.getsize(fp)
total_size += file_size
if total_size > BIG_FILE_SIZE:
break
return total_size
def _get_size_mseed(dir_path: str) -> int:
"""
Get size of all files until total size > BIG_FILE_SIZE
:param dir_path: absolute path to directory
:return total_size: total size of the directory up to where it greater
than BIG_FILE_SIZE
"""
total_size = 0
count = 0
for path, subdirs, files in os.walk(dir_path):
for file_name in files:
if not validate_file(os.path.join(path, file_name), file_name):
continue
fp = os.path.join(path, file_name)
file_size = os.path.getsize(fp)
total_size += file_size
count += 1
if total_size > BIG_FILE_SIZE:
break
return total_size
def _get_dir_size(dir_path: str, req_wf_chans: List[Union[str, int]]):
"""
Get size of directory.
To make the process go fast, separate it into different case:
+ Non-multiplex MSeed with high sampling rate waveform
+ The rest cases of MSeed
+ RT130
+ If only text files or binary files found, count the most 200 files
and ask user to decide stopping or continuing process at their
own risk
:param dir_path: absolute path to directory
:param req_wf_chans: waveform request which can be list of data streams or
list of mseed wildcards
:return total_size:
+ 0 if don't have waveform request
+ total size of the directory up to where it greater than BIG_FILE_SIZE
+ -1 if count more than 200 TEXT files
+ -2 if count more than 200 BINARY files of which types are unkonwn
"""
text_file_count = 0
binary_file_count = 0
for path, subdirs, files in os.walk(dir_path):
for file_name in files:
path2file = os.path.join(path, file_name)
if not validate_file(path2file, file_name):
continue
file_type = _get_file_type(path2file)
if file_type == 'TEXT':
text_file_count += 1
if text_file_count > 200:
return {'data_size': -1, 'text_count': text_file_count}
continue
elif file_type == 'RT130':
return {'data_size': _get_size_rt130(dir_path, req_wf_chans)}
elif file_type == 'MSEED':
total_size = _get_size_of_non_multiplex_waveform_file(
dir_path, req_wf_chans)
if total_size != 0:
return {'data_size': total_size}
else:
return {'data_size': _get_size_mseed(dir_path)}
else:
binary_file_count += 1
if binary_file_count > 200:
return {'data_size': -1, 'binary_count': binary_file_count}
return {'data_size': -1,
'binary_count': binary_file_count, 'text_count': text_file_count}
def _abort_dialog(msg: str) -> bool:
"""
Provide confirming dialog for user to continue or not
:param msg: message of what need to be confirmed
:return: True for the confirmation. False for the cancel.
"""
dlg = QMessageBox()
dlg.setText(msg)
dlg.setInformativeText('Do you want to proceed?')
dlg.setStandardButtons(QMessageBox.Yes |
QMessageBox.Abort)
dlg.setDefaultButton(QMessageBox.Abort)
dlg.setIcon(QMessageBox.Question)
ret = dlg.exec_()
if ret == QMessageBox.Abort:
return False
else:
return True
def _check_folders_size(dir_paths: List[str],
req_wf_chans: List[Union[str, int]]
) -> Dict[str, int]:
"""
Check the folders in the list dir_paths for size of data files and count of
text file or binary.
:param dir_paths: list of paths to check for sizes
:param req_wf_chans: requirement of waveform channels
:return: dictionary of size or count info in the dir_paths
"""
final_result = {'data_size': 0, 'text_count': 0, 'binary_count': 0}
for dir_path in dir_paths:
if not os.path.isdir(dir_path):
raise Exception(f"'{dir_path}' isn't a valid directory")
result = _get_dir_size(dir_path, req_wf_chans)
if result['data_size'] >= 0:
final_result['data_size'] += result['data_size']
if final_result['data_size'] > BIG_FILE_SIZE:
break
else:
# only consider text and binary if no data
if 'text_count' in result:
final_result['text_count'] += result['text_count']
if final_result['text_count'] > 200:
break
if 'binary_count' in result:
final_result['binary_count'] += result['binary_count']
if final_result['binary_count'] > 200:
break
return final_result
def check_folders_size(dir_paths: List[str],
req_wf_chans: List[Union[str, int]]
) -> bool:
"""
Check the folders in the list dir_paths:
+ If found data in folders, return True if size <= BIG_FILE_SIZE.
Otherwise, ask user to continue or not.
+ If there are no data files at all, report the files found and ask user to
continue or not
:param dir_paths: list of paths to check for sizes
:param req_wf_chans: requirement of waveform channels
:return: True if the check is passed and False otherwise
"""
try:
final_result = _check_folders_size(dir_paths, req_wf_chans)
except Exception as e:
QMessageBox.information(None, "Error", str(e))
return False
if final_result['data_size'] > BIG_FILE_SIZE:
msg = ('The selected data set is greater than 2GB. It '
'might take a while to finish reading '
'and plotting everything.')
return _abort_dialog(msg)
elif final_result['data_size'] > 0:
return True
elif final_result['text_count'] > 200:
msg = ("There are more than 200 text files detected."
"Do you want to continue at your own risk?")
return _abort_dialog(msg)
elif final_result['binary_count'] > 200:
msg = ("There are more than 200 binary files detected."
"Do you want to continue at your own risk?")
return _abort_dialog(msg)
else:
file_info = []
if final_result['text_count'] > 0:
file_info.append(f"{final_result['text_count']} text files")
if final_result['binary_count'] > 0:
file_info.append(f"{final_result['binary_count']} binary files")
file_info_str = ' and '.join(file_info)
msg = (f"There are {file_info_str} detected with no data files."
"Do you want to continue at your own risk?")
return _abort_dialog(msg)
if __name__ == '__main__':
import platform
# Enable Layer-backing for MacOs version >= 11
# Only needed if using the pyside2 library with version>=5.15.
# Layer-backing is always enabled in pyside6.
os_name, version, *_ = platform.platform().split('-')
# if os_name == 'macOS' and version >= '11':
# mac OSX 11.6 appear to be 10.16 when read with python and still required
# this environment variable
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QApplication(sys.argv)
print("BIG FILE SIZE:", BIG_FILE_SIZE)
data_folder = "/Volumes/UNTITLED/SOHView_data/"
"""
The following examples are based on BIG_FILE_SIZE = 2GB
"""
# ============== Centaur ========================
# multiplexed mseed: 1530200576B; system:1.53GB
# print(check_folders_size(
# [f'{data_folder}Centaur/Centaur_DataTest.nan'], ['*']))
# multiplexed mseed: 34171904; system:34.2MB
# print(check_folders_size(
# [f'{data_folder}Centaur/Centaur-2018-3734.nan'], ['*']))
# multiplexed mseed: 25198592; system:25.2MB
# print(check_folders_size(
# [f'{data_folder}Centaur/Centaur3604_100sps.nan'], ['*']))
# multiplexed mseed: 468665991; system:468.7 MB
# print(check_folders_size(
# [f'{data_folder}Centaur/CentaurDiskSOH600.nan'], ['*']))
# multiplexed mseed: 20992; system:21 KB
# print(check_folders_size([f'{data_folder}Centaur/soh'], ['*']))
# multiplexed mseed: 700416; system:700 KB
# print(check_folders_size([f'{data_folder}Centaur/SOH_split600'], ['*']))
# ============ pegasus ==============
# non-multiplexed mseed: 1703583744; system:1.72 GB
# total files: 1534
# total files counted for size: 153
# print(check_folders_size(
# [f'{data_folder}Pegasus/dave_pegasus.nan'], ['*']))
# non-multiplexed mseed: 288489472; system:292.3 MB
# total files: 251
# total files counted for size: 24
# print(check_folders_size(
# [f'{data_folder}Pegasus/GNARBOX_svc1'], ['*']))
# non-multiplexed mseed: 151818240; system: 152.6 MB
# total files: 112
# total files counted for size: 12
# print(check_folders_size(
# [f'{data_folder}Pegasus/LaptopSvc4_part'], ['*']))
# non-multiplexed mseed: 151818240; system: 378.7 MB
# total files: 919
# total files counted for size: 84
# print(check_folders_size(
# [f'{data_folder}Pegasus/Pegasus Offloads'], ['*']))
# non-multiplexed mseed: over limit, stop at 2002317312; system: 2.78 GB
# total files: 1882
# total files counted for size: 151
# print(check_folders_size(
# [f'{data_folder}Pegasus/Pegasus_KCT06_Test.nan'], ['*']))
# non-multiplexed mseed: 547151872; system: 571.4 MB
# total files: 578
# total files counted for size: 84
# print(check_folders_size(
# [f'{data_folder}Pegasus/Pegasus_SVC4.nan'], ['*']))
# non-multiplexed mseed: 108064768; system: 108.1 MB
# total files: 10
# total files counted for size: 9
# print(check_folders_size(
# [f'{data_folder}Pegasus/PegasusData_LittleEndian'], ['*']))
# ============ q330 ==============
# non-multiplexed mseed: over limit, stop at 2013265920; system: 11.25 GB
# total files: 685
# total files counted for size: 120
# print(check_folders_size(
# [f'{data_folder}Q330/5083.sdr'], ['*']))
# non-multiplexed mseed: 20725760; system: 21.1 MB
# total files: 21
# total files counted for size: 3
# print(check_folders_size(
# [f'{data_folder}Q330/5244.sdr'], ['*']))
# multiplexed mseed: 341540864; system: 341.5 MB
# print(check_folders_size(
# [f'{data_folder}Q330/B44-4000939.sdr/data'], ['*']))
# multiplexed mseed: 17319742; system: 17.3 MB
# print(check_folders_size(
# [f'{data_folder}Q330/GLISN-REF-SENSLOC-2018.06.26'], ['*']))
# non-multiplexed mseed: over limit, stop at 2013265920; system: 7.55 GB
# total files: 465
# total files counted for size: 120
# print(check_folders_size(
# [f'{data_folder}Q330/Q330_5281.sdr'], ['*']))
# ============ rt130 ==============
# rt130: over limit, stop at 2071080960; system: 3.6 GB
# print(check_folders_size(
# [f'{data_folder}RT130/9BB3_D1.cf'], ['*']))
# rt130: over limit, stop at 2008623104; system: 2.16 GB
# print(check_folders_size(
# [f'{data_folder}RT130/9BB3_D2.cf'], ['*']))
# rt130: 95880192; system: 95.9 MB
# print(check_folders_size(
# [f'{data_folder}RT130/9BB3_D3.cf'], ['*']))
# rt130: 1227625472; system: 1.24 GB
# print(check_folders_size(
# [f'{data_folder}RT130/2011028.9AFA'], ['*']))
# rt130: 294737920; system: 296.8 MB
# print(check_folders_size(
# [f'{data_folder}RT130/2013326.9E4A'], ['*']))
# rt130: 1375256576; system: 1.38 GB
# print(check_folders_size(
# [f'{data_folder}RT130/2016174.9AC4'], ['*']))
# rt130: 46885888; system: 46.9 MB
# print(check_folders_size(
# [f'{data_folder}RT130/2017149.92EB'], ['*']))
# rt130: over limit, stop at 2087160832; system: 4.01 GB
# print(check_folders_size(
# [f'{data_folder}RT130/RT130-92E9-1.cf'], ['*']))
# rt130: 11527168; system: 11.5 MB
# print(check_folders_size(
# [f'{data_folder}RT130/RT130-2016290.9DF5.cf'], ['*']))
# rt130: 126618624; system: 127.4 MB
# print(check_folders_size(
# [f'{data_folder}RT130/RT130-A195-1.cf'], ['*']))
# rt130: 32062464; system: 32.2 MB
# print(check_folders_size(
# [f'{data_folder}RT130/testCF'], ['*']))
# rt130: size: 306176; system: 319 KB
# print(check_folders_size(
# [f'{data_folder}RT130/TESTRT130'], ['*']))
# ===============================
# text:
# print(check_folders_size(
# [f'{data_folder}Pegasus/Pegasus Offloads/logs'], ['*']))
# =================================
data_folder = "/Volumes/UNTITLED/issues_from_data_group/"
# mseed: size: 496574464; system: 496.6 MB
# print(check_folders_size(
# [f'{data_folder}6407.sdr'], ['*']))
# # non-multiplex mseed: size: 40435712; system: 41.2 MB
# print(check_folders_size(
# [f'{data_folder}77702'], ['*']))
# mseed: size: 206174720; system: 206.2 MB
# print(check_folders_size(
# [f'{data_folder}CONZ-5296-SOH.nan'], ['*']))
# non-multiplexed mseed: over limit, stop at 2013265920; system: 19.54 GB
# print(check_folders_size(
# [f'{data_folder}ELHT-6445.sdr'], ['*']))
# non-multiplexed mseed: 1814528; system: 37.6 MB
# Only one high sampling rate waveform file, many small soh files
# and text file => The size got is the waveform file. The size isn't
# correct but doesn't affect much of the result.
# THIS CASE IS SPECIAL.
# the first time for some reason it couldn't stop process.
# print(check_folders_size(
# [f'{data_folder}MN38'], ['*']))
# non-multiplexed mseed: 120705024; system: 120.7 MB
# No waveform files.
# print(check_folders_size(
# [f'{data_folder}NHRK.sdr'], ['*']))
# mseed: 708777984; system: 712.1 MB
# print(check_folders_size(
# [f'{data_folder}RT-9926-1.cf'], ['*']))
print("FINISH")
sys.exit(app.exec_())
import re
plot_functions = { plot_functions = {
'linesDots': ( 'linesDots': {
"Lines, one color dots. Dot or line/color mapping defined by " "description": (
"ValueColors with available colors: RYGMCW.\n" "Lines, one color dots. Dot or line/color mapping defined by "
"Ex: L:G|D:W\n" "ValueColors with available colors: RYGMCW.\n"
" means \tLines are plotted with color G\n" "Ex: L:G|D:W\n"
"\tDots are plotted with color W\n" " means \tLines are plotted with color G\n"
"If D is not defined, dots won't be displayed.\n" "\tDots are plotted with color W\n"
"If L is not defined, lines will be plotted with color G", "If D is not defined, dots won't be displayed.\n"
"plot_lines_dots"), "If L is not defined, lines will be plotted with color G"),
'linesSRate': ( "plot_function": "plot_lines_dots",
"Lines, one color dots, bitweight info. ", "value_pattern": re.compile('^(L|D|Line|Dot)$')
"plot_lines_s_rate"), },
'linesMasspos': ( 'linesSRate': {
"multi-line mass position, multi-color dots. ", "description": "Lines, one color dots, bitweight info. ",
"plot_lines_mass_pos"), "plot_function": "plot_lines_s_rate"
'triColorLines': ( },
"Three lines with three different colors for values -1, 0, 1.", 'linesMasspos': {
"plot_tri_colors"), "description": "multi-line mass position, multi-color dots. ",
'dotForTime': ( "plot_function": "plot_lines_mass_pos"
"Dots according to timestamp.\n" },
"Color defined by ValueColors with available colors: RYGMCW.\n" 'triColorLines': {
"Ex: G", "description": "Three lines with three different colors for "
"plot_time_dots"), "values -1, 0, 1.",
'multiColorDotsEqualOnUpperBound': ( "plot_function": "plot_tri_colors",
("Multicolor dots in center with value/colors mapping defined by " "value_pattern": re.compile('^-?[10]?$')
"ValueColors with available colors: RYGMCW.\n" },
"Value from low to high.\n" 'dotForTime': {
"Ex: *:W in which all values represent by white dots.\n" "description": (
"Ex: -1:_|0:R|2.3:Y|+2.3:G\n" "Dots according to timestamp.\n"
" in which \tvalue <= -1 => not plot\n" "Color defined by ValueColors with available colors: RYGMCW.\n"
"\tvalue <= 0 => plot with R color\n" "Ex: G"),
"\tvalue <= 2.3 => plot with Y color\n" "plot_function": "plot_time_dots"
"\tvalue > 2.3 => plot with G color\n"), },
"plot_multi_color_dots_equal_on_upper_bound" 'multiColorDotsEqualOnUpperBound': {
), "description": (
'multiColorDotsEqualOnLowerBound': ( "Multicolor dots in center with value/colors mapping defined "
("Multicolor dots in center with value/colors mapping defined by " "by ValueColors with available colors: RYGMCW.\n"
"ValueColors with available colors: RYGMCW.\n" "Value from low to high.\n"
"Value from low to high.\n" "Ex: *:W in which all values represent by white dots.\n"
"Ex: 3.:R|3.3:Y|=3.3:G\n" "Ex: -1:_|0:R|2.3:Y|+2.3:G\n"
" in which" " in which \tvalue <= -1 => not plot\n"
"\tvalue < 3. => plot with R color\n" "\tvalue <= 0 => plot with R color\n"
"\t3 <= value < 3.3 => plot with Y color\n" "\t0 < value <= 2.3 => plot with Y color\n"
"\tvalue = 3.3 => plot with G color\n"), "\t2.3 < value => plot with G color\n"),
"plot_multi_color_dots_equal_on_lower_bound" "plot_function": "plot_multi_color_dots_equal_on_upper_bound",
), "value_pattern": re.compile('^(\+|<=)?[0-9]+\.?[0-9]?<?$') # noqa: W605,E501
'upDownDots': ( },
("Show data with 2 different values: first down/ second up. " 'multiColorDotsEqualOnLowerBound': {
"Colors defined by ValueColors.\nEx: 1:R|0:Y"), "description": (
'plot_up_down_dots' "Multicolor dots in center with value/colors mapping defined "
) "by ValueColors with available colors: RYGMCW.\n"
"Value from low to high.\n"
"Ex: 3.:R|3.3:Y|=3.3:G\n"
" in which"
"\tvalue < 3. => plot with R color\n"
"\t3. <= value < 3.3 => plot with Y color\n"
"\tvalue = 3.3 => plot with G color\n"),
"plot_function": "plot_multi_color_dots_equal_on_lower_bound",
"value_pattern": re.compile('^[=<]?[0-9]\.?[0-9]?$') # noqa: W605
},
'upDownDots': {
"description": (
"Show data with 2 different values: first down/ second up. "
"Colors defined by ValueColors.\nEx: 1:R|0:Y"),
"plot_function": 'plot_up_down_dots',
"value_pattern": re.compile("^(0|1|Up|Down)$")
}
} }
from sohstationviewer.view.db_config.value_color_helper.functions import (
convert_value_color_str, prepare_value_color_html
)
from tests.base_test_case import BaseTestCase
class TestConvertValueColorStr(BaseTestCase):
def test_lines_dots(self):
with self.subTest("Old format of both line and dot value"):
expected_value_colors = "Line:#00FF00|Dot:#00FF00"
result = convert_value_color_str('linesDots', 'L:G|D:G')
self.assertEqual(result, expected_value_colors)
with self.subTest("Old format of line value"):
expected_value_colors = "Line:#00FF00"
result = convert_value_color_str('linesDots', 'L:G')
self.assertEqual(result, expected_value_colors)
with self.subTest("Old format of default value which is empty string"):
expected_value_colors = "Line:#00FF00"
result = convert_value_color_str('linesDots', '')
self.assertEqual(result, expected_value_colors)
with self.subTest("New format of both line and dot value"):
expected_value_colors = "Line:#00FF00|Dot:#00FF00"
result = convert_value_color_str('linesDots',
"Line:#00FF00|Dot:#00FF00")
self.assertEqual(result, expected_value_colors)
with self.subTest("New format of line value"):
expected_value_colors = "Line:#00FF00"
result = convert_value_color_str('linesDots', "Line:#00FF00")
self.assertEqual(result, expected_value_colors)
def test_up_down_dots(self):
with self.subTest("Old format"):
expected_value_colors = "Down:#FF0000|Up:#00FF00"
result = convert_value_color_str('upDownDots', '0:R|1:G')
self.assertEqual(result, expected_value_colors)
with self.subTest("New format"):
expected_value_colors = "Down:#FF0000|Up:#00FF00"
result = convert_value_color_str('upDownDots',
"Down:#FF0000|Up:#00FF00")
self.assertEqual(result, expected_value_colors)
def test_multi_color_dots_equal_on_upper_bound(self):
with self.subTest("Old format"):
expected_value_colors = ('<=0:not plot|<=1:#FFFF00|<=2:#00FF00'
'|2<:#FF00FF')
result = convert_value_color_str(
'multiColorDotsEqualOnUpperBound',
'0:_|1:Y|2:G|+2:M')
self.assertEqual(result, expected_value_colors)
with self.subTest("New format"):
expected_value_colors = ('<=0:not plot|<=1:#FFFF00|<=2:#00FF00'
'|2<:#FF00FF')
result = convert_value_color_str(
'multiColorDotsEqualOnUpperBound',
'<=0:not plot|<=1:#FFFF00|<=2:#00FF00|2<:#FF00FF')
self.assertEqual(result, expected_value_colors)
def test_multi_color_dots_equal_on_lower_bound(self):
with self.subTest("Old format"):
expected_value_colors = '<3:#FF0000|<3.3:#FFFF00|=3.3:#00FF00'
result = convert_value_color_str(
'multiColorDotsEqualOnLowerBound',
'3:R|3.3:Y|=3.3:G')
self.assertEqual(result, expected_value_colors)
with self.subTest("New format"):
expected_value_colors = '<3:#FF0000|<3.3:#FFFF00|=3.3:#00FF00'
result = convert_value_color_str(
'multiColorDotsEqualOnLowerBound',
'<3:#FF0000|<3.3:#FFFF00|=3.3:#00FF00')
self.assertEqual(result, expected_value_colors)
def test_tri_color_lines(self):
with self.subTest("Old format"):
expected_value_colors = '-1:#FF00FF|0:#FF0000|1:#00FF00'
result = convert_value_color_str(
'triColorLines',
'-1:M|0:R|1:G')
self.assertEqual(result, expected_value_colors)
with self.subTest("New format"):
expected_value_colors = '-1:#FF00FF|0:#FF0000|1:#00FF00'
result = convert_value_color_str(
'triColorLines',
'-1:#FF00FF|0:#FF0000|1:#00FF00')
self.assertEqual(result, expected_value_colors)
def test_incorrect_format(self):
with self.subTest("triColorLines"):
expected_value_colors = ('unrecognized:=1:#FF00FF'
'|unrecognized:*0:#FF0000'
'|unrecognized:1.1:#00FF00')
result = convert_value_color_str(
'triColorLines',
'=1:M|*0:R|1.1:G')
self.assertEqual(result, expected_value_colors)
with self.subTest("upDownDots"):
expected_value_colors = ('unrecognized:2:#FF00FF'
'|unrecognized:Line:#FF0000'
'|unrecognized:1.1:#00FF00'
'|unrecognized:L:#FFFF00')
result = convert_value_color_str(
'upDownDots',
'2:M|Line:R|1.1:G|L:Y')
self.assertEqual(result, expected_value_colors)
with self.subTest("linesDots"):
expected_value_colors = ('unrecognized:1:#FF00FF'
'|unrecognized:Up:#FF0000'
'|unrecognized:1.1:#00FF00'
'|Line:#FFFF00')
result = convert_value_color_str(
'linesDots',
'1:M|Up:R|1.1:G|L:Y')
self.assertEqual(result, expected_value_colors)
with self.subTest("multiColorDotsEqualOnUpperBound"):
expected_value_colors = ('unrecognized:*3:#FF0000'
'|unrecognized:<3.3:#FFFF00'
'|unrecognized:=3.3:#00FF00')
result = convert_value_color_str(
'multiColorDotsEqualOnUpperBound',
'*3:#FF0000|<3.3:#FFFF00|=3.3:#00FF00')
self.assertEqual(result, expected_value_colors)
with self.subTest("multiColorDotsEqualOnLowerBound"):
expected_value_colors = ('unrecognized:+0:#FF0000'
'|unrecognized:-1:#FFFF00'
'|unrecognized:<=2:#00FF00'
'|unrecognized:2<:#FF00FF')
result = convert_value_color_str(
'multiColorDotsEqualOnLowerBound',
'+0:R|-1:Y|<=2:G|2<:M')
self.assertEqual(result, expected_value_colors)
def test_old_value_color_str_is_none(self):
result = convert_value_color_str('linesSRate', None)
self.assertEqual(result, '')
class TestPrepareValueColorHTML(BaseTestCase):
def test_lines_dots(self):
with self.subTest("Line and dot values"):
expected_value_colors = (
"<p>Line:"
"<span style='color: #00FF00; font-size:25px;'>&#8718;</span>"
"|Dot:"
"<span style='color: #00FF00; font-size:25px;'>&#8718;</span>"
"</p>")
result = prepare_value_color_html("Line:#00FF00|Dot:#00FF00")
self.assertEqual(result, expected_value_colors)
with self.subTest("Line value"):
expected_value_colors = (
"<p>Line:<span style='color: #00FF00; font-size:25px;'>&#8718;"
"</span></p>")
result = prepare_value_color_html("Line:#00FF00")
self.assertEqual(result, expected_value_colors)
def test_up_down_dots(self):
expected_value_colors = (
"<p>Down:"
"<span style='color: #FF0000; font-size:25px;'>&#8718;</span>"
"|Up:"
"<span style='color: #00FF00; font-size:25px;'>&#8718;</span>"
"</p>")
result = prepare_value_color_html("Down:#FF0000|Up:#00FF00")
self.assertEqual(result, expected_value_colors)
def test_multi_color_dots_equal_on_upper_bound(self):
expected_value_colors = (
"<p>&le;0:not plot"
"|&le;1:"
"<span style='color: #FFFF00; font-size:25px;'>&#8718;</span>"
"|&le;2:"
"<span style='color: #00FF00; font-size:25px;'>&#8718;</span>"
"|2&lt;:"
"<span style='color: #FF00FF; font-size:25px;'>&#8718;</span>"
"</p>")
result = prepare_value_color_html(
'<=0:not plot|<=1:#FFFF00|<=2:#00FF00|2<:#FF00FF')
self.assertEqual(result, expected_value_colors)
def test_multi_color_dots_equal_on_lower_bound(self):
expected_value_colors = (
"<p>&lt;3:not plot"
"|&lt;3.3:"
"<span style='color: #FFFF00; font-size:25px;'>&#8718;</span>"
"|=3.3:"
"<span style='color: #00FF00; font-size:25px;'>&#8718;</span>"
"</p>")
result = prepare_value_color_html(
'<3:not plot|<3.3:#FFFF00|=3.3:#00FF00')
self.assertEqual(result, expected_value_colors)
def test_tri_color_lines(self):
expected_value_colors = (
"<p>-1:"
"<span style='color: #FF00FF; font-size:25px;'>&#8718;</span>"
"|0:<span style='color: #FF0000; font-size:25px;'>&#8718;</span>"
"|1:<span style='color: #00FF00; font-size:25px;'>&#8718;</span>"
"</p>"
)
result = prepare_value_color_html('-1:#FF00FF|0:#FF0000|1:#00FF00')
self.assertEqual(result, expected_value_colors)
from tempfile import TemporaryDirectory, NamedTemporaryFile
import shutil
import os
from pathlib import Path
from unittest import TestCase
from sohstationviewer.view.util.check_file_size import _check_folders_size
from sohstationviewer.conf.constants import BIG_FILE_SIZE
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
'test_data')
NON_DATA_FILE = TEST_DATA_DIR.joinpath('Non-data-file/non_data_file')
MULTIPLEX_FILE = TEST_DATA_DIR.joinpath(
'Q330_multiplex/XX-3203_4-20221222183011')
NON_MULTIPLEX_LOW_SPR_FILE = TEST_DATA_DIR.joinpath(
'Q330-sample/day_vols_AX08/AX08.XA..VM1.2021.186')
NON_MULTIPLEX_HIGH_SPR_FILE = TEST_DATA_DIR.joinpath(
'Q330-sample/day_vols_AX08/AX08.XA..HHE.2021.186')
NON_MULTIPLEX_HIGH_N_LOW_SPR_SET = TEST_DATA_DIR.joinpath('Q330-sample')
RT130_FILE = TEST_DATA_DIR.joinpath(
'RT130-sample/2017149.92EB/2017150/92EB/1/010000015_0036EE80')
class TestGetDirSize(TestCase):
def test_less_or_equal_200_text_files(self):
number_of_text_files = 25
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_text_files):
files.append(NamedTemporaryFile(dir=directory))
expected_result = {'data_size': 0,
'text_count': 25,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
# Explicitly clean up the temporary files. If we don't do this,
# the temporary directory will clean up itself and delete the
# temporary files. Then, when the function returns, the references
# to these temporary files will attempt to clean up the files. This
# leads to exceptions being raised because the files being cleaned
# up does not exist anymore.
[file.close() for file in files]
def test_more_than_200_text_files(self):
number_of_text_files = 250
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_text_files):
files.append(NamedTemporaryFile(dir=directory))
expected_result = {'data_size': 0,
'text_count': 201, # stop when more than 200
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[file.close() for file in files]
def test_less_or_equal_200_binary_files(self):
number_of_binary_files = 25
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_binary_files):
new_file_path = Path(directory).joinpath(
f'{NON_DATA_FILE.name}_{i}')
shutil.copy(NON_DATA_FILE, new_file_path)
files.append(new_file_path)
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 25}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_200_binary_files(self):
number_of_binary_files = 250
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_binary_files):
new_file_path = Path(directory).joinpath(
f'{NON_DATA_FILE.name}_{i}')
shutil.copy(NON_DATA_FILE, new_file_path)
files.append(new_file_path)
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 201} # stop when more than 200
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_less_or_equal_limit_mseed_multiplexed_files(self):
sample_file_size = os.path.getsize(MULTIPLEX_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
for i in range(3):
new_file_path = Path(directory).joinpath(
f'{MULTIPLEX_FILE.name}_{i}')
shutil.copy(MULTIPLEX_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_limit_mseed_multiplexed_files(self):
sample_file_size = os.path.getsize(MULTIPLEX_FILE)
expected_size = 0
count = 0
with TemporaryDirectory() as directory:
files = []
while 1:
new_file_path = Path(directory).joinpath(
f'{MULTIPLEX_FILE.name}_{count}')
shutil.copy(MULTIPLEX_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
if expected_size > BIG_FILE_SIZE:
break
count += 1
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_less_or_equal_limit_mseed_non_multiplexed_low_spr_files(self):
sample_file_size = os.path.getsize(NON_MULTIPLEX_LOW_SPR_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
for i in range(3):
new_file_path = Path(directory).joinpath(
f'{NON_MULTIPLEX_LOW_SPR_FILE.name}_{i}')
shutil.copy(NON_MULTIPLEX_LOW_SPR_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_less_or_equal_limit_mseed_non_multiplexed_high_spr_files(self):
sample_file_size = os.path.getsize(NON_MULTIPLEX_HIGH_SPR_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
for i in range(3):
new_file_path = Path(directory).joinpath(
f'{NON_MULTIPLEX_HIGH_SPR_FILE.name}_{i}')
shutil.copy(NON_MULTIPLEX_HIGH_SPR_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_limit_mseed_non_multiplexed_high_spr_files(self):
sample_file_size = os.path.getsize(NON_MULTIPLEX_HIGH_SPR_FILE)
expected_size = 0
count = 0
with TemporaryDirectory() as directory:
files = []
while 1:
new_file_path = Path(directory).joinpath(
f'{NON_MULTIPLEX_HIGH_SPR_FILE.name}_{count}')
shutil.copy(NON_MULTIPLEX_HIGH_SPR_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
if expected_size > BIG_FILE_SIZE:
break
count += 1
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_mseed_non_multiplexed_high_n_low_spr_files(self):
expected_result = {'data_size': 11251712,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([NON_MULTIPLEX_HIGH_N_LOW_SPR_SET], [])
self.assertEqual(ret, expected_result)
def test_less_or_equal_limit_rt130_files(self):
sample_file_size = os.path.getsize(RT130_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
new_data_stream_path = Path(directory).joinpath('1')
new_data_stream_path.mkdir(
parents=True, exist_ok=True)
for i in range(3):
new_file_path = new_data_stream_path.joinpath(
f'{RT130_FILE.name}_{i}')
shutil.copy(RT130_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], ['1'])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_limit_rt130_files(self):
sample_file_size = os.path.getsize(RT130_FILE)
expected_size = 0
count = 0
with TemporaryDirectory() as directory:
files = []
new_data_stream_path = Path(directory).joinpath('1')
new_data_stream_path.mkdir(
parents=True, exist_ok=True)
while 1:
new_file_path = new_data_stream_path.joinpath(
f'{RT130_FILE.name}_{count}')
shutil.copy(RT130_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
if expected_size > BIG_FILE_SIZE:
break
count += 1
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], ['1'])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_rt130_no_requested_datastream_files(self):
sample_file_size = os.path.getsize(RT130_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
new_data_stream_path = Path(directory).joinpath('1')
new_data_stream_path.mkdir(
parents=True, exist_ok=True)
for i in range(3):
new_file_path = new_data_stream_path.joinpath(
f'{RT130_FILE.name}_{i}')
shutil.copy(RT130_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], ['2'])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_empty_directory(self):
with TemporaryDirectory() as temp_dir:
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 0}
result = _check_folders_size([temp_dir], ['*'])
self.assertEqual(result, expected_result)
def test_directory_does_not_exist(self):
empty_name_dir = ''
try:
_check_folders_size([empty_name_dir], [])
except Exception as e:
self.assertEqual(
str(e),
"'' isn't a valid directory"
)
non_existent_dir = 'directory does not exist'
try:
_check_folders_size([non_existent_dir], [])
except Exception as e:
self.assertEqual(
str(e),
"'directory does not exist' isn't a valid directory"
)