Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
Commits on Source (17)
Showing
with 2371 additions and 45 deletions
......@@ -12,7 +12,7 @@ WF_3RD = 'ZNE123456'
HIGHEST_INT = 1E100
# warn user if file bigger than this size
BIG_FILE_SIZE = 2 * 10**8
BIG_FILE_SIZE = 2 * 10**9 # 2 GB
# Matplotlib's performance be slow if data point total > than this limit
CHAN_SIZE_LIMIT = 10**6
......
......@@ -160,3 +160,35 @@ def get_color_ranges():
clr_labels[idx].append("+/- {:,} counts".format(cnt))
clr_labels[idx].append("> {:,} counts".format(cnt))
return range_names, all_square_counts, clr_labels
def create_assign_string_for_db_query(col: str, val: str) -> str:
"""
Create assign db string that assign value in single quote signs if val is
a string or to NULL if val is empty str
:param col: column name in the db table
:param val: value to be assigned to the column
:return: the assign db string
"""
return f"{col}='{val}'" if val != '' else f"{col}=NULL"
def get_params():
# get parameter list from database
param_rows = execute_db("SELECT param from parameters")
return sorted([d[0] for d in param_rows])
def get_channel_info(chan_id: str, data_type: str):
# get channel info from DB
sql = f"SELECT * FROM Channels " \
f"WHERE channel='{chan_id}' AND dataType='{data_type}'"
chan_info = execute_db_dict(sql)[0]
return chan_info
def get_param_info(param: str):
# get all info of a param from DB
sql = f"SELECT * FROM Parameters WHERE param='{param}'"
param_info = execute_db_dict(sql)[0]
return param_info
from __future__ import annotations
import os
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional, Union, List, Tuple, Dict
import traceback
......@@ -14,7 +13,6 @@ from sohstationviewer.controller.util import \
display_tracking_info, get_valid_file_count, validate_file, validate_dir
from sohstationviewer.view.plotting.gps_plot.gps_point import GPSPoint
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.database.process_db import execute_db
from sohstationviewer.model.general_data.general_data_helper import \
retrieve_data_time_from_data_dict, retrieve_gaps_from_data_dict, \
combine_data, sort_data, squash_gaps, apply_convert_factor_to_data_dict, \
......@@ -148,14 +146,6 @@ class GeneralData():
"""
self.gaps: Dict[DataKey, List[List[float]]] = {}
"""
tmp_dir: dir to keep memmap files. Deleted when object is deleted
"""
self.tmp_dir_obj: TemporaryDirectory = TemporaryDirectory()
self.tmp_dir = self.tmp_dir_obj.name
if not on_unittest:
self.save_temp_data_folder_to_database()
self._pauser = QtCore.QSemaphore()
self.pause_response = None
......@@ -279,17 +269,6 @@ class GeneralData():
self.selected_key = self.keys[ret]
return True
def __del__(self):
print("delete dataType Object")
try:
del self.tmp_dir_obj
except OSError as e:
self.track_info(
"Error deleting %s : %s" % (self.tmp_dir, e.strerror),
LogType.ERROR)
print("Error deleting %s : %s" % (self.tmp_dir, e.strerror))
print("finish deleting")
def track_info(self, text: str, type: LogType) -> None:
"""
Display tracking info in tracking_box.
......@@ -361,10 +340,6 @@ class GeneralData():
"""
return cls.__new__(cls)
def save_temp_data_folder_to_database(self):
execute_db(f'UPDATE PersistentData SET FieldValue="{self.tmp_dir}" '
f'WHERE FieldName="tempDataDirectory"')
def sort_all_data(self):
"""
Sort traces by startTmEpoch on all data: waveform_data, mass_pos_data,
......
......@@ -377,8 +377,8 @@ class RT130(GeneralData):
cur_key = (rt130._data[0]['unit_id'].decode(),
f"{rt130._data[0]['experiment_number']}")
self.populate_cur_key_for_all_data(cur_key)
self.get_ehet_in_log_data(rt130, cur_key)
if data_stream != 9:
self.get_ehet_in_log_data(rt130, cur_key)
self.get_mass_pos_data_and_waveform_data(rt130, data_stream, cur_key)
def get_ehet_in_log_data(self, rt130: core.Reftek130,
......
import sys
import platform
import os
from typing import Optional, Dict
from PySide2 import QtWidgets, QtGui
from PySide2.QtWidgets import QWidget, QDialog
from sohstationviewer.database.process_db import execute_db
from sohstationviewer.database.extract_data import (
get_params, get_channel_info, create_assign_string_for_db_query
)
from sohstationviewer.view.db_config.edit_single_param_dialog import \
EditSingleParamDialog
from sohstationviewer.conf.dbSettings import modify_db_path
def add_separation_line(layout):
"""
Add a line for separation to the given layout.
:param layout: QLayout - the layout that contains the line
"""
label = QtWidgets.QLabel()
label.setFrameStyle(QtWidgets.QFrame.HLine | QtWidgets.QFrame.Sunken)
label.setLineWidth(1)
layout.addWidget(label)
class AddEditSingleChannelDialog(QDialog):
"""
Dialog to add info for channel not in database or edit the existing channel
"""
def __init__(self, parent: Optional[QWidget],
chan_id: str, data_type: str):
"""
:param parent: the parent widget
:param chan_id: name of channel to be added/edited
:param data_type: type of the data being processed
"""
self.parent = parent
# name of the channel
self.chan_id = chan_id
# data_type of the channel
self.data_type = data_type
# param of the channel
self.param: str = 'Default'
# True if this channel isn't in DB yet
self.is_new_db_channel: bool = False
# To skip on_param_chkbox_changed() when param is changed by the
# program at the beginning
self.param_changed_by_signal: bool = False
# database info of the channel
self.channel_info: Dict = {}
# database info of the channel's parameter
self.param_info: Dict = {}
super(AddEditSingleChannelDialog, self).__init__(parent)
# short name of the channel
self.channel_name_lnedit = QtWidgets.QLineEdit(self)
self.channel_name_lnedit.setReadOnly(True)
# description added to channel name
self.label_lnedit = QtWidgets.QLineEdit(self)
self.label_lnedit.setPlaceholderText(
"added to channel name to be displayed")
# convert factor to change from count to actual value
self.conversion_lnedit = QtWidgets.QLineEdit(self)
self.conversion_lnedit.setPlaceholderText(
"to convert from count to actual value"
)
validator = QtGui.QDoubleValidator(0.0, 5.0, 6)
validator.setNotation(QtGui.QDoubleValidator.StandardNotation)
self.conversion_lnedit.setValidator(validator)
self.conversion_lnedit.setText('1')
# channel's unit
self.unit_lnedit = QtWidgets.QLineEdit(self)
# dedimal point for channel's value
self.fix_point_spnbox = QtWidgets.QSpinBox()
self.fix_point_spnbox.setToolTip("Decimal point that allow in display")
self.fix_point_spnbox.setMinimum(0)
self.fix_point_spnbox.setMaximum(5)
# data_type
self.data_type_lnedit = QtWidgets.QLineEdit(self)
self.data_type_lnedit.setReadOnly(True)
# channel's parameter which decides how channel is plotted
self.param_cbobox = QtWidgets.QComboBox(self)
self.param_cbobox.addItems(get_params())
# button to edit param
self.edit_param_btn = QtWidgets.QPushButton("EDIT PARAMETER", self)
# button to save changes to DB
self.save_btn = QtWidgets.QPushButton("SAVE CHANNEL", self)
# button to save changes and replot channel
self.save_replot_btn = QtWidgets.QPushButton("SAVE & REPLOT", self)
# button to close dialog without doing anything
self.cancel_btn = QtWidgets.QPushButton('CANCEL', self)
self.setup_ui()
self.set_channel_info()
self.connect_signals()
def setup_ui(self) -> None:
dlg_type = 'Add' if 'DEFAULT' in self.chan_id else 'Edit'
self.setWindowTitle(f"{dlg_type} channel {self.chan_id}"
f" - {self.data_type}")
main_layout = QtWidgets.QVBoxLayout()
self.setLayout(main_layout)
instruction = (
f"This dialog is to {dlg_type} channel {self.chan_id}.\n"
"Parameter need to be any value different than 'Default' to be "
"saved.")
main_layout.addWidget(QtWidgets.QLabel(instruction))
channel_layout = QtWidgets.QGridLayout()
main_layout.addLayout(channel_layout)
channel_layout.addWidget(QtWidgets.QLabel('Name'), 0, 0, 1, 1)
channel_layout.addWidget(self.channel_name_lnedit, 0, 1, 1, 1)
channel_layout.addWidget(QtWidgets.QLabel('Label'), 1, 0, 1, 1)
channel_layout.addWidget(self.label_lnedit, 1, 1, 1, 1)
channel_layout.addWidget(QtWidgets.QLabel('Conversion'), 2, 0, 1, 1)
channel_layout.addWidget(self.conversion_lnedit, 2, 1, 1, 1)
channel_layout.addWidget(QtWidgets.QLabel('Unit'), 3, 0, 1, 1)
channel_layout.addWidget(self.unit_lnedit, 3, 1, 1, 1)
channel_layout.addWidget(QtWidgets.QLabel('Fix Point'), 4, 0, 1, 1)
channel_layout.addWidget(self.fix_point_spnbox, 4, 1, 1, 1)
channel_layout.addWidget(QtWidgets.QLabel('Data Type'), 5, 0, 1, 1)
channel_layout.addWidget(self.data_type_lnedit, 5, 1, 1, 1)
channel_layout.addWidget(QtWidgets.QLabel('Parameter'), 6, 0, 1, 1)
channel_layout.addWidget(self.param_cbobox, 6, 1, 1, 1)
channel_layout.addWidget(self.save_btn, 7, 0, 1, 1)
channel_layout.addWidget(self.save_replot_btn, 7, 1, 1, 1)
channel_layout.addWidget(self.edit_param_btn, 8, 1, 1, 1)
channel_layout.addWidget(self.cancel_btn, 8, 0, 1, 1)
self.save_replot_btn.setFocus()
def connect_signals(self) -> None:
self.param_cbobox.currentTextChanged.connect(
self.on_param_cbobox_changed)
self.cancel_btn.clicked.connect(self.close)
self.save_btn.clicked.connect(self.on_save)
self.save_replot_btn.clicked.connect(self.on_save_replot)
self.edit_param_btn.clicked.connect(self.on_edit_param)
def set_channel_info(self):
"""
Add all Channel related info according to information got from DB.
In case Channel isn't in the DB, use the info of DEFAULT channel.
Call set_param_info to set Parameter related info.
"""
try:
self.channel_info = get_channel_info(self.chan_id, self.data_type)
except IndexError:
self.is_new_db_channel = True
self.channel_info = get_channel_info('DEFAULT', 'Default')
self.channel_name_lnedit.setText(self.chan_id)
self.label_lnedit.setText(self.channel_info['label'])
self.conversion_lnedit.setText(
str(float(self.channel_info['convertFactor'])))
self.unit_lnedit.setText(self.channel_info['unit'])
if self.channel_info['fixPoint'] is not None:
self.fix_point_spnbox.setValue(self.channel_info['fixPoint'])
self.data_type_lnedit.setText(self.data_type)
self.param_cbobox.setCurrentText(self.channel_info['param'])
self.param = self.channel_info['param']
self.set_buttons_enabled()
def on_param_cbobox_changed(self):
"""
+ Check self.param_changed_by_signal to make sure the signal come from
user selecting a parameter from param_cbobox, not the signal from
changing back to original value when condition not pass fo the set
value
+ Not allow parameter 'Default' to be select because it is only the
parameter for the channel that has no record in DB
+ If the channel already has a record in DB, give a warning when user's
trying to change its parameter.
"""
if self.param_changed_by_signal:
self.param_changed_by_signal = False
return
new_param = self.param_cbobox.currentText()
if new_param == 'Default':
# Parameter Default is only for channel that has no record in DB.
# So it isn't allowed to be selected
self.param_changed_by_signal = True
self.param_cbobox.setCurrentText(self.param)
return
if not self.is_new_db_channel:
msg = ("ARE YOU SURE YOU WANT TO CHANGE PARAMETER FOR CHANNEL "
f"'{self.chan_id}'?")
result = QtWidgets.QMessageBox.question(
self, "Confirmation", msg,
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No)
if result == QtWidgets.QMessageBox.No:
self.param_changed_by_signal = True
self.param_cbobox.setCurrentText(self.param)
return
self.param = new_param
self.set_buttons_enabled()
def save(self):
"""
Save info from GUI to DB
"""
if self.is_new_db_channel:
self.insert_channel_info()
else:
self.update_channel_info()
def on_save(self):
"""
Save new channel info to DB
"""
self.save()
self.close()
def on_save_replot(self):
"""
Save new channel info to DB
TODO: Replot the channel in the plotting area
"""
self.save()
print("Do REPLOT")
self.close()
def set_buttons_enabled(self):
"""
Disable the 3 buttons to save and to edit parameters so that user are
forced to change param before they want to continue.
"""
if self.param == 'Default':
self.edit_param_btn.setEnabled(False)
self.save_btn.setEnabled(False)
self.save_replot_btn.setEnabled(False)
else:
self.edit_param_btn.setEnabled(True)
self.save_btn.setEnabled(True)
self.save_replot_btn.setEnabled(True)
def on_edit_param(self):
"""
Give user a warning when they want to change parameter's info then
open EditSingleParamDialog for user to edit parameter's info after
they give their confirmation.
"""
msg = ("Changing parameter will affect all of other channels that "
"have the same parameter.\n\n"
"Are you sure you want to continue?")
result = QtWidgets.QMessageBox.question(
self, "Confirmation", msg,
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No)
if result == QtWidgets.QMessageBox.No:
return
win = EditSingleParamDialog(self, self.param_cbobox.currentText())
win.exec_()
def update_para_info(self, param):
"""
Save parameter related info to Parameters table
:param param: param condition string
"""
plot_type = create_assign_string_for_db_query(
'plotType', self.plot_type_cbo_box.currentText())
value_colorb = create_assign_string_for_db_query(
'valueColorsB', self.value_colorb_widget.text())
value_colorw = create_assign_string_for_db_query(
'valueColorsW', self.value_colorw_widget.text())
height = f"height={self.height_spnbox.value()}"
sql = (f"UPDATE Parameters SET {plot_type}, {value_colorb}, "
f"{value_colorw}, {height} WHERE {param}")
execute_db(sql)
def insert_channel_info(self):
sql = ("INSERT INTO Channels VALUES ("
f"'{self.channel_name_lnedit.text()}', "
f"'{self.label_lnedit.text()}', "
f"'{self.param_cbobox.currentText()}', "
f"NULL, " # linkedChan for RT130 only and won't be changed
f"{self.conversion_lnedit.text()}, "
f"'{self.unit_lnedit.text()}', "
f"{self.fix_point_spnbox.value()}, "
f"'{self.data_type_lnedit.text()}')")
execute_db(sql)
def update_channel_info(self):
channel = f"channel='{self.channel_name_lnedit.text()}'"
label = f"label='{self.label_lnedit.text()}'"
param = f"param='{self.param_cbobox.currentText()}'"
linked_chan = "linkedChan=NULL"
convert_factor = f"convertFactor={self.conversion_lnedit.text()}"
unit = f"unit='{self.unit_lnedit.text()}'"
fix_point = f"fixPoint={self.fix_point_spnbox.value()}"
data_type = f"dataType='{self.data_type_lnedit.text()}'"
sql = (f"UPDATE Channels SET {label}, {param}, {linked_chan}, "
f"{convert_factor}, {unit}, {fix_point}, {data_type} "
f"WHERE {channel}")
execute_db(sql)
if __name__ == '__main__':
modify_db_path()
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
# test new channel
# test = AddEditSingleChannelDialog(None, 'VEE', 'Q330')
# test linesDots. Ex: param: Input power supply current
# test = AddEditSingleChannelDialog(None, 'VEC', 'Q330')
# test MultiColorDotsLowerBound. Ex: param:Backup volt
# test = AddEditSingleChannelDialog(None, 'Backup Volt', 'RT130')
# test MultiColorDotsUpperBound. Ex: param:GNSS status
test = AddEditSingleChannelDialog(None, 'VST', 'Pegasus')
# test UpDownDots. Ex: param. Ex: param:Net Up/down
# test = AddEditSingleChannelDialog(None, 'Net Up/Down', 'RT130')
# test TriColorLInes. Ex: param. Ex: param:Error/warning
# test = AddEditSingleChannelDialog(None, 'Error/Warning', 'RT130')
test.exec_()
sys.exit(app.exec_())
import sys
import platform
import os
from typing import Optional, Dict
from PySide2 import QtWidgets
from PySide2.QtWidgets import QWidget, QDialog, QLineEdit
from sohstationviewer.view.util.plot_func_names import plot_functions
from sohstationviewer.database.process_db import execute_db
from sohstationviewer.database.extract_data import (
get_param_info, create_assign_string_for_db_query
)
from sohstationviewer.conf.dbSettings import modify_db_path
class EditSingleParamDialog(QDialog):
"""
Dialog to add info for channel not in database or edit the existing channel
"""
def __init__(self, parent: Optional[QWidget],
param: str):
"""
:param parent: the parent widget
:param chan_id: name of channel to be added/edited
:param data_type: type of the data being processed
"""
self.param = param
# # To skip on_param_chkbox_changed() when param is changed by the
# # program at the beginning
# self.param_changed_by_signal = False
# database info of the channel
self.channel_info: Dict = {}
# database info of the channel's parameter
self.param_info: Dict = {}
super(EditSingleParamDialog, self).__init__(parent)
# parameter's plot type which decides the shape of the plot
self.plot_type_cbo_box = QtWidgets.QComboBox(self)
self.plot_type_cbo_box.addItems([""] + list(plot_functions.keys()))
# value color in black mode
self.value_colorb_widget = QLineEdit(self)
self.value_colorb_widget.setPlaceholderText(
"Click edit button to add value color string")
self.value_colorb_widget.setToolTip("Priority from left to right")
# value, color in white mode
self.value_colorw_widget = QLineEdit(self)
self.value_colorw_widget.setPlaceholderText(
"Click edit button to add value color string")
self.value_colorw_widget.setToolTip("Priority from left to right")
# height of the plot
self.height_spnbox = QtWidgets.QSpinBox()
self.height_spnbox.setMinimum(0)
self.height_spnbox.setMaximum(8)
self.height_spnbox.setToolTip("Relative height of the plot")
# button to save change to DB
self.save_param_btn = QtWidgets.QPushButton(
"SAVE PARAMETER", self)
# button to close dialog without doing anything
self.cancel_btn = QtWidgets.QPushButton('CANCEL', self)
self.setup_ui()
self.set_param_info()
self.connect_signals()
def setup_ui(self) -> None:
self.setWindowTitle(f"Edit Parameter {self.param}")
main_layout = QtWidgets.QVBoxLayout()
self.setLayout(main_layout)
param_layout = QtWidgets.QGridLayout()
main_layout.addLayout(param_layout)
param_layout.addWidget(QtWidgets.QLabel('Plot Type'), 0, 0, 1, 1)
param_layout.addWidget(self.plot_type_cbo_box, 0, 1, 1, 1)
param_layout.addWidget(QtWidgets.QLabel(
'Value Color (black)'), 1, 0, 1, 1)
param_layout.addWidget(self.value_colorb_widget, 1, 1, 1, 1)
param_layout.addWidget(QtWidgets.QLabel(
'Value Color (white)'), 2, 0, 1, 1)
param_layout.addWidget(self.value_colorw_widget, 2, 1, 1, 1)
param_layout.addWidget(QtWidgets.QLabel('Height'), 3, 0, 1, 1)
param_layout.addWidget(self.height_spnbox, 3, 1, 1, 1)
param_layout.addWidget(self.cancel_btn, 4, 0, 1, 1)
param_layout.addWidget(self.save_param_btn, 4, 1, 1, 1)
def connect_signals(self) -> None:
self.plot_type_cbo_box.currentTextChanged.connect(self.set_plot_type)
self.cancel_btn.clicked.connect(self.close)
self.save_param_btn.clicked.connect(self.on_save_param)
def set_param_info(self) -> None:
"""
Fill up all info boxes
"""
self.param_info = get_param_info(self.param)
self.set_plot_type(self.param_info['plotType'])
self.height_spnbox.setValue(self.param_info['height'])
def set_plot_type(self, plot_type: str) -> None:
"""
Add Plot Type, Value Color strings.
If there is no Plot Type, no Value Color or Height because no plot.
:param plot_type: name of Plot Type
"""
if plot_type in ["", None]:
self.plot_type_cbo_box.setCurrentText('')
self.value_colorb_widget.setEnabled(False)
self.value_colorb_widget.clear()
self.value_colorw_widget.setEnabled(False)
self.value_colorw_widget.clear()
self.height_spnbox.setValue(0)
else:
self.plot_type_cbo_box.setCurrentText(plot_type)
value_color_b = self.param_info['valueColorsB']
value_color_w = self.param_info['valueColorsW']
self.value_colorb_widget.setText(value_color_b)
self.value_colorw_widget.setText(value_color_w)
def on_save_param(self):
"""
Save parameter info to Parameters table
"""
plot_type = create_assign_string_for_db_query(
'plotType', self.plot_type_cbo_box.currentText())
value_colorb = create_assign_string_for_db_query(
'valueColorsB', self.value_colorb_widget.text())
value_colorw = create_assign_string_for_db_query(
'valueColorsW', self.value_colorw_widget.text())
height = f"height={self.height_spnbox.value()}"
sql = (f"UPDATE Parameters SET {plot_type}, {value_colorb}, "
f"{value_colorw}, {height} WHERE param='{self.param}'")
execute_db(sql)
self.close()
if __name__ == '__main__':
modify_db_path()
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
# test linesDots. Ex: param: Input power supply current
test = EditSingleParamDialog(None, 'Input power supply current')
# test MultiColorDotsLowerBound. Ex: param:Backup volt
# test = EditSingleParamDialog(None, 'Backup Volt', 'RT130')
# test MultiColorDotsUpperBound. Ex: param:GNSS status
# test = EditSingleParamDialog(None, 'GNSS status')
# test UpDownDots. Ex: param. Ex: param:Net Up/down
# test = EditSingleParamDialog(None, 'Net Up/Down', 'RT130')
# test TriColorLInes. Ex: param. Ex: param:Error/warning
# test = EditSingleParamDialog(None, 'Error/Warning', 'RT130')
test.exec_()
sys.exit(app.exec_())
from PySide2 import QtWidgets, QtGui
from PySide2.QtWidgets import QWidget, QDialog
def display_color(color_label: QtWidgets.QLabel, color: str):
"""
Display color on color_label.
Display the given color on the color_label
:param color_label: the label to display color
:param color: the color that is given to update the color_label
"""
palette = color_label.palette()
palette.setColor(QtGui.QPalette.Background, QtGui.QColor(color))
color_label.setPalette(palette)
class EditValueColorDialog(QDialog):
"""Base class for value color editing dialogs of different plot types"""
def __init__(self, parent: QWidget, value_color_str: str):
"""
:param parent: the parent widget
:param value_color_str: string for value color to be saved in DB
"""
super(EditValueColorDialog, self).__init__(parent)
self.value_color_str = value_color_str
self.main_layout = QtWidgets.QGridLayout()
self.setLayout(self.main_layout)
self.cancel_btn = QtWidgets.QPushButton('CANCEL', self)
self.save_colors_btn = QtWidgets.QPushButton('SAVE COLORS', self)
self.setup_ui()
self.set_value()
self.connect_signals()
def setup_ui(self):
pass
def set_value(self):
pass
def setup_complete_buttons(self, row_total) -> None:
"""
:param row_total: total of rows to edit
"""
self.main_layout.addWidget(self.cancel_btn, row_total, 0, 1, 1)
self.main_layout.addWidget(self.save_colors_btn, row_total, 3, 1, 1)
def connect_signals(self) -> None:
self.cancel_btn.clicked.connect(self.close)
self.save_colors_btn.clicked.connect(self.on_save_color)
def on_select_color(self, color_label: QtWidgets.QLabel):
"""
When clicking on Select Color button, Color Picker will pop up with
the default color is color_label's color.
User will select a color then save to update the selected color to
the color_label.
:param color_label: the label that display the color of the obj_type
"""
color = color_label.palette().window().color()
new_color = QtWidgets.QColorDialog.getColor(color)
if new_color.isValid():
display_color(color_label, new_color.name())
self.raise_()
import sys
import platform
import os
from PySide2 import QtWidgets
from PySide2.QtWidgets import QWidget
from sohstationviewer.view.db_config.value_color_helper.\
edit_value_color_dialog.edit_value_color_dialog_super_class import \
EditValueColorDialog, display_color
class LineDotDialog(EditValueColorDialog):
"""Dialog to edit color for Line/Dot Plot"""
def __init__(self, parent: QWidget, value_color_str: str):
"""
:param parent: the parent widget
:param value_color_str: string for value color to be saved in DB
"""
# Widget that allow user to add/edit line's color
self.select_line_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display line's color
self.line_color_label = QtWidgets.QLabel()
self.line_color_label.setFixedWidth(30)
self.line_color_label.setAutoFillBackground(True)
# check box to include dot in value_color_str or not
self.dot_include_chkbox = QtWidgets.QCheckBox('Included')
# Widget that allow user to add/edit dot's color
self.select_dot_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display dot's color
self.dot_color_label = QtWidgets.QLabel()
self.dot_color_label.setFixedWidth(30)
self.dot_color_label.setAutoFillBackground(True)
super(LineDotDialog, self).__init__(parent, value_color_str)
self.setWindowTitle("Edit Line/Dot Plotting's Colors")
self.on_click_include_dot()
def setup_ui(self) -> None:
self.main_layout.addWidget(QtWidgets.QLabel('Line Color'), 0, 1, 1, 1)
self.main_layout.addWidget(self.line_color_label, 0, 2, 1, 1)
self.main_layout.addWidget(self.select_line_color_btn, 0, 3, 1, 1)
self.main_layout.addWidget(self.dot_include_chkbox, 1, 0, 1, 1)
self.main_layout.addWidget(QtWidgets.QLabel('Dot Color'), 1, 1, 1, 1)
self.main_layout.addWidget(self.dot_color_label, 1, 2, 1, 1)
self.main_layout.addWidget(self.select_dot_color_btn, 1, 3, 1, 1)
self.setup_complete_buttons(2)
def connect_signals(self) -> None:
self.select_line_color_btn.clicked.connect(
lambda: self.on_select_color(self.line_color_label))
self.select_dot_color_btn.clicked.connect(
lambda: self.on_select_color(self.dot_color_label))
self.dot_include_chkbox.clicked.connect(self.on_click_include_dot)
super().connect_signals()
def on_click_include_dot(self):
"""
Enable/disable select color and show/hide color label according to
dot_include_chkbox is checked or unchecked.
"""
enabled = self.dot_include_chkbox.isChecked()
self.select_dot_color_btn.setEnabled(enabled)
self.dot_color_label.setHidden(not enabled)
def set_value(self):
"""
Change the corresponding color_labels's color according to the color
from value_color_str.
"""
self.dot_include_chkbox.setChecked(False)
if self.value_color_str == "":
return
vc_parts = self.value_color_str.split('|')
for vc_str in vc_parts:
obj_type, color = vc_str.split(':')
if obj_type == 'Line':
display_color(self.line_color_label, color)
if obj_type == 'Dot':
display_color(self.dot_color_label, color)
self.dot_include_chkbox.setChecked(True)
def on_save_color(self):
"""
Create value_color_str from GUI's info and close the GUI with color
is the hex color got from color_labels' color
"""
line_color = self.line_color_label.palette().window().color().name()
self.value_color_str = f"Line:{line_color}"
if self.dot_include_chkbox.isChecked():
dot_color = self.dot_color_label.palette().window().color().name()
self.value_color_str += f"|Dot:{dot_color}"
self.close()
if __name__ == '__main__':
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
test = LineDotDialog(None, "Line:#00FF00|Dot:#00FF00")
test.exec_()
print("result:", test.value_color_str)
sys.exit(app.exec_())
import sys
import platform
import os
from typing import List, Dict, Optional
from PySide2 import QtWidgets, QtCore, QtGui
from PySide2.QtWidgets import QWidget
from sohstationviewer.view.db_config.value_color_helper.\
edit_value_color_dialog.edit_value_color_dialog_super_class import \
EditValueColorDialog, display_color
ROW_TOTAL = 7
class BoundValidator(QtGui.QValidator):
"""
Validator that allow float value and empty string.
Range of value is from -10, 10 because -20/20 are the values assigned to
higher bounds of the previous/next when the current row is 0 and last
so the editable higher bounds can't ever be reached to.
Value '-' to allow typing negative float. If user type '-' only, it will
be checked when editing is finished.
"""
def validate(self, input_val, pos):
if input_val in ['', '-']:
return QtGui.QValidator.Acceptable
try:
input_val = float(input_val)
except ValueError:
return QtGui.QValidator.Invalid
if -10 <= input_val <= 10:
return QtGui.QValidator.Acceptable
else:
return QtGui.QValidator.Invalid
class MultiColorDotDialog(EditValueColorDialog):
def __init__(
self, parent: Optional[QWidget],
value_color_str: str, upper_equal: bool):
"""
Dialog to edit color for Multi-color Dot Plot
:param parent: the parent widget
:param value_color_str: string for value color to be saved in DB
:param upper_equal: flag to know if equal is on upper bound
or lower_bound
"""
self.upper_equal = upper_equal
# list of widgets to display lower bound which is read only of the
# value range of rows
self.lower_bound_lnedits: List[QtWidgets.QLineEdit] = []
# list of widgets to display lower bound of the value range of rows
self.higher_bound_lnedits: List[QtWidgets.QLineEdit] = []
# List of buttons that allow user to add/edit color
self.select_color_btns: List[QtWidgets.QPushButton] = []
# list of labels to display color of the data points defined by
# the value range of rows
self.color_labels: List[QtWidgets.QLabel] = []
# After a higher bound widget is edited and enter is hitted, button's
# clicked signal will be called before higher_bound_editting_finished
# is reached. The following flags are to change that order.
# These flags are also used to prevent the case hitting Select Color or
# Save right after user done with editing. In case there is error when
# checking the value, the button click will be canceled too.
self.changed_row_id = None
self.select_color_btn_clicked = False
self.save_colors_btn_clicked = False
# Check box for plotting data point less than all the rest
self.include_less_than_chkbox = QtWidgets.QCheckBox('Include')
# Check box for including data point greater than all the rest
self.include_greater_than_chkbox = QtWidgets.QCheckBox('Include')
# Keep track of value by row_id
self.row_id_value_dict: Dict[int, float] = {}
super(MultiColorDotDialog, self).__init__(parent, value_color_str)
self.setWindowTitle("Edit Multi Color Dot Plotting")
# set focus policy to not be clicked by hitting enter in higher bound
self.cancel_btn.setFocusPolicy(QtCore.Qt.NoFocus)
self.save_colors_btn.setFocusPolicy(QtCore.Qt.NoFocus)
self.set_value_row_id_dict()
def setup_ui(self) -> None:
for i in range(ROW_TOTAL):
if i == 0:
self.main_layout.addWidget(
self.include_less_than_chkbox, 0, 0, 1, 1)
self.include_less_than_chkbox.setChecked(True)
if i == ROW_TOTAL - 1:
self.main_layout.addWidget(
self.include_greater_than_chkbox, ROW_TOTAL - 1, 0, 1, 1)
(lower_bound_lnedit, higher_bound_lnedit,
select_color_btn, color_label) = self.add_row(i)
self.lower_bound_lnedits.append(lower_bound_lnedit)
self.higher_bound_lnedits.append(higher_bound_lnedit)
self.select_color_btns.append(select_color_btn)
self.color_labels.append(color_label)
self.set_color_enabled(i, False)
self.setup_complete_buttons(ROW_TOTAL)
def connect_signals(self) -> None:
self.include_greater_than_chkbox.clicked.connect(
lambda: self.on_include(ROW_TOTAL - 1,
self.include_greater_than_chkbox))
self.include_less_than_chkbox.clicked.connect(
lambda: self.on_include(0, self.include_less_than_chkbox))
for i in range(ROW_TOTAL):
self.higher_bound_lnedits[i].textChanged.connect(
lambda arg, idx=i: setattr(self, 'changed_row_id', idx))
self.higher_bound_lnedits[i].editingFinished.connect(
lambda idx=i: self.on_higher_bound_editing_finished(idx))
# some how pass False to on_select_color, have to handle this
# in add_row() instead
# self.select_color_btns[i].clicked.connect(
# lambda idx=i: self.on_select_color(idx))
super().connect_signals()
def get_comparing_text(self, row_id):
pass
def add_row(self, row_id):
"""
Adding a row including Lower bound line dit, comparing operator label,
higher bound line edit, select color button, display color label
:param row_id: id of current row
"""
lower_bound_lnedit = QtWidgets.QLineEdit()
lower_bound_lnedit.setEnabled(False)
comparing_text = self.get_comparing_text(row_id)
comparing_label = QtWidgets.QLabel(comparing_text)
higher_bound_lnedit = QtWidgets.QLineEdit()
validator = BoundValidator()
higher_bound_lnedit.setValidator(validator)
if row_id == 0:
lower_bound_lnedit.setHidden(True)
elif row_id == ROW_TOTAL - 1:
higher_bound_lnedit.setHidden(True)
select_color_btn = QtWidgets.QPushButton("Select Color")
select_color_btn.clicked.connect(
lambda: self.on_select_color(row_id))
# set focus policy to not be clicked by hitting enter in higher bound
select_color_btn.setFocusPolicy(QtCore.Qt.NoFocus)
color_label = QtWidgets.QLabel()
color_label.setFixedWidth(30)
color_label.setAutoFillBackground(True)
# layout
self.main_layout.addWidget(lower_bound_lnedit, row_id, 1, 1, 1)
self.main_layout.addWidget(comparing_label, row_id, 2, 1, 1)
self.main_layout.addWidget(higher_bound_lnedit, row_id, 3, 1, 1)
self.main_layout.addWidget(select_color_btn, row_id, 4, 1, 1)
self.main_layout.addWidget(color_label, row_id, 5, 1, 1)
return (lower_bound_lnedit, higher_bound_lnedit,
select_color_btn, color_label)
def handle_clear_higher_bound(self, row_id):
"""
If user clear higher_bound
+ in the middle, not allow.
+ of the only row, not allow.
+ in the last edited row, the lower_bound for the next row need to be
cleared, and the very end row's value will be set to the higher_bound
of the previous row.
"""
if row_id < len(self.row_id_value_dict) - 1:
# If the cleared one isn't the last one, it shouldn't be
# allowed. An Error message should be given.
msg = "Higher bound must be a number"
QtWidgets.QMessageBox.information(self, "Error", msg)
self.higher_bound_lnedits[row_id].setText(
str(self.row_id_value_dict[row_id]))
else:
# If the cleared one is the last one, the lower_bound_lnedit
# of the next row will be cleared too.
if row_id == 0:
# If the cleared one is the only one, this shouldn't be
# allowed. A warning should be given.
msg = ("There should be at least one value"
" for this plot type.")
QtWidgets.QMessageBox.information(self, "Error", msg)
return
self.set_color_enabled(row_id, False)
self.lower_bound_lnedits[row_id + 1].setText('')
self.set_value_row_id_dict()
def handle_skipping_editing_row(self, row_id):
"""
If user add value in a row that skipping from the last edited row,
it shouldn't be allowed, an Error message should be given before the
higher_bound is cleared, the actions will ignite the call to this
function one more time which should be ignored.
"""
if self.higher_bound_lnedits[row_id].text() == '':
# called by clearing text in this section
return
self.changed_row_id = None
# When user edit the row that skips some row from the last row
# the current higher_bound_lnedit will be cleared
msg = ("You have to edit rows in order.\n"
"Skipping rows isn't allowed.")
QtWidgets.QMessageBox.information(self, "Error", msg)
self.higher_bound_lnedits[row_id].setText('')
def handle_edited_value_not_ascending(
self, row_id, prev_higher_bound, next_higher_bound):
"""
If value enter to the current higher_bound_lnedit make it out of
increasing sorting, a warning will be given, and the widget will be
set to the original value.
:param row_id: id of current row
:param prev_higher_bound: higher_bound value of the previous row
:param next_higher_bound: higher_bound value of the next row
"""
cond = (f"{prev_higher_bound} < "
if prev_higher_bound != -20 else '')
cond += (f"d < {next_higher_bound}"
if next_higher_bound != 20 else 'd')
msg = f"Value entered must be: {cond}"
QtWidgets.QMessageBox.information(self, "Error", msg)
try:
self.higher_bound_lnedits[row_id].setText(
str(self.row_id_value_dict[row_id]))
except KeyError:
# If the current row is the last one, there's no original value
# So the widget will be cleared.
self.higher_bound_lnedits[row_id].setText('')
self.select_color_btn_clicked = False
self.save_colors_btn_clicked = False
def on_higher_bound_editing_finished(self, row_id: int):
"""
Check value entered for higher bound:
+ If the last row is empty string, that value will be eliminated
from condition.
+ If value not greater than previous row and less than latter row,
give error message.
Call set_value_row_id_dict to keep track of value by row_id.
:param row_id: id of the row being checked.
row_id == self.changed_row_id when the method is ignited by
editing the row's value (not by clicking on some buttons)
"""
if self.changed_row_id is None:
# When the function isn't called from user's changing text on
# a higher_bound_lnedit, this function will be ignored.
return
if len(self.row_id_value_dict) < self.changed_row_id < ROW_TOTAL - 1:
self.handle_skipping_editing_row(row_id)
return
self.changed_row_id = None
# -20 is assigned to higher bound of the row before the first row
# to always satisfy validating this row's higher bound value for value
# range is [-10,10]
prev_higher_bound = (
-20 if row_id == 0
else float(self.higher_bound_lnedits[row_id - 1].text()))
if self.higher_bound_lnedits[row_id].text().strip() == '':
self.handle_clear_higher_bound(row_id)
self.select_color_btn_clicked = False
self.save_colors_btn_clicked = False
return
# 20 is assigned to higher bound of the row after the last edited row
# to always satisfy validating this row's higher bound value for value
# range is [-10,10]
next_higher_bound = (
20 if row_id == len(self.row_id_value_dict) - 1
else float(self.higher_bound_lnedits[row_id + 1].text()))
curr_higher_bound = float(self.higher_bound_lnedits[row_id].text())
if (curr_higher_bound <= prev_higher_bound
or curr_higher_bound >= next_higher_bound):
self.handle_edited_value_not_ascending(row_id,
prev_higher_bound,
next_higher_bound)
return
if ((row_id == 0 and self.include_less_than_chkbox.isChecked())
or row_id != 0):
# Enable button Select Color unless the row is the first row but
# Include checkbox isn't checked
self.set_color_enabled(row_id, True)
self.lower_bound_lnedits[row_id + 1].setText(str(curr_higher_bound))
self.set_value_row_id_dict()
if self.save_colors_btn_clicked:
self.on_save_color()
if self.select_color_btn_clicked:
self.on_select_color(row_id)
def set_value_row_id_dict(self):
"""
Update row_id_value_dict to the current higher bound
Update lower bound of the last row which is for greater than all the
rest to be the max of all higher bound
"""
self.row_id_value_dict = {i: float(self.higher_bound_lnedits[i].text())
for i in range(ROW_TOTAL - 1)
if self.higher_bound_lnedits[i].text() != ''}
last_row_lnedit = self.lower_bound_lnedits[ROW_TOTAL - 1]
if len(self.row_id_value_dict) == 0:
last_row_lnedit.clear()
else:
last_row_lnedit.setText(str(max(self.row_id_value_dict.values())))
def set_color_enabled(self, row_id: int, enabled: bool):
"""
Enable color: allow to edit and display color
Disable color: disallow to edit and hide color
"""
self.select_color_btns[row_id].setEnabled(enabled)
self.color_labels[row_id].setHidden(not enabled)
def on_select_color(self, row_id: int):
"""
When clicking on Select Color button, Color Picker will pop up with
the default color is color_label's color.
User will select a color then save to update the selected color to
the color_label.
"""
if self.changed_row_id is not None:
self.select_color_btn_clicked = True
self.on_higher_bound_editing_finished(self.changed_row_id)
return
self.select_color_btn_clicked = False
super().on_select_color(self.color_labels[row_id])
def on_include(self, row_id: int, chkbox: QtWidgets.QCheckBox):
"""
Enable/Disable Select Color button when include_less_than_chkbox/
include_greater_than_chkbox is checked/unchecked.
"""
self.set_color_enabled(row_id, chkbox.isChecked())
def set_row(self, vc_idx: int, value: float, color: str):
"""
Add values to widgets in a row
+ row 0: consider uncheck include checkbox if color='not plot' and
check otherwise.
+ row TOTAL-1: check include checkbox and set value for lower bound
+ all row other than TOTAL-1, set value for higher bound and
next row's lower bound, enable and display color
"""
if vc_idx == 0:
if color == 'not plot':
self.include_less_than_chkbox.setChecked(False)
else:
self.include_less_than_chkbox.setChecked(True)
if vc_idx < ROW_TOTAL - 1:
self.higher_bound_lnedits[vc_idx].setText(str(value))
self.lower_bound_lnedits[vc_idx + 1].setText(str(value))
else:
self.include_greater_than_chkbox.setChecked(True)
self.lower_bound_lnedits[ROW_TOTAL - 1].setText(str(value))
if color == 'not plot':
self.set_color_enabled(vc_idx, False)
else:
self.set_color_enabled(vc_idx, True)
display_color(self.color_labels[vc_idx], color)
def on_save_color(self):
"""
Create value_color_str from GUI's info and close the GUI.
+ Skip row that has no color
+ color = color_label's color name
+ if include_less_than_chkbox is not checked, 'not_plot' will be
set for the first color
+ Format for value_color of all row other than TOTAL - 1th:
<=value:color
+ If include_greater_than_chkbox is checked, format will be:
value<:color
"""
if self.changed_row_id and self.changed_row_id < ROW_TOTAL - 1:
self.save_colors_btn_clicked = True
self.on_higher_bound_editing_finished(self.changed_row_id)
return
self.save_colors_btn_clicked = False
value_color_list = []
for i in range(ROW_TOTAL - 1):
if self.color_labels[i].isHidden() and i != 0:
continue
value = self.higher_bound_lnedits[i].text()
color = self.color_labels[i].palette(
).window().color().name()
if i == 0 and not self.include_less_than_chkbox.isChecked():
color = 'not plot'
operator = '<=' if self.upper_equal else '<'
value_color_list.append(f"{operator}{value}:{color}")
if self.include_greater_than_chkbox.isChecked():
color = self.color_labels[ROW_TOTAL - 1].palette().window(
).color().name()
val = f"{self.lower_bound_lnedits[ROW_TOTAL - 1].text()}"
if self.upper_equal:
value_color_list.append(f"{val}<:{color}")
else:
value_color_list.append(f"={val}:{color}")
self.value_color_str = '|'.join(value_color_list)
self.close()
class MultiColorDotLowerEqualDialog(MultiColorDotDialog):
def __init__(self, parent, value_color_str):
super(MultiColorDotLowerEqualDialog, self).__init__(
parent, value_color_str, upper_equal=False)
def get_comparing_text(self, row_id: int):
"""
Create text that show relationship between lower_bound and higher_bound
to data for the selected color.
:param row_id: id of current row
"""
if row_id == 0:
comparing_text = " d <"
elif row_id < ROW_TOTAL - 1:
comparing_text = "<= d <"
else:
comparing_text = "= d"
return comparing_text
def set_value(self):
"""
Change the corresponding color_labels's color, higher/lower bound,
include check boxes according to value_color_str.
"""
self.include_greater_than_chkbox.setChecked(False)
self.include_less_than_chkbox.setChecked(False)
if self.value_color_str == "":
return
vc_parts = self.value_color_str.split('|')
count = 0
for vc_str in vc_parts:
value, color = vc_str.split(':')
if value.startswith('<'):
# Ex: <1:#00FFFF
# Ex: <0:not plot (can be on first value only)
value = value.replace('<', '')
self.set_row(count, float(value), color)
count += 1
else:
# Ex: =1:#FF00FF
value = value.replace('=', '')
self.set_row(ROW_TOTAL - 1, float(value), color)
class MultiColorDotUpperEqualDialog(MultiColorDotDialog):
def __init__(self, parent, value_color_str):
super(MultiColorDotUpperEqualDialog, self).__init__(
parent, value_color_str, upper_equal=True)
def get_comparing_text(self, row_id):
"""
Create text that show relationship between lower_bound and higher_bound
to data for the selected color.
:param row_id: id of current row
"""
if row_id == 0:
comparing_text = " d <="
elif row_id < ROW_TOTAL - 1:
comparing_text = "< d <="
else:
comparing_text = "< d "
return comparing_text
def set_value(self):
"""
Change the corresponding color_labels's color, higher/lower bound,
include check boxes according to value_color_str.
"""
self.include_greater_than_chkbox.setChecked(False)
self.include_less_than_chkbox.setChecked(False)
if self.value_color_str == "":
return
vc_parts = self.value_color_str.split('|')
count = 0
for vc_str in vc_parts:
value, color = vc_str.split(':')
if value.startswith('<='):
# Ex: <=1:#00FFFF
# Ex: <=0:not plot (can be on first value only)
value = value.replace('<=', '')
self.set_row(count, float(value), color)
count += 1
else:
# Ex: 1<:#FF00FF
value = value.replace('<', '')
self.set_row(ROW_TOTAL - 1, float(value), color)
if __name__ == '__main__':
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
# test = MultiColorDotLowerEqualDialog(
# None, '<3:#FF0000|<3.3:#FFFF00|=3.3:#00FF00')
test = MultiColorDotUpperEqualDialog(
None, '<=0:not plot|<=1:#FFFF00|<=2:#00FF00|2<:#FF00FF')
test.exec_()
print("result:", test.value_color_str)
sys.exit(app.exec_())
import sys
import platform
import os
from PySide2 import QtWidgets
from PySide2.QtWidgets import QWidget
from sohstationviewer.view.db_config.value_color_helper.\
edit_value_color_dialog.edit_value_color_dialog_super_class import \
EditValueColorDialog, display_color
class TriColorLinesDialog(EditValueColorDialog):
"""Dialog to edit color for triColorLines plot"""
def __init__(self, parent: QWidget, value_color_str: str):
"""
:param parent: the parent widget
:param value_color_str: string for value color to be saved in DB
"""
# Widget that allow user to add/edit value positive one's color
self.select_pos_one_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display positive one's color
self.pos_one_color_label = QtWidgets.QLabel()
self.pos_one_color_label.setFixedWidth(30)
self.pos_one_color_label.setAutoFillBackground(True)
# Widget that allow user to add/edit value zero's color
self.select_zero_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display down's color
self.zero_color_label = QtWidgets.QLabel()
self.zero_color_label.setFixedWidth(30)
self.zero_color_label.setAutoFillBackground(True)
# Widget that allow user to add/edit value positive one's color
self.select_neg_one_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display positive one's color
self.neg_one_color_label = QtWidgets.QLabel()
self.neg_one_color_label.setFixedWidth(30)
self.neg_one_color_label.setAutoFillBackground(True)
super(TriColorLinesDialog, self).__init__(parent, value_color_str)
self.setWindowTitle("Edit TriColor Plotting's Colors")
def setup_ui(self) -> None:
self.main_layout.addWidget(QtWidgets.QLabel('" 1" Color'), 0, 0, 1, 1)
self.main_layout.addWidget(self.pos_one_color_label, 0, 1, 1, 1)
self.main_layout.addWidget(self.select_pos_one_color_btn, 0, 2, 1, 1)
self.main_layout.addWidget(QtWidgets.QLabel('" 0" Color'), 1, 0, 1, 1)
self.main_layout.addWidget(self.zero_color_label, 1, 1, 1, 1)
self.main_layout.addWidget(self.select_zero_color_btn, 1, 2, 1, 1)
self.main_layout.addWidget(QtWidgets.QLabel('"-1" Color'), 2, 0, 1, 1)
self.main_layout.addWidget(self.neg_one_color_label, 2, 1, 1, 1)
self.main_layout.addWidget(self.select_neg_one_color_btn, 2, 2, 1, 1)
self.setup_complete_buttons(3)
def connect_signals(self) -> None:
self.select_pos_one_color_btn.clicked.connect(
lambda: self.on_select_color(self.pos_one_color_label))
self.select_zero_color_btn.clicked.connect(
lambda: self.on_select_color(self.zero_color_label))
self.select_neg_one_color_btn.clicked.connect(
lambda: self.on_select_color(self.neg_one_color_label))
super().connect_signals()
def set_value(self):
"""
Change the corresponding color_labels's color according to the color
from value_color_str.
"""
if self.value_color_str == "":
return
vc_parts = self.value_color_str.split('|')
for vc_str in vc_parts:
val, color = vc_str.split(':')
if val == '1':
display_color(self.pos_one_color_label, color)
if val == '0':
display_color(self.zero_color_label, color)
if val == '-1':
display_color(self.neg_one_color_label, color)
def on_save_color(self):
"""
Create value_color_str from GUI's info and close the GUI with color
is the hex color got from color_labels' color
"""
pos_one_color = self.pos_one_color_label.palette()\
.window().color().name()
zero_color = self.zero_color_label.palette().window().color().name()
neg_one_color = self.neg_one_color_label.palette() \
.window().color().name()
self.value_color_str = (f"-1:{neg_one_color}|0:{zero_color}"
f"|1:{pos_one_color}")
self.close()
if __name__ == '__main__':
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
test = TriColorLinesDialog(None, '-1:#FF00FF|0:#FF0000|1:#00FF00')
test.exec_()
print("result:", test.value_color_str)
sys.exit(app.exec_())
import sys
import platform
import os
from PySide2 import QtWidgets
from PySide2.QtWidgets import QWidget
from sohstationviewer.view.db_config.value_color_helper.\
edit_value_color_dialog.edit_value_color_dialog_super_class import \
EditValueColorDialog, display_color
class UpDownDialog(EditValueColorDialog):
"""Dialog to edit color for Up/Down Plot"""
def __init__(self, parent: QWidget, value_color_str: str):
"""
:param parent: the parent widget
:param value_color_str: string for value color to be saved in DB
"""
# Widget that allow user to add/edit up's color
self.select_up_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display up's color
self.up_color_label = QtWidgets.QLabel()
self.up_color_label.setFixedWidth(30)
self.up_color_label.setAutoFillBackground(True)
# Widget that allow user to add/edit down's color
self.select_down_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display down's color
self.down_color_label = QtWidgets.QLabel()
self.down_color_label.setFixedWidth(30)
self.down_color_label.setAutoFillBackground(True)
super(UpDownDialog, self).__init__(parent, value_color_str)
self.setWindowTitle("Edit Up/Down Plotting's Colors")
def setup_ui(self) -> None:
self.main_layout.addWidget(QtWidgets.QLabel('Up Color'), 0, 0, 1, 1)
self.main_layout.addWidget(self.up_color_label, 0, 1, 1, 1)
self.main_layout.addWidget(self.select_up_color_btn, 0, 2, 1, 1)
self.main_layout.addWidget(QtWidgets.QLabel('Down Color'), 1, 0, 1, 1)
self.main_layout.addWidget(self.down_color_label, 1, 1, 1, 1)
self.main_layout.addWidget(self.select_down_color_btn, 1, 2, 1, 1)
self.setup_complete_buttons(2)
def connect_signals(self) -> None:
self.select_up_color_btn.clicked.connect(
lambda: self.on_select_color(self.up_color_label))
self.select_down_color_btn.clicked.connect(
lambda: self.on_select_color(self.down_color_label))
super().connect_signals()
def set_value(self):
"""
Change the corresponding color_labels's color according to the color
from value_color_str.
"""
if self.value_color_str == "":
return
vc_parts = self.value_color_str.split('|')
for vc_str in vc_parts:
obj_type, color = vc_str.split(':')
if obj_type == 'Up':
display_color(self.up_color_label, color)
if obj_type == 'Down':
display_color(self.down_color_label, color)
def on_save_color(self):
"""
Create value_color_str from GUI's info and close the GUI with color
is the hex color got from color_labels' color
"""
up_color = self.up_color_label.palette().window().color().name()
down_color = self.down_color_label.palette().window().color().name()
self.value_color_str = f"Up:{up_color}|Down:{down_color}"
self.close()
if __name__ == '__main__':
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
test = UpDownDialog(None, 'Down:#FF0000|Up:#00FF00')
test.exec_()
print("result:", test.value_color_str)
sys.exit(app.exec_())
......@@ -36,8 +36,9 @@ from sohstationviewer.view.help_view import HelpBrowser
from sohstationviewer.view.ui.main_ui import UIMainWindow
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.view.util.functions import (
check_chan_wildcards_format, check_masspos,
)
check_chan_wildcards_format, check_masspos)
from sohstationviewer.view.util.check_file_size import check_folders_size
from sohstationviewer.view.channel_prefer_dialog import ChannelPreferDialog
from sohstationviewer.view.create_muti_buttons_dialog import (
create_multi_buttons_dialog
......@@ -47,7 +48,6 @@ from sohstationviewer.controller.processing import detect_data_type
from sohstationviewer.controller.util import (
display_tracking_info, rt130_find_cf_dass, check_data_sdata
)
from sohstationviewer.database.process_db import execute_db_dict, execute_db
from sohstationviewer.conf.constants import TM_FORMAT, ColorMode, CONFIG_PATH
......@@ -574,7 +574,12 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
if self.list_of_dir == []:
msg = "No directories have been selected."
raise Exception(msg)
if self.warn_big_file_sizes.isChecked():
# call check_folder_size() here b/c it requires list_of_dir and it
# is before the called for detect_data_type() which sometimes take
# quite a long time.
if not check_folders_size(self.list_of_dir, self.req_wf_chans):
raise Exception("Big size")
# Log files don't have a data type that can be detected, so we don't
# detect the data type if we are reading them.
if self.rt130_das_dict == {} and not self.log_checkbox.isChecked():
......@@ -624,22 +629,16 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.has_problem = False
if self.gap_len_line_edit.text().strip() != '':
try:
# convert from minute to second
self.gap_minimum = float(
self.gap_len_line_edit.text()) * 60
except ValueError:
msg = "Minimum Gap must be a number."
QtWidgets.QMessageBox.warning(
self, "Invalid Minimum Gap request", msg)
self.cancel_loading()
return
if self.gap_minimum < 0.1:
# convert from minute to second
minimum_gap_in_minutes = float(self.gap_len_line_edit.text())
if minimum_gap_in_minutes < 0.1:
msg = "Minimum Gap must be greater than 0.1 minute to be " \
"detected."
QtWidgets.QMessageBox.warning(
self, "Invalid Minimum Gap request", msg)
self.cancel_loading()
return
self.gap_minimum = minimum_gap_in_minutes * 60
else:
self.gap_minimum = None
......@@ -680,6 +679,9 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.cancel_loading()
return
self.data_type == 'Unknown'
elif str(e) == "Big size":
self.cancel_loading()
return
else:
fmt = traceback.format_exc()
QtWidgets.QMessageBox.warning(
......
......@@ -219,6 +219,11 @@ class UIMainWindow(object):
# ========================== Option Menu =======================
"""
warn_big_file_sizes: option to check file sizes and give warning if
total file sizes are greater than constant.BIG_FILE_SIZE
"""
self.warn_big_file_sizes: Optional[QAction] = None
"""
mp_regular_color_action: set self.mass_pos_volt_range_opt to 'regular'
mp_trillium_color_action: set self.mass_pos_volt_range_opt to
'trillium'
......@@ -473,6 +478,10 @@ class UIMainWindow(object):
gap_layout.addWidget(QLabel("Minimum Gap Length "))
self.gap_len_line_edit = QLineEdit(self.central_widget)
gap_validator = QtGui.QDoubleValidator()
gap_validator.setDecimals(2)
gap_validator.setNotation(QtGui.QDoubleValidator.StandardNotation)
self.gap_len_line_edit.setValidator(gap_validator)
gap_layout.addWidget(self.gap_len_line_edit)
gap_layout.addWidget(QLabel(' m'))
......@@ -629,6 +638,13 @@ class UIMainWindow(object):
:param main_window: QMainWindow - main GUI for user to interact with
:param menu: QMenu - Options Menu
"""
self.warn_big_file_sizes = QAction(
'Warn big file sizes', main_window
)
self.warn_big_file_sizes.setCheckable(True)
menu.addAction(self.warn_big_file_sizes)
menu.addSeparator()
mp_coloring_menu = QMenu('MP Coloring:', main_window)
menu.addMenu(mp_coloring_menu)
mp_coloring_group = QActionGroup(main_window)
......
from typing import List, Union, Optional, Tuple, Dict
import sys
import os
from PySide2.QtWidgets import QMessageBox, QApplication
from obspy.io.reftek.core import _is_reftek130
from sohstationviewer.conf.constants import BIG_FILE_SIZE
from sohstationviewer.controller.util import validate_file
from sohstationviewer.controller.processing import (
get_next_channel_from_mseed_file)
def _get_file_type(path2file: str) -> str:
"""
Get type of the given file:
+ TEXT if strip() can be used for the first 64 bytes
+ MSEED if channel name of the first record can be read
+ RT130 is decided using obspy method _is_reftek130()
+ BINARY if none of the above types are detected.
:param path2file: absolute path to file
:return: file's type
"""
with open(path2file, 'r') as file:
try:
file.read(64).strip()
return 'TEXT'
except UnicodeDecodeError:
pass
with open(path2file, 'rb') as file:
try:
# read header of the first record to decide MSEED file
get_next_channel_from_mseed_file(file)
return 'MSEED'
except ValueError:
if _is_reftek130(path2file):
return 'RT130'
else:
return 'BINARY'
def _get_multiplex_and_chan_id(path2file: str,
is_multiplex: bool,
non_multiplexed_reach_total_limit: bool
) -> Tuple[bool, Optional[str]]:
"""
Recursively reading a part of header of each data record in the given file
to get the channel name.
+ If more than one channel names are detected in a file, the data set is
definitely multiplexed.
+ If only one channel name is detected in a file, keep checking other
files until the number of files checked goes up to 10 then decide the
data set is non multiplexed which means using the first record to
decide the channel of the whole file.
:param path2file: absolute path to file
:param is_multiplex: multiplex status of the data set so far
:param non_multiplexed_reach_total_limit: the total of multiplexed
files reach limit
:return is_multiplex: multiplex status of the data set after the file is
read
:return chan: the last channel name read from this file.
+ If the file is multiplexed, which channel name return isn't matter.
+ If the file is non-multiplexed, all records have the same channel name.
+ chan can be None if the file isn't mseed, e.g. TEXT.
"""
chan = None
file = open(path2file, 'rb')
chans_in_stream = set()
while 1:
# read all records in a file until it is detected as multiplexed
is_eof = (file.read(1) == b'')
if is_eof:
break
file.seek(-1, 1)
try:
chan = get_next_channel_from_mseed_file(file)
if non_multiplexed_reach_total_limit:
# If total of non_multiplexed files reach limit, don't need
# to check all records anymore but treat the file as
# non-multiplexed which is all records have the same channel id
# => Use the first record to decide the channel of the file.
break
except ValueError:
file.close()
break
chans_in_stream.add(chan)
if len(chans_in_stream) > 1:
# a file is multiplexed if it has more than one channel id
is_multiplex = True
break
file.close()
return is_multiplex, chan
def _get_size_of_non_multiplex_waveform_file(
dir_path: str, req_wf_chans: List[Union[str, int]]):
"""
Estimate size of directory by collecting sizes of non-multiplexed waveform
files. This way, we can skip reading sizes of small files which makes the
speed of reading sizes improved especially for the case that the number of
small files is big.
:param dir_path: absolute path to directory
:param req_wf_chans: waveform request which can be list of data streams or
list of mseed wildcards
:return total_size:
+ 0 if this method won't be used to estimate the size of the directory
+ Estimated total size of the directory up to where it is greater than
BIG_FILE_SIZE
"""
# list of prefix of high sampling rate channels of which files' sizes are
# significantly larger than the ones of lower sampling rate channels.
wf_high_spr_prefix = ['FH', 'FN', # ≥ 1000 to < 5000
'GH', 'GL', # ≥ 1000 to < 5000
'DH', 'DL', # ≥ 250 to < 1000
'CH', 'CN', # ≥ 250 to < 1000
'EH', 'EL', 'EP', # ≥ 80
'SH', 'SL', 'SP', # ≥ 10 to < 80
'HH', 'HN', # ≥ 80
'BH', 'BN', # ≥ 10 to < 80
'MH', 'MN', 'MP', 'ML'] # >1 to < 10
"""
'LH','LL', 'LP', 'LN' =1
'VP', 'VL', 'VL', 'VH' = 0.1
'UN', 'UP', 'UL', 'UH' <=0.01
Skip channels with sampling rate <=1 because there are less data in the
files, which can result many files with small sizes in compare with sizes
of high sample rate files. (For case that the data set has only low
sampling rates, collecting data sizes won't be conducted in this method.)
"""
wf_chan_possibilities = set()
for request in req_wf_chans:
if request == '*':
wf_chan_possibilities.update(wf_high_spr_prefix)
elif request[0] == '*':
wf_chan_possibilities.update(
[prefix for prefix in wf_high_spr_prefix
if prefix[1] == request[1]])
elif request[1] == '*':
wf_chan_possibilities.update(
[prefix for prefix in wf_high_spr_prefix
if prefix[0] == request[0]])
if len(wf_chan_possibilities) == 0:
# no waveform channels available to pick, this method is not available
# for this data set.
return 0
total_size = 0
is_multiplex = False
count = 0
total_non_multiplexed_limit = 10
for path, subdirs, files in os.walk(dir_path):
total_of_mseed_files = 0
for file_name in files:
if not validate_file(os.path.join(path, file_name), file_name):
continue
fp = os.path.join(path, file_name)
is_multiplex, chan = _get_multiplex_and_chan_id(
fp, is_multiplex, count >= total_non_multiplexed_limit)
if chan is None:
continue
count += 1
total_of_mseed_files += 1
if is_multiplex:
# Don't use this method for multiplexed data set to elevate
# the speed
return 0
# not multiplex
if (chan is not None and
chan.startswith(tuple(wf_chan_possibilities))):
# ------ high sample rate mseed ------
# to help skip get size of too many small files,
# only read the big files which are in the list wf_chan_pos
file_size = os.path.getsize(fp)
total_size += file_size
if total_size > BIG_FILE_SIZE:
return total_size
else:
# ------ low sample rate mseed ------
if total_of_mseed_files == 50:
# When there are more than 50 low sampling rate mseed files
# in a folder, break the for loop to move to a different
# folder.
break
return total_size
def _get_size_rt130(dir_path: str, req_ds: List[int]):
"""
Get size of RT130's requested datas treams which is inside folder that has
data stream number as name.
:param dir_path: absolute path to directory
:param req_ds: list of requested data streams
:return total_size: total size of requested data streams up to where it
greater than BIG_FILE_SIZE
"""
if req_ds == ['*']:
req_ds = ['1', '2', '3', '4', '5', '6', '7', '8']
else:
req_ds = [str(req) for req in req_ds]
total_size = 0
for path, subdirs, files in os.walk(dir_path):
path_parts = path.split(os.sep)
ds = path_parts[-1]
if ds in req_ds:
# the direct folder of rt130 file must be named after data stream
for file_name in files:
fp = os.path.join(path, file_name)
file_size = os.path.getsize(fp)
total_size += file_size
if total_size > BIG_FILE_SIZE:
break
return total_size
def _get_size_mseed(dir_path: str) -> int:
"""
Get size of all files until total size > BIG_FILE_SIZE
:param dir_path: absolute path to directory
:return total_size: total size of the directory up to where it greater
than BIG_FILE_SIZE
"""
total_size = 0
count = 0
for path, subdirs, files in os.walk(dir_path):
for file_name in files:
if not validate_file(os.path.join(path, file_name), file_name):
continue
fp = os.path.join(path, file_name)
file_size = os.path.getsize(fp)
total_size += file_size
count += 1
if total_size > BIG_FILE_SIZE:
break
return total_size
def _get_dir_size(dir_path: str, req_wf_chans: List[Union[str, int]]):
"""
Get size of directory.
To make the process go fast, separate it into different case:
+ Non-multiplex MSeed with high sampling rate waveform
+ The rest cases of MSeed
+ RT130
+ If only text files or binary files found, count the most 200 files
and ask user to decide stopping or continuing process at their
own risk
:param dir_path: absolute path to directory
:param req_wf_chans: waveform request which can be list of data streams or
list of mseed wildcards
:return total_size:
+ 0 if don't have waveform request
+ total size of the directory up to where it greater than BIG_FILE_SIZE
+ -1 if count more than 200 TEXT files
+ -2 if count more than 200 BINARY files of which types are unkonwn
"""
text_file_count = 0
binary_file_count = 0
for path, subdirs, files in os.walk(dir_path):
for file_name in files:
path2file = os.path.join(path, file_name)
if not validate_file(path2file, file_name):
continue
file_type = _get_file_type(path2file)
if file_type == 'TEXT':
text_file_count += 1
if text_file_count > 200:
return {'data_size': -1, 'text_count': text_file_count}
continue
elif file_type == 'RT130':
return {'data_size': _get_size_rt130(dir_path, req_wf_chans)}
elif file_type == 'MSEED':
total_size = _get_size_of_non_multiplex_waveform_file(
dir_path, req_wf_chans)
if total_size != 0:
return {'data_size': total_size}
else:
return {'data_size': _get_size_mseed(dir_path)}
else:
binary_file_count += 1
if binary_file_count > 200:
return {'data_size': -1, 'binary_count': binary_file_count}
return {'data_size': -1,
'binary_count': binary_file_count, 'text_count': text_file_count}
def _abort_dialog(msg: str) -> bool:
"""
Provide confirming dialog for user to continue or not
:param msg: message of what need to be confirmed
:return: True for the confirmation. False for the cancel.
"""
dlg = QMessageBox()
dlg.setText(msg)
dlg.setInformativeText('Do you want to proceed?')
dlg.setStandardButtons(QMessageBox.Yes |
QMessageBox.Abort)
dlg.setDefaultButton(QMessageBox.Abort)
dlg.setIcon(QMessageBox.Question)
ret = dlg.exec_()
if ret == QMessageBox.Abort:
return False
else:
return True
def _check_folders_size(dir_paths: List[str],
req_wf_chans: List[Union[str, int]]
) -> Dict[str, int]:
"""
Check the folders in the list dir_paths for size of data files and count of
text file or binary.
:param dir_paths: list of paths to check for sizes
:param req_wf_chans: requirement of waveform channels
:return: dictionary of size or count info in the dir_paths
"""
final_result = {'data_size': 0, 'text_count': 0, 'binary_count': 0}
for dir_path in dir_paths:
if not os.path.isdir(dir_path):
raise Exception(f"'{dir_path}' isn't a valid directory")
result = _get_dir_size(dir_path, req_wf_chans)
if result['data_size'] >= 0:
final_result['data_size'] += result['data_size']
if final_result['data_size'] > BIG_FILE_SIZE:
break
else:
# only consider text and binary if no data
if 'text_count' in result:
final_result['text_count'] += result['text_count']
if final_result['text_count'] > 200:
break
if 'binary_count' in result:
final_result['binary_count'] += result['binary_count']
if final_result['binary_count'] > 200:
break
return final_result
def check_folders_size(dir_paths: List[str],
req_wf_chans: List[Union[str, int]]
) -> bool:
"""
Check the folders in the list dir_paths:
+ If found data in folders, return True if size <= BIG_FILE_SIZE.
Otherwise, ask user to continue or not.
+ If there are no data files at all, report the files found and ask user to
continue or not
:param dir_paths: list of paths to check for sizes
:param req_wf_chans: requirement of waveform channels
:return: True if the check is passed and False otherwise
"""
try:
final_result = _check_folders_size(dir_paths, req_wf_chans)
except Exception as e:
QMessageBox.information(None, "Error", str(e))
return False
if final_result['data_size'] > BIG_FILE_SIZE:
msg = ('The selected data set is greater than 2GB. It '
'might take a while to finish reading '
'and plotting everything.')
return _abort_dialog(msg)
elif final_result['data_size'] > 0:
return True
elif final_result['text_count'] > 200:
msg = ("There are more than 200 text files detected."
"Do you want to continue at your own risk?")
return _abort_dialog(msg)
elif final_result['binary_count'] > 200:
msg = ("There are more than 200 binary files detected."
"Do you want to continue at your own risk?")
return _abort_dialog(msg)
else:
file_info = []
if final_result['text_count'] > 0:
file_info.append(f"{final_result['text_count']} text files")
if final_result['binary_count'] > 0:
file_info.append(f"{final_result['binary_count']} binary files")
file_info_str = ' and '.join(file_info)
msg = (f"There are {file_info_str} detected with no data files."
"Do you want to continue at your own risk?")
return _abort_dialog(msg)
if __name__ == '__main__':
import platform
# Enable Layer-backing for MacOs version >= 11
# Only needed if using the pyside2 library with version>=5.15.
# Layer-backing is always enabled in pyside6.
os_name, version, *_ = platform.platform().split('-')
# if os_name == 'macOS' and version >= '11':
# mac OSX 11.6 appear to be 10.16 when read with python and still required
# this environment variable
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QApplication(sys.argv)
print("BIG FILE SIZE:", BIG_FILE_SIZE)
data_folder = "/Volumes/UNTITLED/SOHView_data/"
"""
The following examples are based on BIG_FILE_SIZE = 2GB
"""
# ============== Centaur ========================
# multiplexed mseed: 1530200576B; system:1.53GB
# print(check_folders_size(
# [f'{data_folder}Centaur/Centaur_DataTest.nan'], ['*']))
# multiplexed mseed: 34171904; system:34.2MB
# print(check_folders_size(
# [f'{data_folder}Centaur/Centaur-2018-3734.nan'], ['*']))
# multiplexed mseed: 25198592; system:25.2MB
# print(check_folders_size(
# [f'{data_folder}Centaur/Centaur3604_100sps.nan'], ['*']))
# multiplexed mseed: 468665991; system:468.7 MB
# print(check_folders_size(
# [f'{data_folder}Centaur/CentaurDiskSOH600.nan'], ['*']))
# multiplexed mseed: 20992; system:21 KB
# print(check_folders_size([f'{data_folder}Centaur/soh'], ['*']))
# multiplexed mseed: 700416; system:700 KB
# print(check_folders_size([f'{data_folder}Centaur/SOH_split600'], ['*']))
# ============ pegasus ==============
# non-multiplexed mseed: 1703583744; system:1.72 GB
# total files: 1534
# total files counted for size: 153
# print(check_folders_size(
# [f'{data_folder}Pegasus/dave_pegasus.nan'], ['*']))
# non-multiplexed mseed: 288489472; system:292.3 MB
# total files: 251
# total files counted for size: 24
# print(check_folders_size(
# [f'{data_folder}Pegasus/GNARBOX_svc1'], ['*']))
# non-multiplexed mseed: 151818240; system: 152.6 MB
# total files: 112
# total files counted for size: 12
# print(check_folders_size(
# [f'{data_folder}Pegasus/LaptopSvc4_part'], ['*']))
# non-multiplexed mseed: 151818240; system: 378.7 MB
# total files: 919
# total files counted for size: 84
# print(check_folders_size(
# [f'{data_folder}Pegasus/Pegasus Offloads'], ['*']))
# non-multiplexed mseed: over limit, stop at 2002317312; system: 2.78 GB
# total files: 1882
# total files counted for size: 151
# print(check_folders_size(
# [f'{data_folder}Pegasus/Pegasus_KCT06_Test.nan'], ['*']))
# non-multiplexed mseed: 547151872; system: 571.4 MB
# total files: 578
# total files counted for size: 84
# print(check_folders_size(
# [f'{data_folder}Pegasus/Pegasus_SVC4.nan'], ['*']))
# non-multiplexed mseed: 108064768; system: 108.1 MB
# total files: 10
# total files counted for size: 9
# print(check_folders_size(
# [f'{data_folder}Pegasus/PegasusData_LittleEndian'], ['*']))
# ============ q330 ==============
# non-multiplexed mseed: over limit, stop at 2013265920; system: 11.25 GB
# total files: 685
# total files counted for size: 120
# print(check_folders_size(
# [f'{data_folder}Q330/5083.sdr'], ['*']))
# non-multiplexed mseed: 20725760; system: 21.1 MB
# total files: 21
# total files counted for size: 3
# print(check_folders_size(
# [f'{data_folder}Q330/5244.sdr'], ['*']))
# multiplexed mseed: 341540864; system: 341.5 MB
# print(check_folders_size(
# [f'{data_folder}Q330/B44-4000939.sdr/data'], ['*']))
# multiplexed mseed: 17319742; system: 17.3 MB
# print(check_folders_size(
# [f'{data_folder}Q330/GLISN-REF-SENSLOC-2018.06.26'], ['*']))
# non-multiplexed mseed: over limit, stop at 2013265920; system: 7.55 GB
# total files: 465
# total files counted for size: 120
# print(check_folders_size(
# [f'{data_folder}Q330/Q330_5281.sdr'], ['*']))
# ============ rt130 ==============
# rt130: over limit, stop at 2071080960; system: 3.6 GB
# print(check_folders_size(
# [f'{data_folder}RT130/9BB3_D1.cf'], ['*']))
# rt130: over limit, stop at 2008623104; system: 2.16 GB
# print(check_folders_size(
# [f'{data_folder}RT130/9BB3_D2.cf'], ['*']))
# rt130: 95880192; system: 95.9 MB
# print(check_folders_size(
# [f'{data_folder}RT130/9BB3_D3.cf'], ['*']))
# rt130: 1227625472; system: 1.24 GB
# print(check_folders_size(
# [f'{data_folder}RT130/2011028.9AFA'], ['*']))
# rt130: 294737920; system: 296.8 MB
# print(check_folders_size(
# [f'{data_folder}RT130/2013326.9E4A'], ['*']))
# rt130: 1375256576; system: 1.38 GB
# print(check_folders_size(
# [f'{data_folder}RT130/2016174.9AC4'], ['*']))
# rt130: 46885888; system: 46.9 MB
# print(check_folders_size(
# [f'{data_folder}RT130/2017149.92EB'], ['*']))
# rt130: over limit, stop at 2087160832; system: 4.01 GB
# print(check_folders_size(
# [f'{data_folder}RT130/RT130-92E9-1.cf'], ['*']))
# rt130: 11527168; system: 11.5 MB
# print(check_folders_size(
# [f'{data_folder}RT130/RT130-2016290.9DF5.cf'], ['*']))
# rt130: 126618624; system: 127.4 MB
# print(check_folders_size(
# [f'{data_folder}RT130/RT130-A195-1.cf'], ['*']))
# rt130: 32062464; system: 32.2 MB
# print(check_folders_size(
# [f'{data_folder}RT130/testCF'], ['*']))
# rt130: size: 306176; system: 319 KB
# print(check_folders_size(
# [f'{data_folder}RT130/TESTRT130'], ['*']))
# ===============================
# text:
# print(check_folders_size(
# [f'{data_folder}Pegasus/Pegasus Offloads/logs'], ['*']))
# =================================
data_folder = "/Volumes/UNTITLED/issues_from_data_group/"
# mseed: size: 496574464; system: 496.6 MB
# print(check_folders_size(
# [f'{data_folder}6407.sdr'], ['*']))
# # non-multiplex mseed: size: 40435712; system: 41.2 MB
# print(check_folders_size(
# [f'{data_folder}77702'], ['*']))
# mseed: size: 206174720; system: 206.2 MB
# print(check_folders_size(
# [f'{data_folder}CONZ-5296-SOH.nan'], ['*']))
# non-multiplexed mseed: over limit, stop at 2013265920; system: 19.54 GB
# print(check_folders_size(
# [f'{data_folder}ELHT-6445.sdr'], ['*']))
# non-multiplexed mseed: 1814528; system: 37.6 MB
# Only one high sampling rate waveform file, many small soh files
# and text file => The size got is the waveform file. The size isn't
# correct but doesn't affect much of the result.
# THIS CASE IS SPECIAL.
# the first time for some reason it couldn't stop process.
# print(check_folders_size(
# [f'{data_folder}MN38'], ['*']))
# non-multiplexed mseed: 120705024; system: 120.7 MB
# No waveform files.
# print(check_folders_size(
# [f'{data_folder}NHRK.sdr'], ['*']))
# mseed: 708777984; system: 712.1 MB
# print(check_folders_size(
# [f'{data_folder}RT-9926-1.cf'], ['*']))
print("FINISH")
sys.exit(app.exec_())
......@@ -183,7 +183,7 @@ class TestReftek(BaseTestCase):
expected_soh = [
'SOH/Data Def', 'Battery Volt', 'DAS Temp', 'Backup Volt',
'Disk Usage1', 'Disk Usage2', 'Dump Called/Comp', 'GPS On/Off/Err',
'GPS Lk/Unlk', 'Clk Phase Err', 'Event DS1', 'Event DS9']
'GPS Lk/Unlk', 'Clk Phase Err', 'Event DS1']
expected_waveform = ['DS1-1', 'DS1-2', 'DS1-3']
obj = RT130(**args)
self.assertEqual(obj.found_data_streams, [9, 1, 1, 2, 2])
......
......@@ -339,7 +339,6 @@ class MockMSeed(MSeed):
def __init__(self): # noqa
self.notification_signal = None
self.tmp_dir = ''
def track_info(self, text: str, type: LogType) -> None:
print(text)
......
from tempfile import TemporaryDirectory, NamedTemporaryFile
import shutil
import os
from pathlib import Path
from unittest import TestCase
from sohstationviewer.view.util.check_file_size import _check_folders_size
from sohstationviewer.conf.constants import BIG_FILE_SIZE
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
'test_data')
NON_DATA_FILE = TEST_DATA_DIR.joinpath('Non-data-file/non_data_file')
MULTIPLEX_FILE = TEST_DATA_DIR.joinpath(
'Q330_multiplex/XX-3203_4-20221222183011')
NON_MULTIPLEX_LOW_SPR_FILE = TEST_DATA_DIR.joinpath(
'Q330-sample/day_vols_AX08/AX08.XA..VM1.2021.186')
NON_MULTIPLEX_HIGH_SPR_FILE = TEST_DATA_DIR.joinpath(
'Q330-sample/day_vols_AX08/AX08.XA..HHE.2021.186')
NON_MULTIPLEX_HIGH_N_LOW_SPR_SET = TEST_DATA_DIR.joinpath('Q330-sample')
RT130_FILE = TEST_DATA_DIR.joinpath(
'RT130-sample/2017149.92EB/2017150/92EB/1/010000015_0036EE80')
class TestGetDirSize(TestCase):
def test_less_or_equal_200_text_files(self):
number_of_text_files = 25
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_text_files):
files.append(NamedTemporaryFile(dir=directory))
expected_result = {'data_size': 0,
'text_count': 25,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
# Explicitly clean up the temporary files. If we don't do this,
# the temporary directory will clean up itself and delete the
# temporary files. Then, when the function returns, the references
# to these temporary files will attempt to clean up the files. This
# leads to exceptions being raised because the files being cleaned
# up does not exist anymore.
[file.close() for file in files]
def test_more_than_200_text_files(self):
number_of_text_files = 250
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_text_files):
files.append(NamedTemporaryFile(dir=directory))
expected_result = {'data_size': 0,
'text_count': 201, # stop when more than 200
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[file.close() for file in files]
def test_less_or_equal_200_binary_files(self):
number_of_binary_files = 25
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_binary_files):
new_file_path = Path(directory).joinpath(
f'{NON_DATA_FILE.name}_{i}')
shutil.copy(NON_DATA_FILE, new_file_path)
files.append(new_file_path)
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 25}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_200_binary_files(self):
number_of_binary_files = 250
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_binary_files):
new_file_path = Path(directory).joinpath(
f'{NON_DATA_FILE.name}_{i}')
shutil.copy(NON_DATA_FILE, new_file_path)
files.append(new_file_path)
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 201} # stop when more than 200
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_less_or_equal_limit_mseed_multiplexed_files(self):
sample_file_size = os.path.getsize(MULTIPLEX_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
for i in range(3):
new_file_path = Path(directory).joinpath(
f'{MULTIPLEX_FILE.name}_{i}')
shutil.copy(MULTIPLEX_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_limit_mseed_multiplexed_files(self):
sample_file_size = os.path.getsize(MULTIPLEX_FILE)
expected_size = 0
count = 0
with TemporaryDirectory() as directory:
files = []
while 1:
new_file_path = Path(directory).joinpath(
f'{MULTIPLEX_FILE.name}_{count}')
shutil.copy(MULTIPLEX_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
if expected_size > BIG_FILE_SIZE:
break
count += 1
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_less_or_equal_limit_mseed_non_multiplexed_low_spr_files(self):
sample_file_size = os.path.getsize(NON_MULTIPLEX_LOW_SPR_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
for i in range(3):
new_file_path = Path(directory).joinpath(
f'{NON_MULTIPLEX_LOW_SPR_FILE.name}_{i}')
shutil.copy(NON_MULTIPLEX_LOW_SPR_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_less_or_equal_limit_mseed_non_multiplexed_high_spr_files(self):
sample_file_size = os.path.getsize(NON_MULTIPLEX_HIGH_SPR_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
for i in range(3):
new_file_path = Path(directory).joinpath(
f'{NON_MULTIPLEX_HIGH_SPR_FILE.name}_{i}')
shutil.copy(NON_MULTIPLEX_HIGH_SPR_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_limit_mseed_non_multiplexed_high_spr_files(self):
sample_file_size = os.path.getsize(NON_MULTIPLEX_HIGH_SPR_FILE)
expected_size = 0
count = 0
with TemporaryDirectory() as directory:
files = []
while 1:
new_file_path = Path(directory).joinpath(
f'{NON_MULTIPLEX_HIGH_SPR_FILE.name}_{count}')
shutil.copy(NON_MULTIPLEX_HIGH_SPR_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
if expected_size > BIG_FILE_SIZE:
break
count += 1
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_mseed_non_multiplexed_high_n_low_spr_files(self):
expected_result = {'data_size': 11251712,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([NON_MULTIPLEX_HIGH_N_LOW_SPR_SET], [])
self.assertEqual(ret, expected_result)
def test_less_or_equal_limit_rt130_files(self):
sample_file_size = os.path.getsize(RT130_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
new_data_stream_path = Path(directory).joinpath('1')
new_data_stream_path.mkdir(
parents=True, exist_ok=True)
for i in range(3):
new_file_path = new_data_stream_path.joinpath(
f'{RT130_FILE.name}_{i}')
shutil.copy(RT130_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], ['1'])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_limit_rt130_files(self):
sample_file_size = os.path.getsize(RT130_FILE)
expected_size = 0
count = 0
with TemporaryDirectory() as directory:
files = []
new_data_stream_path = Path(directory).joinpath('1')
new_data_stream_path.mkdir(
parents=True, exist_ok=True)
while 1:
new_file_path = new_data_stream_path.joinpath(
f'{RT130_FILE.name}_{count}')
shutil.copy(RT130_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
if expected_size > BIG_FILE_SIZE:
break
count += 1
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], ['1'])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_rt130_no_requested_datastream_files(self):
sample_file_size = os.path.getsize(RT130_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
new_data_stream_path = Path(directory).joinpath('1')
new_data_stream_path.mkdir(
parents=True, exist_ok=True)
for i in range(3):
new_file_path = new_data_stream_path.joinpath(
f'{RT130_FILE.name}_{i}')
shutil.copy(RT130_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], ['2'])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_empty_directory(self):
with TemporaryDirectory() as temp_dir:
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 0}
result = _check_folders_size([temp_dir], ['*'])
self.assertEqual(result, expected_result)
def test_directory_does_not_exist(self):
empty_name_dir = ''
try:
_check_folders_size([empty_name_dir], [])
except Exception as e:
self.assertEqual(
str(e),
"'' isn't a valid directory"
)
non_existent_dir = 'directory does not exist'
try:
_check_folders_size([non_existent_dir], [])
except Exception as e:
self.assertEqual(
str(e),
"'directory does not exist' isn't a valid directory"
)