Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
Commits on Source (17)
Showing
with 2371 additions and 45 deletions
......@@ -12,7 +12,7 @@ WF_3RD = 'ZNE123456'
HIGHEST_INT = 1E100
# warn user if file bigger than this size
BIG_FILE_SIZE = 2 * 10**8
BIG_FILE_SIZE = 2 * 10**9 # 2 GB
# Matplotlib's performance be slow if data point total > than this limit
CHAN_SIZE_LIMIT = 10**6
......
......@@ -160,3 +160,35 @@ def get_color_ranges():
clr_labels[idx].append("+/- {:,} counts".format(cnt))
clr_labels[idx].append("> {:,} counts".format(cnt))
return range_names, all_square_counts, clr_labels
def create_assign_string_for_db_query(col: str, val: str) -> str:
"""
Create assign db string that assign value in single quote signs if val is
a string or to NULL if val is empty str
:param col: column name in the db table
:param val: value to be assigned to the column
:return: the assign db string
"""
return f"{col}='{val}'" if val != '' else f"{col}=NULL"
def get_params():
# get parameter list from database
param_rows = execute_db("SELECT param from parameters")
return sorted([d[0] for d in param_rows])
def get_channel_info(chan_id: str, data_type: str):
# get channel info from DB
sql = f"SELECT * FROM Channels " \
f"WHERE channel='{chan_id}' AND dataType='{data_type}'"
chan_info = execute_db_dict(sql)[0]
return chan_info
def get_param_info(param: str):
# get all info of a param from DB
sql = f"SELECT * FROM Parameters WHERE param='{param}'"
param_info = execute_db_dict(sql)[0]
return param_info
from __future__ import annotations
import os
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional, Union, List, Tuple, Dict
import traceback
......@@ -14,7 +13,6 @@ from sohstationviewer.controller.util import \
display_tracking_info, get_valid_file_count, validate_file, validate_dir
from sohstationviewer.view.plotting.gps_plot.gps_point import GPSPoint
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.database.process_db import execute_db
from sohstationviewer.model.general_data.general_data_helper import \
retrieve_data_time_from_data_dict, retrieve_gaps_from_data_dict, \
combine_data, sort_data, squash_gaps, apply_convert_factor_to_data_dict, \
......@@ -148,14 +146,6 @@ class GeneralData():
"""
self.gaps: Dict[DataKey, List[List[float]]] = {}
"""
tmp_dir: dir to keep memmap files. Deleted when object is deleted
"""
self.tmp_dir_obj: TemporaryDirectory = TemporaryDirectory()
self.tmp_dir = self.tmp_dir_obj.name
if not on_unittest:
self.save_temp_data_folder_to_database()
self._pauser = QtCore.QSemaphore()
self.pause_response = None
......@@ -279,17 +269,6 @@ class GeneralData():
self.selected_key = self.keys[ret]
return True
def __del__(self):
print("delete dataType Object")
try:
del self.tmp_dir_obj
except OSError as e:
self.track_info(
"Error deleting %s : %s" % (self.tmp_dir, e.strerror),
LogType.ERROR)
print("Error deleting %s : %s" % (self.tmp_dir, e.strerror))
print("finish deleting")
def track_info(self, text: str, type: LogType) -> None:
"""
Display tracking info in tracking_box.
......@@ -361,10 +340,6 @@ class GeneralData():
"""
return cls.__new__(cls)
def save_temp_data_folder_to_database(self):
execute_db(f'UPDATE PersistentData SET FieldValue="{self.tmp_dir}" '
f'WHERE FieldName="tempDataDirectory"')
def sort_all_data(self):
"""
Sort traces by startTmEpoch on all data: waveform_data, mass_pos_data,
......
......@@ -377,8 +377,8 @@ class RT130(GeneralData):
cur_key = (rt130._data[0]['unit_id'].decode(),
f"{rt130._data[0]['experiment_number']}")
self.populate_cur_key_for_all_data(cur_key)
self.get_ehet_in_log_data(rt130, cur_key)
if data_stream != 9:
self.get_ehet_in_log_data(rt130, cur_key)
self.get_mass_pos_data_and_waveform_data(rt130, data_stream, cur_key)
def get_ehet_in_log_data(self, rt130: core.Reftek130,
......
import sys
import platform
import os
from typing import Optional, Dict
from PySide2 import QtWidgets, QtGui
from PySide2.QtWidgets import QWidget, QDialog
from sohstationviewer.database.process_db import execute_db
from sohstationviewer.database.extract_data import (
get_params, get_channel_info, create_assign_string_for_db_query
)
from sohstationviewer.view.db_config.edit_single_param_dialog import \
EditSingleParamDialog
from sohstationviewer.conf.dbSettings import modify_db_path
def add_separation_line(layout):
"""
Add a line for separation to the given layout.
:param layout: QLayout - the layout that contains the line
"""
label = QtWidgets.QLabel()
label.setFrameStyle(QtWidgets.QFrame.HLine | QtWidgets.QFrame.Sunken)
label.setLineWidth(1)
layout.addWidget(label)
class AddEditSingleChannelDialog(QDialog):
"""
Dialog to add info for channel not in database or edit the existing channel
"""
def __init__(self, parent: Optional[QWidget],
chan_id: str, data_type: str):
"""
:param parent: the parent widget
:param chan_id: name of channel to be added/edited
:param data_type: type of the data being processed
"""
self.parent = parent
# name of the channel
self.chan_id = chan_id
# data_type of the channel
self.data_type = data_type
# param of the channel
self.param: str = 'Default'
# True if this channel isn't in DB yet
self.is_new_db_channel: bool = False
# To skip on_param_chkbox_changed() when param is changed by the
# program at the beginning
self.param_changed_by_signal: bool = False
# database info of the channel
self.channel_info: Dict = {}
# database info of the channel's parameter
self.param_info: Dict = {}
super(AddEditSingleChannelDialog, self).__init__(parent)
# short name of the channel
self.channel_name_lnedit = QtWidgets.QLineEdit(self)
self.channel_name_lnedit.setReadOnly(True)
# description added to channel name
self.label_lnedit = QtWidgets.QLineEdit(self)
self.label_lnedit.setPlaceholderText(
"added to channel name to be displayed")
# convert factor to change from count to actual value
self.conversion_lnedit = QtWidgets.QLineEdit(self)
self.conversion_lnedit.setPlaceholderText(
"to convert from count to actual value"
)
validator = QtGui.QDoubleValidator(0.0, 5.0, 6)
validator.setNotation(QtGui.QDoubleValidator.StandardNotation)
self.conversion_lnedit.setValidator(validator)
self.conversion_lnedit.setText('1')
# channel's unit
self.unit_lnedit = QtWidgets.QLineEdit(self)
# dedimal point for channel's value
self.fix_point_spnbox = QtWidgets.QSpinBox()
self.fix_point_spnbox.setToolTip("Decimal point that allow in display")
self.fix_point_spnbox.setMinimum(0)
self.fix_point_spnbox.setMaximum(5)
# data_type
self.data_type_lnedit = QtWidgets.QLineEdit(self)
self.data_type_lnedit.setReadOnly(True)
# channel's parameter which decides how channel is plotted
self.param_cbobox = QtWidgets.QComboBox(self)
self.param_cbobox.addItems(get_params())
# button to edit param
self.edit_param_btn = QtWidgets.QPushButton("EDIT PARAMETER", self)
# button to save changes to DB
self.save_btn = QtWidgets.QPushButton("SAVE CHANNEL", self)
# button to save changes and replot channel
self.save_replot_btn = QtWidgets.QPushButton("SAVE & REPLOT", self)
# button to close dialog without doing anything
self.cancel_btn = QtWidgets.QPushButton('CANCEL', self)
self.setup_ui()
self.set_channel_info()
self.connect_signals()
def setup_ui(self) -> None:
dlg_type = 'Add' if 'DEFAULT' in self.chan_id else 'Edit'
self.setWindowTitle(f"{dlg_type} channel {self.chan_id}"
f" - {self.data_type}")
main_layout = QtWidgets.QVBoxLayout()
self.setLayout(main_layout)
instruction = (
f"This dialog is to {dlg_type} channel {self.chan_id}.\n"
"Parameter need to be any value different than 'Default' to be "
"saved.")
main_layout.addWidget(QtWidgets.QLabel(instruction))
channel_layout = QtWidgets.QGridLayout()
main_layout.addLayout(channel_layout)
channel_layout.addWidget(QtWidgets.QLabel('Name'), 0, 0, 1, 1)
channel_layout.addWidget(self.channel_name_lnedit, 0, 1, 1, 1)
channel_layout.addWidget(QtWidgets.QLabel('Label'), 1, 0, 1, 1)
channel_layout.addWidget(self.label_lnedit, 1, 1, 1, 1)
channel_layout.addWidget(QtWidgets.QLabel('Conversion'), 2, 0, 1, 1)
channel_layout.addWidget(self.conversion_lnedit, 2, 1, 1, 1)
channel_layout.addWidget(QtWidgets.QLabel('Unit'), 3, 0, 1, 1)
channel_layout.addWidget(self.unit_lnedit, 3, 1, 1, 1)
channel_layout.addWidget(QtWidgets.QLabel('Fix Point'), 4, 0, 1, 1)
channel_layout.addWidget(self.fix_point_spnbox, 4, 1, 1, 1)
channel_layout.addWidget(QtWidgets.QLabel('Data Type'), 5, 0, 1, 1)
channel_layout.addWidget(self.data_type_lnedit, 5, 1, 1, 1)
channel_layout.addWidget(QtWidgets.QLabel('Parameter'), 6, 0, 1, 1)
channel_layout.addWidget(self.param_cbobox, 6, 1, 1, 1)
channel_layout.addWidget(self.save_btn, 7, 0, 1, 1)
channel_layout.addWidget(self.save_replot_btn, 7, 1, 1, 1)
channel_layout.addWidget(self.edit_param_btn, 8, 1, 1, 1)
channel_layout.addWidget(self.cancel_btn, 8, 0, 1, 1)
self.save_replot_btn.setFocus()
def connect_signals(self) -> None:
self.param_cbobox.currentTextChanged.connect(
self.on_param_cbobox_changed)
self.cancel_btn.clicked.connect(self.close)
self.save_btn.clicked.connect(self.on_save)
self.save_replot_btn.clicked.connect(self.on_save_replot)
self.edit_param_btn.clicked.connect(self.on_edit_param)
def set_channel_info(self):
"""
Add all Channel related info according to information got from DB.
In case Channel isn't in the DB, use the info of DEFAULT channel.
Call set_param_info to set Parameter related info.
"""
try:
self.channel_info = get_channel_info(self.chan_id, self.data_type)
except IndexError:
self.is_new_db_channel = True
self.channel_info = get_channel_info('DEFAULT', 'Default')
self.channel_name_lnedit.setText(self.chan_id)
self.label_lnedit.setText(self.channel_info['label'])
self.conversion_lnedit.setText(
str(float(self.channel_info['convertFactor'])))
self.unit_lnedit.setText(self.channel_info['unit'])
if self.channel_info['fixPoint'] is not None:
self.fix_point_spnbox.setValue(self.channel_info['fixPoint'])
self.data_type_lnedit.setText(self.data_type)
self.param_cbobox.setCurrentText(self.channel_info['param'])
self.param = self.channel_info['param']
self.set_buttons_enabled()
def on_param_cbobox_changed(self):
"""
+ Check self.param_changed_by_signal to make sure the signal come from
user selecting a parameter from param_cbobox, not the signal from
changing back to original value when condition not pass fo the set
value
+ Not allow parameter 'Default' to be select because it is only the
parameter for the channel that has no record in DB
+ If the channel already has a record in DB, give a warning when user's
trying to change its parameter.
"""
if self.param_changed_by_signal:
self.param_changed_by_signal = False
return
new_param = self.param_cbobox.currentText()
if new_param == 'Default':
# Parameter Default is only for channel that has no record in DB.
# So it isn't allowed to be selected
self.param_changed_by_signal = True
self.param_cbobox.setCurrentText(self.param)
return
if not self.is_new_db_channel:
msg = ("ARE YOU SURE YOU WANT TO CHANGE PARAMETER FOR CHANNEL "
f"'{self.chan_id}'?")
result = QtWidgets.QMessageBox.question(
self, "Confirmation", msg,
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No)
if result == QtWidgets.QMessageBox.No:
self.param_changed_by_signal = True
self.param_cbobox.setCurrentText(self.param)
return
self.param = new_param
self.set_buttons_enabled()
def save(self):
"""
Save info from GUI to DB
"""
if self.is_new_db_channel:
self.insert_channel_info()
else:
self.update_channel_info()
def on_save(self):
"""
Save new channel info to DB
"""
self.save()
self.close()
def on_save_replot(self):
"""
Save new channel info to DB
TODO: Replot the channel in the plotting area
"""
self.save()
print("Do REPLOT")
self.close()
def set_buttons_enabled(self):
"""
Disable the 3 buttons to save and to edit parameters so that user are
forced to change param before they want to continue.
"""
if self.param == 'Default':
self.edit_param_btn.setEnabled(False)
self.save_btn.setEnabled(False)
self.save_replot_btn.setEnabled(False)
else:
self.edit_param_btn.setEnabled(True)
self.save_btn.setEnabled(True)
self.save_replot_btn.setEnabled(True)
def on_edit_param(self):
"""
Give user a warning when they want to change parameter's info then
open EditSingleParamDialog for user to edit parameter's info after
they give their confirmation.
"""
msg = ("Changing parameter will affect all of other channels that "
"have the same parameter.\n\n"
"Are you sure you want to continue?")
result = QtWidgets.QMessageBox.question(
self, "Confirmation", msg,
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No)
if result == QtWidgets.QMessageBox.No:
return
win = EditSingleParamDialog(self, self.param_cbobox.currentText())
win.exec_()
def update_para_info(self, param):
"""
Save parameter related info to Parameters table
:param param: param condition string
"""
plot_type = create_assign_string_for_db_query(
'plotType', self.plot_type_cbo_box.currentText())
value_colorb = create_assign_string_for_db_query(
'valueColorsB', self.value_colorb_widget.text())
value_colorw = create_assign_string_for_db_query(
'valueColorsW', self.value_colorw_widget.text())
height = f"height={self.height_spnbox.value()}"
sql = (f"UPDATE Parameters SET {plot_type}, {value_colorb}, "
f"{value_colorw}, {height} WHERE {param}")
execute_db(sql)
def insert_channel_info(self):
sql = ("INSERT INTO Channels VALUES ("
f"'{self.channel_name_lnedit.text()}', "
f"'{self.label_lnedit.text()}', "
f"'{self.param_cbobox.currentText()}', "
f"NULL, " # linkedChan for RT130 only and won't be changed
f"{self.conversion_lnedit.text()}, "
f"'{self.unit_lnedit.text()}', "
f"{self.fix_point_spnbox.value()}, "
f"'{self.data_type_lnedit.text()}')")
execute_db(sql)
def update_channel_info(self):
channel = f"channel='{self.channel_name_lnedit.text()}'"
label = f"label='{self.label_lnedit.text()}'"
param = f"param='{self.param_cbobox.currentText()}'"
linked_chan = "linkedChan=NULL"
convert_factor = f"convertFactor={self.conversion_lnedit.text()}"
unit = f"unit='{self.unit_lnedit.text()}'"
fix_point = f"fixPoint={self.fix_point_spnbox.value()}"
data_type = f"dataType='{self.data_type_lnedit.text()}'"
sql = (f"UPDATE Channels SET {label}, {param}, {linked_chan}, "
f"{convert_factor}, {unit}, {fix_point}, {data_type} "
f"WHERE {channel}")
execute_db(sql)
if __name__ == '__main__':
modify_db_path()
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
# test new channel
# test = AddEditSingleChannelDialog(None, 'VEE', 'Q330')
# test linesDots. Ex: param: Input power supply current
# test = AddEditSingleChannelDialog(None, 'VEC', 'Q330')
# test MultiColorDotsLowerBound. Ex: param:Backup volt
# test = AddEditSingleChannelDialog(None, 'Backup Volt', 'RT130')
# test MultiColorDotsUpperBound. Ex: param:GNSS status
test = AddEditSingleChannelDialog(None, 'VST', 'Pegasus')
# test UpDownDots. Ex: param. Ex: param:Net Up/down
# test = AddEditSingleChannelDialog(None, 'Net Up/Down', 'RT130')
# test TriColorLInes. Ex: param. Ex: param:Error/warning
# test = AddEditSingleChannelDialog(None, 'Error/Warning', 'RT130')
test.exec_()
sys.exit(app.exec_())
import sys
import platform
import os
from typing import Optional, Dict
from PySide2 import QtWidgets
from PySide2.QtWidgets import QWidget, QDialog, QLineEdit
from sohstationviewer.view.util.plot_func_names import plot_functions
from sohstationviewer.database.process_db import execute_db
from sohstationviewer.database.extract_data import (
get_param_info, create_assign_string_for_db_query
)
from sohstationviewer.conf.dbSettings import modify_db_path
class EditSingleParamDialog(QDialog):
"""
Dialog to add info for channel not in database or edit the existing channel
"""
def __init__(self, parent: Optional[QWidget],
param: str):
"""
:param parent: the parent widget
:param chan_id: name of channel to be added/edited
:param data_type: type of the data being processed
"""
self.param = param
# # To skip on_param_chkbox_changed() when param is changed by the
# # program at the beginning
# self.param_changed_by_signal = False
# database info of the channel
self.channel_info: Dict = {}
# database info of the channel's parameter
self.param_info: Dict = {}
super(EditSingleParamDialog, self).__init__(parent)
# parameter's plot type which decides the shape of the plot
self.plot_type_cbo_box = QtWidgets.QComboBox(self)
self.plot_type_cbo_box.addItems([""] + list(plot_functions.keys()))
# value color in black mode
self.value_colorb_widget = QLineEdit(self)
self.value_colorb_widget.setPlaceholderText(
"Click edit button to add value color string")
self.value_colorb_widget.setToolTip("Priority from left to right")
# value, color in white mode
self.value_colorw_widget = QLineEdit(self)
self.value_colorw_widget.setPlaceholderText(
"Click edit button to add value color string")
self.value_colorw_widget.setToolTip("Priority from left to right")
# height of the plot
self.height_spnbox = QtWidgets.QSpinBox()
self.height_spnbox.setMinimum(0)
self.height_spnbox.setMaximum(8)
self.height_spnbox.setToolTip("Relative height of the plot")
# button to save change to DB
self.save_param_btn = QtWidgets.QPushButton(
"SAVE PARAMETER", self)
# button to close dialog without doing anything
self.cancel_btn = QtWidgets.QPushButton('CANCEL', self)
self.setup_ui()
self.set_param_info()
self.connect_signals()
def setup_ui(self) -> None:
self.setWindowTitle(f"Edit Parameter {self.param}")
main_layout = QtWidgets.QVBoxLayout()
self.setLayout(main_layout)
param_layout = QtWidgets.QGridLayout()
main_layout.addLayout(param_layout)
param_layout.addWidget(QtWidgets.QLabel('Plot Type'), 0, 0, 1, 1)
param_layout.addWidget(self.plot_type_cbo_box, 0, 1, 1, 1)
param_layout.addWidget(QtWidgets.QLabel(
'Value Color (black)'), 1, 0, 1, 1)
param_layout.addWidget(self.value_colorb_widget, 1, 1, 1, 1)
param_layout.addWidget(QtWidgets.QLabel(
'Value Color (white)'), 2, 0, 1, 1)
param_layout.addWidget(self.value_colorw_widget, 2, 1, 1, 1)
param_layout.addWidget(QtWidgets.QLabel('Height'), 3, 0, 1, 1)
param_layout.addWidget(self.height_spnbox, 3, 1, 1, 1)
param_layout.addWidget(self.cancel_btn, 4, 0, 1, 1)
param_layout.addWidget(self.save_param_btn, 4, 1, 1, 1)
def connect_signals(self) -> None:
self.plot_type_cbo_box.currentTextChanged.connect(self.set_plot_type)
self.cancel_btn.clicked.connect(self.close)
self.save_param_btn.clicked.connect(self.on_save_param)
def set_param_info(self) -> None:
"""
Fill up all info boxes
"""
self.param_info = get_param_info(self.param)
self.set_plot_type(self.param_info['plotType'])
self.height_spnbox.setValue(self.param_info['height'])
def set_plot_type(self, plot_type: str) -> None:
"""
Add Plot Type, Value Color strings.
If there is no Plot Type, no Value Color or Height because no plot.
:param plot_type: name of Plot Type
"""
if plot_type in ["", None]:
self.plot_type_cbo_box.setCurrentText('')
self.value_colorb_widget.setEnabled(False)
self.value_colorb_widget.clear()
self.value_colorw_widget.setEnabled(False)
self.value_colorw_widget.clear()
self.height_spnbox.setValue(0)
else:
self.plot_type_cbo_box.setCurrentText(plot_type)
value_color_b = self.param_info['valueColorsB']
value_color_w = self.param_info['valueColorsW']
self.value_colorb_widget.setText(value_color_b)
self.value_colorw_widget.setText(value_color_w)
def on_save_param(self):
"""
Save parameter info to Parameters table
"""
plot_type = create_assign_string_for_db_query(
'plotType', self.plot_type_cbo_box.currentText())
value_colorb = create_assign_string_for_db_query(
'valueColorsB', self.value_colorb_widget.text())
value_colorw = create_assign_string_for_db_query(
'valueColorsW', self.value_colorw_widget.text())
height = f"height={self.height_spnbox.value()}"
sql = (f"UPDATE Parameters SET {plot_type}, {value_colorb}, "
f"{value_colorw}, {height} WHERE param='{self.param}'")
execute_db(sql)
self.close()
if __name__ == '__main__':
modify_db_path()
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
# test linesDots. Ex: param: Input power supply current
test = EditSingleParamDialog(None, 'Input power supply current')
# test MultiColorDotsLowerBound. Ex: param:Backup volt
# test = EditSingleParamDialog(None, 'Backup Volt', 'RT130')
# test MultiColorDotsUpperBound. Ex: param:GNSS status
# test = EditSingleParamDialog(None, 'GNSS status')
# test UpDownDots. Ex: param. Ex: param:Net Up/down
# test = EditSingleParamDialog(None, 'Net Up/Down', 'RT130')
# test TriColorLInes. Ex: param. Ex: param:Error/warning
# test = EditSingleParamDialog(None, 'Error/Warning', 'RT130')
test.exec_()
sys.exit(app.exec_())
from PySide2 import QtWidgets, QtGui
from PySide2.QtWidgets import QWidget, QDialog
def display_color(color_label: QtWidgets.QLabel, color: str):
"""
Display color on color_label.
Display the given color on the color_label
:param color_label: the label to display color
:param color: the color that is given to update the color_label
"""
palette = color_label.palette()
palette.setColor(QtGui.QPalette.Background, QtGui.QColor(color))
color_label.setPalette(palette)
class EditValueColorDialog(QDialog):
"""Base class for value color editing dialogs of different plot types"""
def __init__(self, parent: QWidget, value_color_str: str):
"""
:param parent: the parent widget
:param value_color_str: string for value color to be saved in DB
"""
super(EditValueColorDialog, self).__init__(parent)
self.value_color_str = value_color_str
self.main_layout = QtWidgets.QGridLayout()
self.setLayout(self.main_layout)
self.cancel_btn = QtWidgets.QPushButton('CANCEL', self)
self.save_colors_btn = QtWidgets.QPushButton('SAVE COLORS', self)
self.setup_ui()
self.set_value()
self.connect_signals()
def setup_ui(self):
pass
def set_value(self):
pass
def setup_complete_buttons(self, row_total) -> None:
"""
:param row_total: total of rows to edit
"""
self.main_layout.addWidget(self.cancel_btn, row_total, 0, 1, 1)
self.main_layout.addWidget(self.save_colors_btn, row_total, 3, 1, 1)
def connect_signals(self) -> None:
self.cancel_btn.clicked.connect(self.close)
self.save_colors_btn.clicked.connect(self.on_save_color)
def on_select_color(self, color_label: QtWidgets.QLabel):
"""
When clicking on Select Color button, Color Picker will pop up with
the default color is color_label's color.
User will select a color then save to update the selected color to
the color_label.
:param color_label: the label that display the color of the obj_type
"""
color = color_label.palette().window().color()
new_color = QtWidgets.QColorDialog.getColor(color)
if new_color.isValid():
display_color(color_label, new_color.name())
self.raise_()
import sys
import platform
import os
from PySide2 import QtWidgets
from PySide2.QtWidgets import QWidget
from sohstationviewer.view.db_config.value_color_helper.\
edit_value_color_dialog.edit_value_color_dialog_super_class import \
EditValueColorDialog, display_color
class LineDotDialog(EditValueColorDialog):
"""Dialog to edit color for Line/Dot Plot"""
def __init__(self, parent: QWidget, value_color_str: str):
"""
:param parent: the parent widget
:param value_color_str: string for value color to be saved in DB
"""
# Widget that allow user to add/edit line's color
self.select_line_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display line's color
self.line_color_label = QtWidgets.QLabel()
self.line_color_label.setFixedWidth(30)
self.line_color_label.setAutoFillBackground(True)
# check box to include dot in value_color_str or not
self.dot_include_chkbox = QtWidgets.QCheckBox('Included')
# Widget that allow user to add/edit dot's color
self.select_dot_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display dot's color
self.dot_color_label = QtWidgets.QLabel()
self.dot_color_label.setFixedWidth(30)
self.dot_color_label.setAutoFillBackground(True)
super(LineDotDialog, self).__init__(parent, value_color_str)
self.setWindowTitle("Edit Line/Dot Plotting's Colors")
self.on_click_include_dot()
def setup_ui(self) -> None:
self.main_layout.addWidget(QtWidgets.QLabel('Line Color'), 0, 1, 1, 1)
self.main_layout.addWidget(self.line_color_label, 0, 2, 1, 1)
self.main_layout.addWidget(self.select_line_color_btn, 0, 3, 1, 1)
self.main_layout.addWidget(self.dot_include_chkbox, 1, 0, 1, 1)
self.main_layout.addWidget(QtWidgets.QLabel('Dot Color'), 1, 1, 1, 1)
self.main_layout.addWidget(self.dot_color_label, 1, 2, 1, 1)
self.main_layout.addWidget(self.select_dot_color_btn, 1, 3, 1, 1)
self.setup_complete_buttons(2)
def connect_signals(self) -> None:
self.select_line_color_btn.clicked.connect(
lambda: self.on_select_color(self.line_color_label))
self.select_dot_color_btn.clicked.connect(
lambda: self.on_select_color(self.dot_color_label))
self.dot_include_chkbox.clicked.connect(self.on_click_include_dot)
super().connect_signals()
def on_click_include_dot(self):
"""
Enable/disable select color and show/hide color label according to
dot_include_chkbox is checked or unchecked.
"""
enabled = self.dot_include_chkbox.isChecked()
self.select_dot_color_btn.setEnabled(enabled)
self.dot_color_label.setHidden(not enabled)
def set_value(self):
"""
Change the corresponding color_labels's color according to the color
from value_color_str.
"""
self.dot_include_chkbox.setChecked(False)
if self.value_color_str == "":
return
vc_parts = self.value_color_str.split('|')
for vc_str in vc_parts:
obj_type, color = vc_str.split(':')
if obj_type == 'Line':
display_color(self.line_color_label, color)
if obj_type == 'Dot':
display_color(self.dot_color_label, color)
self.dot_include_chkbox.setChecked(True)
def on_save_color(self):
"""
Create value_color_str from GUI's info and close the GUI with color
is the hex color got from color_labels' color
"""
line_color = self.line_color_label.palette().window().color().name()
self.value_color_str = f"Line:{line_color}"
if self.dot_include_chkbox.isChecked():
dot_color = self.dot_color_label.palette().window().color().name()
self.value_color_str += f"|Dot:{dot_color}"
self.close()
if __name__ == '__main__':
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
test = LineDotDialog(None, "Line:#00FF00|Dot:#00FF00")
test.exec_()
print("result:", test.value_color_str)
sys.exit(app.exec_())
import sys
import platform
import os
from PySide2 import QtWidgets
from PySide2.QtWidgets import QWidget
from sohstationviewer.view.db_config.value_color_helper.\
edit_value_color_dialog.edit_value_color_dialog_super_class import \
EditValueColorDialog, display_color
class TriColorLinesDialog(EditValueColorDialog):
"""Dialog to edit color for triColorLines plot"""
def __init__(self, parent: QWidget, value_color_str: str):
"""
:param parent: the parent widget
:param value_color_str: string for value color to be saved in DB
"""
# Widget that allow user to add/edit value positive one's color
self.select_pos_one_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display positive one's color
self.pos_one_color_label = QtWidgets.QLabel()
self.pos_one_color_label.setFixedWidth(30)
self.pos_one_color_label.setAutoFillBackground(True)
# Widget that allow user to add/edit value zero's color
self.select_zero_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display down's color
self.zero_color_label = QtWidgets.QLabel()
self.zero_color_label.setFixedWidth(30)
self.zero_color_label.setAutoFillBackground(True)
# Widget that allow user to add/edit value positive one's color
self.select_neg_one_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display positive one's color
self.neg_one_color_label = QtWidgets.QLabel()
self.neg_one_color_label.setFixedWidth(30)
self.neg_one_color_label.setAutoFillBackground(True)
super(TriColorLinesDialog, self).__init__(parent, value_color_str)
self.setWindowTitle("Edit TriColor Plotting's Colors")
def setup_ui(self) -> None:
self.main_layout.addWidget(QtWidgets.QLabel('" 1" Color'), 0, 0, 1, 1)
self.main_layout.addWidget(self.pos_one_color_label, 0, 1, 1, 1)
self.main_layout.addWidget(self.select_pos_one_color_btn, 0, 2, 1, 1)
self.main_layout.addWidget(QtWidgets.QLabel('" 0" Color'), 1, 0, 1, 1)
self.main_layout.addWidget(self.zero_color_label, 1, 1, 1, 1)
self.main_layout.addWidget(self.select_zero_color_btn, 1, 2, 1, 1)
self.main_layout.addWidget(QtWidgets.QLabel('"-1" Color'), 2, 0, 1, 1)
self.main_layout.addWidget(self.neg_one_color_label, 2, 1, 1, 1)
self.main_layout.addWidget(self.select_neg_one_color_btn, 2, 2, 1, 1)
self.setup_complete_buttons(3)
def connect_signals(self) -> None:
self.select_pos_one_color_btn.clicked.connect(
lambda: self.on_select_color(self.pos_one_color_label))
self.select_zero_color_btn.clicked.connect(
lambda: self.on_select_color(self.zero_color_label))
self.select_neg_one_color_btn.clicked.connect(
lambda: self.on_select_color(self.neg_one_color_label))
super().connect_signals()
def set_value(self):
"""
Change the corresponding color_labels's color according to the color
from value_color_str.
"""
if self.value_color_str == "":
return
vc_parts = self.value_color_str.split('|')
for vc_str in vc_parts:
val, color = vc_str.split(':')
if val == '1':
display_color(self.pos_one_color_label, color)
if val == '0':
display_color(self.zero_color_label, color)
if val == '-1':
display_color(self.neg_one_color_label, color)
def on_save_color(self):
"""
Create value_color_str from GUI's info and close the GUI with color
is the hex color got from color_labels' color
"""
pos_one_color = self.pos_one_color_label.palette()\
.window().color().name()
zero_color = self.zero_color_label.palette().window().color().name()
neg_one_color = self.neg_one_color_label.palette() \
.window().color().name()
self.value_color_str = (f"-1:{neg_one_color}|0:{zero_color}"
f"|1:{pos_one_color}")
self.close()
if __name__ == '__main__':
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
test = TriColorLinesDialog(None, '-1:#FF00FF|0:#FF0000|1:#00FF00')
test.exec_()
print("result:", test.value_color_str)
sys.exit(app.exec_())
import sys
import platform
import os
from PySide2 import QtWidgets
from PySide2.QtWidgets import QWidget
from sohstationviewer.view.db_config.value_color_helper.\
edit_value_color_dialog.edit_value_color_dialog_super_class import \
EditValueColorDialog, display_color
class UpDownDialog(EditValueColorDialog):
"""Dialog to edit color for Up/Down Plot"""
def __init__(self, parent: QWidget, value_color_str: str):
"""
:param parent: the parent widget
:param value_color_str: string for value color to be saved in DB
"""
# Widget that allow user to add/edit up's color
self.select_up_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display up's color
self.up_color_label = QtWidgets.QLabel()
self.up_color_label.setFixedWidth(30)
self.up_color_label.setAutoFillBackground(True)
# Widget that allow user to add/edit down's color
self.select_down_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display down's color
self.down_color_label = QtWidgets.QLabel()
self.down_color_label.setFixedWidth(30)
self.down_color_label.setAutoFillBackground(True)
super(UpDownDialog, self).__init__(parent, value_color_str)
self.setWindowTitle("Edit Up/Down Plotting's Colors")
def setup_ui(self) -> None:
self.main_layout.addWidget(QtWidgets.QLabel('Up Color'), 0, 0, 1, 1)
self.main_layout.addWidget(self.up_color_label, 0, 1, 1, 1)
self.main_layout.addWidget(self.select_up_color_btn, 0, 2, 1, 1)
self.main_layout.addWidget(QtWidgets.QLabel('Down Color'), 1, 0, 1, 1)
self.main_layout.addWidget(self.down_color_label, 1, 1, 1, 1)
self.main_layout.addWidget(self.select_down_color_btn, 1, 2, 1, 1)
self.setup_complete_buttons(2)
def connect_signals(self) -> None:
self.select_up_color_btn.clicked.connect(
lambda: self.on_select_color(self.up_color_label))
self.select_down_color_btn.clicked.connect(
lambda: self.on_select_color(self.down_color_label))
super().connect_signals()
def set_value(self):
"""
Change the corresponding color_labels's color according to the color
from value_color_str.
"""
if self.value_color_str == "":
return
vc_parts = self.value_color_str.split('|')
for vc_str in vc_parts:
obj_type, color = vc_str.split(':')
if obj_type == 'Up':
display_color(self.up_color_label, color)
if obj_type == 'Down':
display_color(self.down_color_label, color)
def on_save_color(self):
"""
Create value_color_str from GUI's info and close the GUI with color
is the hex color got from color_labels' color
"""
up_color = self.up_color_label.palette().window().color().name()
down_color = self.down_color_label.palette().window().color().name()
self.value_color_str = f"Up:{up_color}|Down:{down_color}"
self.close()
if __name__ == '__main__':
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
test = UpDownDialog(None, 'Down:#FF0000|Up:#00FF00')
test.exec_()
print("result:", test.value_color_str)
sys.exit(app.exec_())
......@@ -36,8 +36,9 @@ from sohstationviewer.view.help_view import HelpBrowser
from sohstationviewer.view.ui.main_ui import UIMainWindow
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.view.util.functions import (
check_chan_wildcards_format, check_masspos,
)
check_chan_wildcards_format, check_masspos)
from sohstationviewer.view.util.check_file_size import check_folders_size
from sohstationviewer.view.channel_prefer_dialog import ChannelPreferDialog
from sohstationviewer.view.create_muti_buttons_dialog import (
create_multi_buttons_dialog
......@@ -47,7 +48,6 @@ from sohstationviewer.controller.processing import detect_data_type
from sohstationviewer.controller.util import (
display_tracking_info, rt130_find_cf_dass, check_data_sdata
)
from sohstationviewer.database.process_db import execute_db_dict, execute_db
from sohstationviewer.conf.constants import TM_FORMAT, ColorMode, CONFIG_PATH
......@@ -574,7 +574,12 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
if self.list_of_dir == []:
msg = "No directories have been selected."
raise Exception(msg)
if self.warn_big_file_sizes.isChecked():
# call check_folder_size() here b/c it requires list_of_dir and it
# is before the called for detect_data_type() which sometimes take
# quite a long time.
if not check_folders_size(self.list_of_dir, self.req_wf_chans):
raise Exception("Big size")
# Log files don't have a data type that can be detected, so we don't
# detect the data type if we are reading them.
if self.rt130_das_dict == {} and not self.log_checkbox.isChecked():
......@@ -624,22 +629,16 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.has_problem = False
if self.gap_len_line_edit.text().strip() != '':
try:
# convert from minute to second
self.gap_minimum = float(
self.gap_len_line_edit.text()) * 60
except ValueError:
msg = "Minimum Gap must be a number."
QtWidgets.QMessageBox.warning(
self, "Invalid Minimum Gap request", msg)
self.cancel_loading()
return
if self.gap_minimum < 0.1:
# convert from minute to second
minimum_gap_in_minutes = float(self.gap_len_line_edit.text())
if minimum_gap_in_minutes < 0.1:
msg = "Minimum Gap must be greater than 0.1 minute to be " \
"detected."
QtWidgets.QMessageBox.warning(
self, "Invalid Minimum Gap request", msg)
self.cancel_loading()
return
self.gap_minimum = minimum_gap_in_minutes * 60
else:
self.gap_minimum = None
......@@ -680,6 +679,9 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.cancel_loading()
return
self.data_type == 'Unknown'
elif str(e) == "Big size":
self.cancel_loading()
return
else:
fmt = traceback.format_exc()
QtWidgets.QMessageBox.warning(
......
......@@ -219,6 +219,11 @@ class UIMainWindow(object):
# ========================== Option Menu =======================
"""
warn_big_file_sizes: option to check file sizes and give warning if
total file sizes are greater than constant.BIG_FILE_SIZE
"""
self.warn_big_file_sizes: Optional[QAction] = None
"""
mp_regular_color_action: set self.mass_pos_volt_range_opt to 'regular'
mp_trillium_color_action: set self.mass_pos_volt_range_opt to
'trillium'
......@@ -473,6 +478,10 @@ class UIMainWindow(object):
gap_layout.addWidget(QLabel("Minimum Gap Length "))
self.gap_len_line_edit = QLineEdit(self.central_widget)
gap_validator = QtGui.QDoubleValidator()
gap_validator.setDecimals(2)
gap_validator.setNotation(QtGui.QDoubleValidator.StandardNotation)
self.gap_len_line_edit.setValidator(gap_validator)
gap_layout.addWidget(self.gap_len_line_edit)
gap_layout.addWidget(QLabel(' m'))
......@@ -629,6 +638,13 @@ class UIMainWindow(object):
:param main_window: QMainWindow - main GUI for user to interact with
:param menu: QMenu - Options Menu
"""
self.warn_big_file_sizes = QAction(
'Warn big file sizes', main_window
)
self.warn_big_file_sizes.setCheckable(True)
menu.addAction(self.warn_big_file_sizes)
menu.addSeparator()
mp_coloring_menu = QMenu('MP Coloring:', main_window)
menu.addMenu(mp_coloring_menu)
mp_coloring_group = QActionGroup(main_window)
......
This diff is collapsed.
......@@ -183,7 +183,7 @@ class TestReftek(BaseTestCase):
expected_soh = [
'SOH/Data Def', 'Battery Volt', 'DAS Temp', 'Backup Volt',
'Disk Usage1', 'Disk Usage2', 'Dump Called/Comp', 'GPS On/Off/Err',
'GPS Lk/Unlk', 'Clk Phase Err', 'Event DS1', 'Event DS9']
'GPS Lk/Unlk', 'Clk Phase Err', 'Event DS1']
expected_waveform = ['DS1-1', 'DS1-2', 'DS1-3']
obj = RT130(**args)
self.assertEqual(obj.found_data_streams, [9, 1, 1, 2, 2])
......
......@@ -339,7 +339,6 @@ class MockMSeed(MSeed):
def __init__(self): # noqa
self.notification_signal = None
self.tmp_dir = ''
def track_info(self, text: str, type: LogType) -> None:
print(text)
......
from tempfile import TemporaryDirectory, NamedTemporaryFile
import shutil
import os
from pathlib import Path
from unittest import TestCase
from sohstationviewer.view.util.check_file_size import _check_folders_size
from sohstationviewer.conf.constants import BIG_FILE_SIZE
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
'test_data')
NON_DATA_FILE = TEST_DATA_DIR.joinpath('Non-data-file/non_data_file')
MULTIPLEX_FILE = TEST_DATA_DIR.joinpath(
'Q330_multiplex/XX-3203_4-20221222183011')
NON_MULTIPLEX_LOW_SPR_FILE = TEST_DATA_DIR.joinpath(
'Q330-sample/day_vols_AX08/AX08.XA..VM1.2021.186')
NON_MULTIPLEX_HIGH_SPR_FILE = TEST_DATA_DIR.joinpath(
'Q330-sample/day_vols_AX08/AX08.XA..HHE.2021.186')
NON_MULTIPLEX_HIGH_N_LOW_SPR_SET = TEST_DATA_DIR.joinpath('Q330-sample')
RT130_FILE = TEST_DATA_DIR.joinpath(
'RT130-sample/2017149.92EB/2017150/92EB/1/010000015_0036EE80')
class TestGetDirSize(TestCase):
def test_less_or_equal_200_text_files(self):
number_of_text_files = 25
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_text_files):
files.append(NamedTemporaryFile(dir=directory))
expected_result = {'data_size': 0,
'text_count': 25,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
# Explicitly clean up the temporary files. If we don't do this,
# the temporary directory will clean up itself and delete the
# temporary files. Then, when the function returns, the references
# to these temporary files will attempt to clean up the files. This
# leads to exceptions being raised because the files being cleaned
# up does not exist anymore.
[file.close() for file in files]
def test_more_than_200_text_files(self):
number_of_text_files = 250
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_text_files):
files.append(NamedTemporaryFile(dir=directory))
expected_result = {'data_size': 0,
'text_count': 201, # stop when more than 200
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[file.close() for file in files]
def test_less_or_equal_200_binary_files(self):
number_of_binary_files = 25
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_binary_files):
new_file_path = Path(directory).joinpath(
f'{NON_DATA_FILE.name}_{i}')
shutil.copy(NON_DATA_FILE, new_file_path)
files.append(new_file_path)
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 25}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_200_binary_files(self):
number_of_binary_files = 250
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_binary_files):
new_file_path = Path(directory).joinpath(
f'{NON_DATA_FILE.name}_{i}')
shutil.copy(NON_DATA_FILE, new_file_path)
files.append(new_file_path)
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 201} # stop when more than 200
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_less_or_equal_limit_mseed_multiplexed_files(self):
sample_file_size = os.path.getsize(MULTIPLEX_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
for i in range(3):
new_file_path = Path(directory).joinpath(
f'{MULTIPLEX_FILE.name}_{i}')
shutil.copy(MULTIPLEX_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_limit_mseed_multiplexed_files(self):
sample_file_size = os.path.getsize(MULTIPLEX_FILE)
expected_size = 0
count = 0
with TemporaryDirectory() as directory:
files = []
while 1:
new_file_path = Path(directory).joinpath(
f'{MULTIPLEX_FILE.name}_{count}')
shutil.copy(MULTIPLEX_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
if expected_size > BIG_FILE_SIZE:
break
count += 1
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_less_or_equal_limit_mseed_non_multiplexed_low_spr_files(self):
sample_file_size = os.path.getsize(NON_MULTIPLEX_LOW_SPR_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
for i in range(3):
new_file_path = Path(directory).joinpath(
f'{NON_MULTIPLEX_LOW_SPR_FILE.name}_{i}')
shutil.copy(NON_MULTIPLEX_LOW_SPR_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_less_or_equal_limit_mseed_non_multiplexed_high_spr_files(self):
sample_file_size = os.path.getsize(NON_MULTIPLEX_HIGH_SPR_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
for i in range(3):
new_file_path = Path(directory).joinpath(
f'{NON_MULTIPLEX_HIGH_SPR_FILE.name}_{i}')
shutil.copy(NON_MULTIPLEX_HIGH_SPR_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_limit_mseed_non_multiplexed_high_spr_files(self):
sample_file_size = os.path.getsize(NON_MULTIPLEX_HIGH_SPR_FILE)
expected_size = 0
count = 0
with TemporaryDirectory() as directory:
files = []
while 1:
new_file_path = Path(directory).joinpath(
f'{NON_MULTIPLEX_HIGH_SPR_FILE.name}_{count}')
shutil.copy(NON_MULTIPLEX_HIGH_SPR_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
if expected_size > BIG_FILE_SIZE:
break
count += 1
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_mseed_non_multiplexed_high_n_low_spr_files(self):
expected_result = {'data_size': 11251712,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([NON_MULTIPLEX_HIGH_N_LOW_SPR_SET], [])
self.assertEqual(ret, expected_result)
def test_less_or_equal_limit_rt130_files(self):
sample_file_size = os.path.getsize(RT130_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
new_data_stream_path = Path(directory).joinpath('1')
new_data_stream_path.mkdir(
parents=True, exist_ok=True)
for i in range(3):
new_file_path = new_data_stream_path.joinpath(
f'{RT130_FILE.name}_{i}')
shutil.copy(RT130_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], ['1'])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_limit_rt130_files(self):
sample_file_size = os.path.getsize(RT130_FILE)
expected_size = 0
count = 0
with TemporaryDirectory() as directory:
files = []
new_data_stream_path = Path(directory).joinpath('1')
new_data_stream_path.mkdir(
parents=True, exist_ok=True)
while 1:
new_file_path = new_data_stream_path.joinpath(
f'{RT130_FILE.name}_{count}')
shutil.copy(RT130_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
if expected_size > BIG_FILE_SIZE:
break
count += 1
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], ['1'])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_rt130_no_requested_datastream_files(self):
sample_file_size = os.path.getsize(RT130_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
new_data_stream_path = Path(directory).joinpath('1')
new_data_stream_path.mkdir(
parents=True, exist_ok=True)
for i in range(3):
new_file_path = new_data_stream_path.joinpath(
f'{RT130_FILE.name}_{i}')
shutil.copy(RT130_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], ['2'])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_empty_directory(self):
with TemporaryDirectory() as temp_dir:
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 0}
result = _check_folders_size([temp_dir], ['*'])
self.assertEqual(result, expected_result)
def test_directory_does_not_exist(self):
empty_name_dir = ''
try:
_check_folders_size([empty_name_dir], [])
except Exception as e:
self.assertEqual(
str(e),
"'' isn't a valid directory"
)
non_existent_dir = 'directory does not exist'
try:
_check_folders_size([non_existent_dir], [])
except Exception as e:
self.assertEqual(
str(e),
"'directory does not exist' isn't a valid directory"
)