Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
Commits on Source (4)
Showing
with 1281 additions and 5 deletions
......@@ -12,7 +12,7 @@ WF_3RD = 'ZNE123456'
HIGHEST_INT = 1E100
# warn user if file bigger than this size
BIG_FILE_SIZE = 2 * 10**8
BIG_FILE_SIZE = 2 * 10**9 # 2 GB
# Matplotlib's performance be slow if data point total > than this limit
CHAN_SIZE_LIMIT = 10**6
......
from PySide2 import QtWidgets, QtGui
from PySide2.QtWidgets import QWidget, QDialog
def display_color(color_label: QtWidgets.QLabel, color: str):
"""
Display color on color_label.
Display the given color on the color_label
:param color_label: the label to display color
:param color: the color that is given to update the color_label
"""
palette = color_label.palette()
palette.setColor(QtGui.QPalette.Background, QtGui.QColor(color))
color_label.setPalette(palette)
class EditValueColorDialog(QDialog):
"""Base class for value color editing dialogs of different plot types"""
def __init__(self, parent: QWidget, value_color_str: str):
"""
:param parent: the parent widget
:param value_color_str: string for value color to be saved in DB
"""
super(EditValueColorDialog, self).__init__(parent)
self.value_color_str = value_color_str
self.main_layout = QtWidgets.QGridLayout()
self.setLayout(self.main_layout)
self.cancel_btn = QtWidgets.QPushButton('CANCEL', self)
self.save_colors_btn = QtWidgets.QPushButton('SAVE COLORS', self)
self.setup_ui()
self.set_value()
self.connect_signals()
def setup_ui(self):
pass
def set_value(self):
pass
def setup_complete_buttons(self, row_total) -> None:
"""
:param row_total: total of rows to edit
"""
self.main_layout.addWidget(self.cancel_btn, row_total, 0, 1, 1)
self.main_layout.addWidget(self.save_colors_btn, row_total, 3, 1, 1)
def connect_signals(self) -> None:
self.cancel_btn.clicked.connect(self.close)
self.save_colors_btn.clicked.connect(self.on_save_color)
def on_select_color(self, color_label: QtWidgets.QLabel):
"""
When clicking on Select Color button, Color Picker will pop up with
the default color is color_label's color.
User will select a color then save to update the selected color to
the color_label.
:param color_label: the label that display the color of the obj_type
"""
color = color_label.palette().window().color()
new_color = QtWidgets.QColorDialog.getColor(color)
if new_color.isValid():
display_color(color_label, new_color.name())
self.raise_()
import sys
import platform
import os
from PySide2 import QtWidgets
from PySide2.QtWidgets import QWidget
from sohstationviewer.view.db_config.value_color_helper.\
edit_value_color_dialog.edit_value_color_dialog_super_class import \
EditValueColorDialog, display_color
class LineDotDialog(EditValueColorDialog):
"""Dialog to edit color for Line/Dot Plot"""
def __init__(self, parent: QWidget, value_color_str: str):
"""
:param parent: the parent widget
:param value_color_str: string for value color to be saved in DB
"""
# Widget that allow user to add/edit line's color
self.select_line_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display line's color
self.line_color_label = QtWidgets.QLabel()
self.line_color_label.setFixedWidth(30)
self.line_color_label.setAutoFillBackground(True)
# check box to include dot in value_color_str or not
self.dot_include_chkbox = QtWidgets.QCheckBox('Included')
# Widget that allow user to add/edit dot's color
self.select_dot_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display dot's color
self.dot_color_label = QtWidgets.QLabel()
self.dot_color_label.setFixedWidth(30)
self.dot_color_label.setAutoFillBackground(True)
super(LineDotDialog, self).__init__(parent, value_color_str)
self.setWindowTitle("Edit Line/Dot Plotting's Colors")
self.on_click_include_dot()
def setup_ui(self) -> None:
self.main_layout.addWidget(QtWidgets.QLabel('Line Color'), 0, 1, 1, 1)
self.main_layout.addWidget(self.line_color_label, 0, 2, 1, 1)
self.main_layout.addWidget(self.select_line_color_btn, 0, 3, 1, 1)
self.main_layout.addWidget(self.dot_include_chkbox, 1, 0, 1, 1)
self.main_layout.addWidget(QtWidgets.QLabel('Dot Color'), 1, 1, 1, 1)
self.main_layout.addWidget(self.dot_color_label, 1, 2, 1, 1)
self.main_layout.addWidget(self.select_dot_color_btn, 1, 3, 1, 1)
self.setup_complete_buttons(2)
def connect_signals(self) -> None:
self.select_line_color_btn.clicked.connect(
lambda: self.on_select_color(self.line_color_label))
self.select_dot_color_btn.clicked.connect(
lambda: self.on_select_color(self.dot_color_label))
self.dot_include_chkbox.clicked.connect(self.on_click_include_dot)
super().connect_signals()
def on_click_include_dot(self):
"""
Enable/disable select color and show/hide color label according to
dot_include_chkbox is checked or unchecked.
"""
enabled = self.dot_include_chkbox.isChecked()
self.select_dot_color_btn.setEnabled(enabled)
self.dot_color_label.setHidden(not enabled)
def set_value(self):
"""
Change the corresponding color_labels's color according to the color
from value_color_str.
"""
self.dot_include_chkbox.setChecked(False)
if self.value_color_str == "":
return
vc_parts = self.value_color_str.split('|')
for vc_str in vc_parts:
obj_type, color = vc_str.split(':')
if obj_type == 'Line':
display_color(self.line_color_label, color)
if obj_type == 'Dot':
display_color(self.dot_color_label, color)
self.dot_include_chkbox.setChecked(True)
def on_save_color(self):
"""
Create value_color_str from GUI's info and close the GUI with color
is the hex color got from color_labels' color
"""
line_color = self.line_color_label.palette().window().color().name()
self.value_color_str = f"Line:{line_color}"
if self.dot_include_chkbox.isChecked():
dot_color = self.dot_color_label.palette().window().color().name()
self.value_color_str += f"|Dot:{dot_color}"
self.close()
if __name__ == '__main__':
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
test = LineDotDialog(None, "Line:#00FF00|Dot:#00FF00")
test.exec_()
print("result:", test.value_color_str)
sys.exit(app.exec_())
import sys
import platform
import os
from PySide2 import QtWidgets
from PySide2.QtWidgets import QWidget
from sohstationviewer.view.db_config.value_color_helper.\
edit_value_color_dialog.edit_value_color_dialog_super_class import \
EditValueColorDialog, display_color
class TriColorLinesDialog(EditValueColorDialog):
"""Dialog to edit color for triColorLines plot"""
def __init__(self, parent: QWidget, value_color_str: str):
"""
:param parent: the parent widget
:param value_color_str: string for value color to be saved in DB
"""
# Widget that allow user to add/edit value positive one's color
self.select_pos_one_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display positive one's color
self.pos_one_color_label = QtWidgets.QLabel()
self.pos_one_color_label.setFixedWidth(30)
self.pos_one_color_label.setAutoFillBackground(True)
# Widget that allow user to add/edit value zero's color
self.select_zero_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display down's color
self.zero_color_label = QtWidgets.QLabel()
self.zero_color_label.setFixedWidth(30)
self.zero_color_label.setAutoFillBackground(True)
# Widget that allow user to add/edit value positive one's color
self.select_neg_one_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display positive one's color
self.neg_one_color_label = QtWidgets.QLabel()
self.neg_one_color_label.setFixedWidth(30)
self.neg_one_color_label.setAutoFillBackground(True)
super(TriColorLinesDialog, self).__init__(parent, value_color_str)
self.setWindowTitle("Edit TriColor Plotting's Colors")
def setup_ui(self) -> None:
self.main_layout.addWidget(QtWidgets.QLabel('" 1" Color'), 0, 0, 1, 1)
self.main_layout.addWidget(self.pos_one_color_label, 0, 1, 1, 1)
self.main_layout.addWidget(self.select_pos_one_color_btn, 0, 2, 1, 1)
self.main_layout.addWidget(QtWidgets.QLabel('" 0" Color'), 1, 0, 1, 1)
self.main_layout.addWidget(self.zero_color_label, 1, 1, 1, 1)
self.main_layout.addWidget(self.select_zero_color_btn, 1, 2, 1, 1)
self.main_layout.addWidget(QtWidgets.QLabel('"-1" Color'), 2, 0, 1, 1)
self.main_layout.addWidget(self.neg_one_color_label, 2, 1, 1, 1)
self.main_layout.addWidget(self.select_neg_one_color_btn, 2, 2, 1, 1)
self.setup_complete_buttons(3)
def connect_signals(self) -> None:
self.select_pos_one_color_btn.clicked.connect(
lambda: self.on_select_color(self.pos_one_color_label))
self.select_zero_color_btn.clicked.connect(
lambda: self.on_select_color(self.zero_color_label))
self.select_neg_one_color_btn.clicked.connect(
lambda: self.on_select_color(self.neg_one_color_label))
super().connect_signals()
def set_value(self):
"""
Change the corresponding color_labels's color according to the color
from value_color_str.
"""
if self.value_color_str == "":
return
vc_parts = self.value_color_str.split('|')
for vc_str in vc_parts:
val, color = vc_str.split(':')
if val == '1':
display_color(self.pos_one_color_label, color)
if val == '0':
display_color(self.zero_color_label, color)
if val == '-1':
display_color(self.neg_one_color_label, color)
def on_save_color(self):
"""
Create value_color_str from GUI's info and close the GUI with color
is the hex color got from color_labels' color
"""
pos_one_color = self.pos_one_color_label.palette()\
.window().color().name()
zero_color = self.zero_color_label.palette().window().color().name()
neg_one_color = self.neg_one_color_label.palette() \
.window().color().name()
self.value_color_str = (f"-1:{neg_one_color}|0:{zero_color}"
f"|1:{pos_one_color}")
self.close()
if __name__ == '__main__':
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
test = TriColorLinesDialog(None, '-1:#FF00FF|0:#FF0000|1:#00FF00')
test.exec_()
print("result:", test.value_color_str)
sys.exit(app.exec_())
import sys
import platform
import os
from PySide2 import QtWidgets
from PySide2.QtWidgets import QWidget
from sohstationviewer.view.db_config.value_color_helper.\
edit_value_color_dialog.edit_value_color_dialog_super_class import \
EditValueColorDialog, display_color
class UpDownDialog(EditValueColorDialog):
"""Dialog to edit color for Up/Down Plot"""
def __init__(self, parent: QWidget, value_color_str: str):
"""
:param parent: the parent widget
:param value_color_str: string for value color to be saved in DB
"""
# Widget that allow user to add/edit up's color
self.select_up_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display up's color
self.up_color_label = QtWidgets.QLabel()
self.up_color_label.setFixedWidth(30)
self.up_color_label.setAutoFillBackground(True)
# Widget that allow user to add/edit down's color
self.select_down_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display down's color
self.down_color_label = QtWidgets.QLabel()
self.down_color_label.setFixedWidth(30)
self.down_color_label.setAutoFillBackground(True)
super(UpDownDialog, self).__init__(parent, value_color_str)
self.setWindowTitle("Edit Up/Down Plotting's Colors")
def setup_ui(self) -> None:
self.main_layout.addWidget(QtWidgets.QLabel('Up Color'), 0, 0, 1, 1)
self.main_layout.addWidget(self.up_color_label, 0, 1, 1, 1)
self.main_layout.addWidget(self.select_up_color_btn, 0, 2, 1, 1)
self.main_layout.addWidget(QtWidgets.QLabel('Down Color'), 1, 0, 1, 1)
self.main_layout.addWidget(self.down_color_label, 1, 1, 1, 1)
self.main_layout.addWidget(self.select_down_color_btn, 1, 2, 1, 1)
self.setup_complete_buttons(2)
def connect_signals(self) -> None:
self.select_up_color_btn.clicked.connect(
lambda: self.on_select_color(self.up_color_label))
self.select_down_color_btn.clicked.connect(
lambda: self.on_select_color(self.down_color_label))
super().connect_signals()
def set_value(self):
"""
Change the corresponding color_labels's color according to the color
from value_color_str.
"""
if self.value_color_str == "":
return
vc_parts = self.value_color_str.split('|')
for vc_str in vc_parts:
obj_type, color = vc_str.split(':')
if obj_type == 'Up':
display_color(self.up_color_label, color)
if obj_type == 'Down':
display_color(self.down_color_label, color)
def on_save_color(self):
"""
Create value_color_str from GUI's info and close the GUI with color
is the hex color got from color_labels' color
"""
up_color = self.up_color_label.palette().window().color().name()
down_color = self.down_color_label.palette().window().color().name()
self.value_color_str = f"Up:{up_color}|Down:{down_color}"
self.close()
if __name__ == '__main__':
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
test = UpDownDialog(None, 'Down:#FF0000|Up:#00FF00')
test.exec_()
print("result:", test.value_color_str)
sys.exit(app.exec_())
......@@ -36,8 +36,9 @@ from sohstationviewer.view.help_view import HelpBrowser
from sohstationviewer.view.ui.main_ui import UIMainWindow
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.view.util.functions import (
check_chan_wildcards_format, check_masspos,
)
check_chan_wildcards_format, check_masspos)
from sohstationviewer.view.util.check_file_size import check_folders_size
from sohstationviewer.view.channel_prefer_dialog import ChannelPreferDialog
from sohstationviewer.view.create_muti_buttons_dialog import (
create_multi_buttons_dialog
......@@ -47,7 +48,6 @@ from sohstationviewer.controller.processing import detect_data_type
from sohstationviewer.controller.util import (
display_tracking_info, rt130_find_cf_dass, check_data_sdata
)
from sohstationviewer.database.process_db import execute_db_dict, execute_db
from sohstationviewer.conf.constants import TM_FORMAT, ColorMode, CONFIG_PATH
......@@ -574,7 +574,12 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
if self.list_of_dir == []:
msg = "No directories have been selected."
raise Exception(msg)
if self.warn_big_file_sizes.isChecked():
# call check_folder_size() here b/c it requires list_of_dir and it
# is before the called for detect_data_type() which sometimes take
# quite a long time.
if not check_folders_size(self.list_of_dir, self.req_wf_chans):
raise Exception("Big size")
# Log files don't have a data type that can be detected, so we don't
# detect the data type if we are reading them.
if self.rt130_das_dict == {} and not self.log_checkbox.isChecked():
......@@ -680,6 +685,9 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.cancel_loading()
return
self.data_type == 'Unknown'
elif str(e) == "Big size":
self.cancel_loading()
return
else:
fmt = traceback.format_exc()
QtWidgets.QMessageBox.warning(
......
......@@ -219,6 +219,11 @@ class UIMainWindow(object):
# ========================== Option Menu =======================
"""
warn_big_file_sizes: option to check file sizes and give warning if
total file sizes are greater than constant.BIG_FILE_SIZE
"""
self.warn_big_file_sizes: Optional[QAction] = None
"""
mp_regular_color_action: set self.mass_pos_volt_range_opt to 'regular'
mp_trillium_color_action: set self.mass_pos_volt_range_opt to
'trillium'
......@@ -629,6 +634,13 @@ class UIMainWindow(object):
:param main_window: QMainWindow - main GUI for user to interact with
:param menu: QMenu - Options Menu
"""
self.warn_big_file_sizes = QAction(
'Warn big file sizes', main_window
)
self.warn_big_file_sizes.setCheckable(True)
menu.addAction(self.warn_big_file_sizes)
menu.addSeparator()
mp_coloring_menu = QMenu('MP Coloring:', main_window)
menu.addMenu(mp_coloring_menu)
mp_coloring_group = QActionGroup(main_window)
......
from typing import List, Union, Optional, Tuple, Dict
import sys
import os
from PySide2.QtWidgets import QMessageBox, QApplication
from obspy.io.reftek.core import _is_reftek130
from sohstationviewer.conf.constants import BIG_FILE_SIZE
from sohstationviewer.controller.util import validate_file
from sohstationviewer.controller.processing import (
get_next_channel_from_mseed_file)
def _get_file_type(path2file: str) -> str:
"""
Get type of the given file:
+ TEXT if strip() can be used for the first 64 bytes
+ MSEED if channel name of the first record can be read
+ RT130 is decided using obspy method _is_reftek130()
+ BINARY if none of the above types are detected.
:param path2file: absolute path to file
:return: file's type
"""
with open(path2file, 'r') as file:
try:
file.read(64).strip()
return 'TEXT'
except UnicodeDecodeError:
pass
with open(path2file, 'rb') as file:
try:
# read header of the first record to decide MSEED file
get_next_channel_from_mseed_file(file)
return 'MSEED'
except ValueError:
if _is_reftek130(path2file):
return 'RT130'
else:
return 'BINARY'
def _get_multiplex_and_chan_id(path2file: str,
is_multiplex: bool,
non_multiplexed_reach_total_limit: bool
) -> Tuple[bool, Optional[str]]:
"""
Recursively reading a part of header of each data record in the given file
to get the channel name.
+ If more than one channel names are detected in a file, the data set is
definitely multiplexed.
+ If only one channel name is detected in a file, keep checking other
files until the number of files checked goes up to 10 then decide the
data set is non multiplexed which means using the first record to
decide the channel of the whole file.
:param path2file: absolute path to file
:param is_multiplex: multiplex status of the data set so far
:param non_multiplexed_reach_total_limit: the total of multiplexed
files reach limit
:return is_multiplex: multiplex status of the data set after the file is
read
:return chan: the last channel name read from this file.
+ If the file is multiplexed, which channel name return isn't matter.
+ If the file is non-multiplexed, all records have the same channel name.
+ chan can be None if the file isn't mseed, e.g. TEXT.
"""
chan = None
file = open(path2file, 'rb')
chans_in_stream = set()
while 1:
# read all records in a file until it is detected as multiplexed
is_eof = (file.read(1) == b'')
if is_eof:
break
file.seek(-1, 1)
try:
chan = get_next_channel_from_mseed_file(file)
if non_multiplexed_reach_total_limit:
# If total of non_multiplexed files reach limit, don't need
# to check all records anymore but treat the file as
# non-multiplexed which is all records have the same channel id
# => Use the first record to decide the channel of the file.
break
except ValueError:
file.close()
break
chans_in_stream.add(chan)
if len(chans_in_stream) > 1:
# a file is multiplexed if it has more than one channel id
is_multiplex = True
break
file.close()
return is_multiplex, chan
def _get_size_of_non_multiplex_waveform_file(
dir_path: str, req_wf_chans: List[Union[str, int]]):
"""
Estimate size of directory by collecting sizes of non-multiplexed waveform
files. This way, we can skip reading sizes of small files which makes the
speed of reading sizes improved especially for the case that the number of
small files is big.
:param dir_path: absolute path to directory
:param req_wf_chans: waveform request which can be list of data streams or
list of mseed wildcards
:return total_size:
+ 0 if this method won't be used to estimate the size of the directory
+ Estimated total size of the directory up to where it is greater than
BIG_FILE_SIZE
"""
# list of prefix of high sampling rate channels of which files' sizes are
# significantly larger than the ones of lower sampling rate channels.
wf_high_spr_prefix = ['FH', 'FN', # ≥ 1000 to < 5000
'GH', 'GL', # ≥ 1000 to < 5000
'DH', 'DL', # ≥ 250 to < 1000
'CH', 'CN', # ≥ 250 to < 1000
'EH', 'EL', 'EP', # ≥ 80
'SH', 'SL', 'SP', # ≥ 10 to < 80
'HH', 'HN', # ≥ 80
'BH', 'BN', # ≥ 10 to < 80
'MH', 'MN', 'MP', 'ML'] # >1 to < 10
"""
'LH','LL', 'LP', 'LN' =1
'VP', 'VL', 'VL', 'VH' = 0.1
'UN', 'UP', 'UL', 'UH' <=0.01
Skip channels with sampling rate <=1 because there are less data in the
files, which can result many files with small sizes in compare with sizes
of high sample rate files. (For case that the data set has only low
sampling rates, collecting data sizes won't be conducted in this method.)
"""
wf_chan_possibilities = set()
for request in req_wf_chans:
if request == '*':
wf_chan_possibilities.update(wf_high_spr_prefix)
elif request[0] == '*':
wf_chan_possibilities.update(
[prefix for prefix in wf_high_spr_prefix
if prefix[1] == request[1]])
elif request[1] == '*':
wf_chan_possibilities.update(
[prefix for prefix in wf_high_spr_prefix
if prefix[0] == request[0]])
if len(wf_chan_possibilities) == 0:
# no waveform channels available to pick, this method is not available
# for this data set.
return 0
total_size = 0
is_multiplex = False
count = 0
total_non_multiplexed_limit = 10
for path, subdirs, files in os.walk(dir_path):
total_of_mseed_files = 0
for file_name in files:
if not validate_file(os.path.join(path, file_name), file_name):
continue
fp = os.path.join(path, file_name)
is_multiplex, chan = _get_multiplex_and_chan_id(
fp, is_multiplex, count >= total_non_multiplexed_limit)
if chan is None:
continue
count += 1
total_of_mseed_files += 1
if is_multiplex:
# Don't use this method for multiplexed data set to elevate
# the speed
return 0
# not multiplex
if (chan is not None and
chan.startswith(tuple(wf_chan_possibilities))):
# ------ high sample rate mseed ------
# to help skip get size of too many small files,
# only read the big files which are in the list wf_chan_pos
file_size = os.path.getsize(fp)
total_size += file_size
if total_size > BIG_FILE_SIZE:
return total_size
else:
# ------ low sample rate mseed ------
if total_of_mseed_files == 50:
# When there are more than 50 low sampling rate mseed files
# in a folder, break the for loop to move to a different
# folder.
break
return total_size
def _get_size_rt130(dir_path: str, req_ds: List[int]):
"""
Get size of RT130's requested datas treams which is inside folder that has
data stream number as name.
:param dir_path: absolute path to directory
:param req_ds: list of requested data streams
:return total_size: total size of requested data streams up to where it
greater than BIG_FILE_SIZE
"""
if req_ds == ['*']:
req_ds = ['1', '2', '3', '4', '5', '6', '7', '8']
else:
req_ds = [str(req) for req in req_ds]
total_size = 0
for path, subdirs, files in os.walk(dir_path):
path_parts = path.split(os.sep)
ds = path_parts[-1]
if ds in req_ds:
# the direct folder of rt130 file must be named after data stream
for file_name in files:
fp = os.path.join(path, file_name)
file_size = os.path.getsize(fp)
total_size += file_size
if total_size > BIG_FILE_SIZE:
break
return total_size
def _get_size_mseed(dir_path: str) -> int:
"""
Get size of all files until total size > BIG_FILE_SIZE
:param dir_path: absolute path to directory
:return total_size: total size of the directory up to where it greater
than BIG_FILE_SIZE
"""
total_size = 0
count = 0
for path, subdirs, files in os.walk(dir_path):
for file_name in files:
if not validate_file(os.path.join(path, file_name), file_name):
continue
fp = os.path.join(path, file_name)
file_size = os.path.getsize(fp)
total_size += file_size
count += 1
if total_size > BIG_FILE_SIZE:
break
return total_size
def _get_dir_size(dir_path: str, req_wf_chans: List[Union[str, int]]):
"""
Get size of directory.
To make the process go fast, separate it into different case:
+ Non-multiplex MSeed with high sampling rate waveform
+ The rest cases of MSeed
+ RT130
+ If only text files or binary files found, count the most 200 files
and ask user to decide stopping or continuing process at their
own risk
:param dir_path: absolute path to directory
:param req_wf_chans: waveform request which can be list of data streams or
list of mseed wildcards
:return total_size:
+ 0 if don't have waveform request
+ total size of the directory up to where it greater than BIG_FILE_SIZE
+ -1 if count more than 200 TEXT files
+ -2 if count more than 200 BINARY files of which types are unkonwn
"""
text_file_count = 0
binary_file_count = 0
for path, subdirs, files in os.walk(dir_path):
for file_name in files:
path2file = os.path.join(path, file_name)
if not validate_file(path2file, file_name):
continue
file_type = _get_file_type(path2file)
if file_type == 'TEXT':
text_file_count += 1
if text_file_count > 200:
return {'data_size': -1, 'text_count': text_file_count}
continue
elif file_type == 'RT130':
return {'data_size': _get_size_rt130(dir_path, req_wf_chans)}
elif file_type == 'MSEED':
total_size = _get_size_of_non_multiplex_waveform_file(
dir_path, req_wf_chans)
if total_size != 0:
return {'data_size': total_size}
else:
return {'data_size': _get_size_mseed(dir_path)}
else:
binary_file_count += 1
if binary_file_count > 200:
return {'data_size': -1, 'binary_count': binary_file_count}
return {'data_size': -1,
'binary_count': binary_file_count, 'text_count': text_file_count}
def _abort_dialog(msg: str) -> bool:
"""
Provide confirming dialog for user to continue or not
:param msg: message of what need to be confirmed
:return: True for the confirmation. False for the cancel.
"""
dlg = QMessageBox()
dlg.setText(msg)
dlg.setInformativeText('Do you want to proceed?')
dlg.setStandardButtons(QMessageBox.Yes |
QMessageBox.Abort)
dlg.setDefaultButton(QMessageBox.Abort)
dlg.setIcon(QMessageBox.Question)
ret = dlg.exec_()
if ret == QMessageBox.Abort:
return False
else:
return True
def _check_folders_size(dir_paths: List[str],
req_wf_chans: List[Union[str, int]]
) -> Dict[str, int]:
"""
Check the folders in the list dir_paths for size of data files and count of
text file or binary.
:param dir_paths: list of paths to check for sizes
:param req_wf_chans: requirement of waveform channels
:return: dictionary of size or count info in the dir_paths
"""
final_result = {'data_size': 0, 'text_count': 0, 'binary_count': 0}
for dir_path in dir_paths:
if not os.path.isdir(dir_path):
raise Exception(f"'{dir_path}' isn't a valid directory")
result = _get_dir_size(dir_path, req_wf_chans)
if result['data_size'] >= 0:
final_result['data_size'] += result['data_size']
if final_result['data_size'] > BIG_FILE_SIZE:
break
else:
# only consider text and binary if no data
if 'text_count' in result:
final_result['text_count'] += result['text_count']
if final_result['text_count'] > 200:
break
if 'binary_count' in result:
final_result['binary_count'] += result['binary_count']
if final_result['binary_count'] > 200:
break
return final_result
def check_folders_size(dir_paths: List[str],
req_wf_chans: List[Union[str, int]]
) -> bool:
"""
Check the folders in the list dir_paths:
+ If found data in folders, return True if size <= BIG_FILE_SIZE.
Otherwise, ask user to continue or not.
+ If there are no data files at all, report the files found and ask user to
continue or not
:param dir_paths: list of paths to check for sizes
:param req_wf_chans: requirement of waveform channels
:return: True if the check is passed and False otherwise
"""
try:
final_result = _check_folders_size(dir_paths, req_wf_chans)
except Exception as e:
QMessageBox.information(None, "Error", str(e))
return False
if final_result['data_size'] > BIG_FILE_SIZE:
msg = ('The selected data set is greater than 2GB. It '
'might take a while to finish reading '
'and plotting everything.')
return _abort_dialog(msg)
elif final_result['data_size'] > 0:
return True
elif final_result['text_count'] > 200:
msg = ("There are more than 200 text files detected."
"Do you want to continue at your own risk?")
return _abort_dialog(msg)
elif final_result['binary_count'] > 200:
msg = ("There are more than 200 binary files detected."
"Do you want to continue at your own risk?")
return _abort_dialog(msg)
else:
file_info = []
if final_result['text_count'] > 0:
file_info.append(f"{final_result['text_count']} text files")
if final_result['binary_count'] > 0:
file_info.append(f"{final_result['binary_count']} binary files")
file_info_str = ' and '.join(file_info)
msg = (f"There are {file_info_str} detected with no data files."
"Do you want to continue at your own risk?")
return _abort_dialog(msg)
if __name__ == '__main__':
import platform
# Enable Layer-backing for MacOs version >= 11
# Only needed if using the pyside2 library with version>=5.15.
# Layer-backing is always enabled in pyside6.
os_name, version, *_ = platform.platform().split('-')
# if os_name == 'macOS' and version >= '11':
# mac OSX 11.6 appear to be 10.16 when read with python and still required
# this environment variable
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QApplication(sys.argv)
print("BIG FILE SIZE:", BIG_FILE_SIZE)
data_folder = "/Volumes/UNTITLED/SOHView_data/"
"""
The following examples are based on BIG_FILE_SIZE = 2GB
"""
# ============== Centaur ========================
# multiplexed mseed: 1530200576B; system:1.53GB
# print(check_folders_size(
# [f'{data_folder}Centaur/Centaur_DataTest.nan'], ['*']))
# multiplexed mseed: 34171904; system:34.2MB
# print(check_folders_size(
# [f'{data_folder}Centaur/Centaur-2018-3734.nan'], ['*']))
# multiplexed mseed: 25198592; system:25.2MB
# print(check_folders_size(
# [f'{data_folder}Centaur/Centaur3604_100sps.nan'], ['*']))
# multiplexed mseed: 468665991; system:468.7 MB
# print(check_folders_size(
# [f'{data_folder}Centaur/CentaurDiskSOH600.nan'], ['*']))
# multiplexed mseed: 20992; system:21 KB
# print(check_folders_size([f'{data_folder}Centaur/soh'], ['*']))
# multiplexed mseed: 700416; system:700 KB
# print(check_folders_size([f'{data_folder}Centaur/SOH_split600'], ['*']))
# ============ pegasus ==============
# non-multiplexed mseed: 1703583744; system:1.72 GB
# total files: 1534
# total files counted for size: 153
# print(check_folders_size(
# [f'{data_folder}Pegasus/dave_pegasus.nan'], ['*']))
# non-multiplexed mseed: 288489472; system:292.3 MB
# total files: 251
# total files counted for size: 24
# print(check_folders_size(
# [f'{data_folder}Pegasus/GNARBOX_svc1'], ['*']))
# non-multiplexed mseed: 151818240; system: 152.6 MB
# total files: 112
# total files counted for size: 12
# print(check_folders_size(
# [f'{data_folder}Pegasus/LaptopSvc4_part'], ['*']))
# non-multiplexed mseed: 151818240; system: 378.7 MB
# total files: 919
# total files counted for size: 84
# print(check_folders_size(
# [f'{data_folder}Pegasus/Pegasus Offloads'], ['*']))
# non-multiplexed mseed: over limit, stop at 2002317312; system: 2.78 GB
# total files: 1882
# total files counted for size: 151
# print(check_folders_size(
# [f'{data_folder}Pegasus/Pegasus_KCT06_Test.nan'], ['*']))
# non-multiplexed mseed: 547151872; system: 571.4 MB
# total files: 578
# total files counted for size: 84
# print(check_folders_size(
# [f'{data_folder}Pegasus/Pegasus_SVC4.nan'], ['*']))
# non-multiplexed mseed: 108064768; system: 108.1 MB
# total files: 10
# total files counted for size: 9
# print(check_folders_size(
# [f'{data_folder}Pegasus/PegasusData_LittleEndian'], ['*']))
# ============ q330 ==============
# non-multiplexed mseed: over limit, stop at 2013265920; system: 11.25 GB
# total files: 685
# total files counted for size: 120
# print(check_folders_size(
# [f'{data_folder}Q330/5083.sdr'], ['*']))
# non-multiplexed mseed: 20725760; system: 21.1 MB
# total files: 21
# total files counted for size: 3
# print(check_folders_size(
# [f'{data_folder}Q330/5244.sdr'], ['*']))
# multiplexed mseed: 341540864; system: 341.5 MB
# print(check_folders_size(
# [f'{data_folder}Q330/B44-4000939.sdr/data'], ['*']))
# multiplexed mseed: 17319742; system: 17.3 MB
# print(check_folders_size(
# [f'{data_folder}Q330/GLISN-REF-SENSLOC-2018.06.26'], ['*']))
# non-multiplexed mseed: over limit, stop at 2013265920; system: 7.55 GB
# total files: 465
# total files counted for size: 120
# print(check_folders_size(
# [f'{data_folder}Q330/Q330_5281.sdr'], ['*']))
# ============ rt130 ==============
# rt130: over limit, stop at 2071080960; system: 3.6 GB
# print(check_folders_size(
# [f'{data_folder}RT130/9BB3_D1.cf'], ['*']))
# rt130: over limit, stop at 2008623104; system: 2.16 GB
# print(check_folders_size(
# [f'{data_folder}RT130/9BB3_D2.cf'], ['*']))
# rt130: 95880192; system: 95.9 MB
# print(check_folders_size(
# [f'{data_folder}RT130/9BB3_D3.cf'], ['*']))
# rt130: 1227625472; system: 1.24 GB
# print(check_folders_size(
# [f'{data_folder}RT130/2011028.9AFA'], ['*']))
# rt130: 294737920; system: 296.8 MB
# print(check_folders_size(
# [f'{data_folder}RT130/2013326.9E4A'], ['*']))
# rt130: 1375256576; system: 1.38 GB
# print(check_folders_size(
# [f'{data_folder}RT130/2016174.9AC4'], ['*']))
# rt130: 46885888; system: 46.9 MB
# print(check_folders_size(
# [f'{data_folder}RT130/2017149.92EB'], ['*']))
# rt130: over limit, stop at 2087160832; system: 4.01 GB
# print(check_folders_size(
# [f'{data_folder}RT130/RT130-92E9-1.cf'], ['*']))
# rt130: 11527168; system: 11.5 MB
# print(check_folders_size(
# [f'{data_folder}RT130/RT130-2016290.9DF5.cf'], ['*']))
# rt130: 126618624; system: 127.4 MB
# print(check_folders_size(
# [f'{data_folder}RT130/RT130-A195-1.cf'], ['*']))
# rt130: 32062464; system: 32.2 MB
# print(check_folders_size(
# [f'{data_folder}RT130/testCF'], ['*']))
# rt130: size: 306176; system: 319 KB
# print(check_folders_size(
# [f'{data_folder}RT130/TESTRT130'], ['*']))
# ===============================
# text:
# print(check_folders_size(
# [f'{data_folder}Pegasus/Pegasus Offloads/logs'], ['*']))
# =================================
data_folder = "/Volumes/UNTITLED/issues_from_data_group/"
# mseed: size: 496574464; system: 496.6 MB
# print(check_folders_size(
# [f'{data_folder}6407.sdr'], ['*']))
# # non-multiplex mseed: size: 40435712; system: 41.2 MB
# print(check_folders_size(
# [f'{data_folder}77702'], ['*']))
# mseed: size: 206174720; system: 206.2 MB
# print(check_folders_size(
# [f'{data_folder}CONZ-5296-SOH.nan'], ['*']))
# non-multiplexed mseed: over limit, stop at 2013265920; system: 19.54 GB
# print(check_folders_size(
# [f'{data_folder}ELHT-6445.sdr'], ['*']))
# non-multiplexed mseed: 1814528; system: 37.6 MB
# Only one high sampling rate waveform file, many small soh files
# and text file => The size got is the waveform file. The size isn't
# correct but doesn't affect much of the result.
# THIS CASE IS SPECIAL.
# the first time for some reason it couldn't stop process.
# print(check_folders_size(
# [f'{data_folder}MN38'], ['*']))
# non-multiplexed mseed: 120705024; system: 120.7 MB
# No waveform files.
# print(check_folders_size(
# [f'{data_folder}NHRK.sdr'], ['*']))
# mseed: 708777984; system: 712.1 MB
# print(check_folders_size(
# [f'{data_folder}RT-9926-1.cf'], ['*']))
print("FINISH")
sys.exit(app.exec_())
from tempfile import TemporaryDirectory, NamedTemporaryFile
import shutil
import os
from pathlib import Path
from unittest import TestCase
from sohstationviewer.view.util.check_file_size import _check_folders_size
from sohstationviewer.conf.constants import BIG_FILE_SIZE
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
'test_data')
NON_DATA_FILE = TEST_DATA_DIR.joinpath('Non-data-file/non_data_file')
MULTIPLEX_FILE = TEST_DATA_DIR.joinpath(
'Q330_multiplex/XX-3203_4-20221222183011')
NON_MULTIPLEX_LOW_SPR_FILE = TEST_DATA_DIR.joinpath(
'Q330-sample/day_vols_AX08/AX08.XA..VM1.2021.186')
NON_MULTIPLEX_HIGH_SPR_FILE = TEST_DATA_DIR.joinpath(
'Q330-sample/day_vols_AX08/AX08.XA..HHE.2021.186')
NON_MULTIPLEX_HIGH_N_LOW_SPR_SET = TEST_DATA_DIR.joinpath('Q330-sample')
RT130_FILE = TEST_DATA_DIR.joinpath(
'RT130-sample/2017149.92EB/2017150/92EB/1/010000015_0036EE80')
class TestGetDirSize(TestCase):
def test_less_or_equal_200_text_files(self):
number_of_text_files = 25
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_text_files):
files.append(NamedTemporaryFile(dir=directory))
expected_result = {'data_size': 0,
'text_count': 25,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
# Explicitly clean up the temporary files. If we don't do this,
# the temporary directory will clean up itself and delete the
# temporary files. Then, when the function returns, the references
# to these temporary files will attempt to clean up the files. This
# leads to exceptions being raised because the files being cleaned
# up does not exist anymore.
[file.close() for file in files]
def test_more_than_200_text_files(self):
number_of_text_files = 250
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_text_files):
files.append(NamedTemporaryFile(dir=directory))
expected_result = {'data_size': 0,
'text_count': 201, # stop when more than 200
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[file.close() for file in files]
def test_less_or_equal_200_binary_files(self):
number_of_binary_files = 25
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_binary_files):
new_file_path = Path(directory).joinpath(
f'{NON_DATA_FILE.name}_{i}')
shutil.copy(NON_DATA_FILE, new_file_path)
files.append(new_file_path)
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 25}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_200_binary_files(self):
number_of_binary_files = 250
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_binary_files):
new_file_path = Path(directory).joinpath(
f'{NON_DATA_FILE.name}_{i}')
shutil.copy(NON_DATA_FILE, new_file_path)
files.append(new_file_path)
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 201} # stop when more than 200
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_less_or_equal_limit_mseed_multiplexed_files(self):
sample_file_size = os.path.getsize(MULTIPLEX_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
for i in range(3):
new_file_path = Path(directory).joinpath(
f'{MULTIPLEX_FILE.name}_{i}')
shutil.copy(MULTIPLEX_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_limit_mseed_multiplexed_files(self):
sample_file_size = os.path.getsize(MULTIPLEX_FILE)
expected_size = 0
count = 0
with TemporaryDirectory() as directory:
files = []
while 1:
new_file_path = Path(directory).joinpath(
f'{MULTIPLEX_FILE.name}_{count}')
shutil.copy(MULTIPLEX_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
if expected_size > BIG_FILE_SIZE:
break
count += 1
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_less_or_equal_limit_mseed_non_multiplexed_low_spr_files(self):
sample_file_size = os.path.getsize(NON_MULTIPLEX_LOW_SPR_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
for i in range(3):
new_file_path = Path(directory).joinpath(
f'{NON_MULTIPLEX_LOW_SPR_FILE.name}_{i}')
shutil.copy(NON_MULTIPLEX_LOW_SPR_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_less_or_equal_limit_mseed_non_multiplexed_high_spr_files(self):
sample_file_size = os.path.getsize(NON_MULTIPLEX_HIGH_SPR_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
for i in range(3):
new_file_path = Path(directory).joinpath(
f'{NON_MULTIPLEX_HIGH_SPR_FILE.name}_{i}')
shutil.copy(NON_MULTIPLEX_HIGH_SPR_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_limit_mseed_non_multiplexed_high_spr_files(self):
sample_file_size = os.path.getsize(NON_MULTIPLEX_HIGH_SPR_FILE)
expected_size = 0
count = 0
with TemporaryDirectory() as directory:
files = []
while 1:
new_file_path = Path(directory).joinpath(
f'{NON_MULTIPLEX_HIGH_SPR_FILE.name}_{count}')
shutil.copy(NON_MULTIPLEX_HIGH_SPR_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
if expected_size > BIG_FILE_SIZE:
break
count += 1
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_mseed_non_multiplexed_high_n_low_spr_files(self):
expected_result = {'data_size': 11251712,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([NON_MULTIPLEX_HIGH_N_LOW_SPR_SET], [])
self.assertEqual(ret, expected_result)
def test_less_or_equal_limit_rt130_files(self):
sample_file_size = os.path.getsize(RT130_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
new_data_stream_path = Path(directory).joinpath('1')
new_data_stream_path.mkdir(
parents=True, exist_ok=True)
for i in range(3):
new_file_path = new_data_stream_path.joinpath(
f'{RT130_FILE.name}_{i}')
shutil.copy(RT130_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], ['1'])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_limit_rt130_files(self):
sample_file_size = os.path.getsize(RT130_FILE)
expected_size = 0
count = 0
with TemporaryDirectory() as directory:
files = []
new_data_stream_path = Path(directory).joinpath('1')
new_data_stream_path.mkdir(
parents=True, exist_ok=True)
while 1:
new_file_path = new_data_stream_path.joinpath(
f'{RT130_FILE.name}_{count}')
shutil.copy(RT130_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
if expected_size > BIG_FILE_SIZE:
break
count += 1
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], ['1'])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_rt130_no_requested_datastream_files(self):
sample_file_size = os.path.getsize(RT130_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
new_data_stream_path = Path(directory).joinpath('1')
new_data_stream_path.mkdir(
parents=True, exist_ok=True)
for i in range(3):
new_file_path = new_data_stream_path.joinpath(
f'{RT130_FILE.name}_{i}')
shutil.copy(RT130_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], ['2'])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_empty_directory(self):
with TemporaryDirectory() as temp_dir:
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 0}
result = _check_folders_size([temp_dir], ['*'])
self.assertEqual(result, expected_result)
def test_directory_does_not_exist(self):
empty_name_dir = ''
try:
_check_folders_size([empty_name_dir], [])
except Exception as e:
self.assertEqual(
str(e),
"'' isn't a valid directory"
)
non_existent_dir = 'directory does not exist'
try:
_check_folders_size([non_existent_dir], [])
except Exception as e:
self.assertEqual(
str(e),
"'directory does not exist' isn't a valid directory"
)