Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
Commits on Source (4)
Showing
with 1281 additions and 5 deletions
......@@ -12,7 +12,7 @@ WF_3RD = 'ZNE123456'
HIGHEST_INT = 1E100
# warn user if file bigger than this size
BIG_FILE_SIZE = 2 * 10**8
BIG_FILE_SIZE = 2 * 10**9 # 2 GB
# Matplotlib's performance be slow if data point total > than this limit
CHAN_SIZE_LIMIT = 10**6
......
from PySide2 import QtWidgets, QtGui
from PySide2.QtWidgets import QWidget, QDialog
def display_color(color_label: QtWidgets.QLabel, color: str):
"""
Display color on color_label.
Display the given color on the color_label
:param color_label: the label to display color
:param color: the color that is given to update the color_label
"""
palette = color_label.palette()
palette.setColor(QtGui.QPalette.Background, QtGui.QColor(color))
color_label.setPalette(palette)
class EditValueColorDialog(QDialog):
"""Base class for value color editing dialogs of different plot types"""
def __init__(self, parent: QWidget, value_color_str: str):
"""
:param parent: the parent widget
:param value_color_str: string for value color to be saved in DB
"""
super(EditValueColorDialog, self).__init__(parent)
self.value_color_str = value_color_str
self.main_layout = QtWidgets.QGridLayout()
self.setLayout(self.main_layout)
self.cancel_btn = QtWidgets.QPushButton('CANCEL', self)
self.save_colors_btn = QtWidgets.QPushButton('SAVE COLORS', self)
self.setup_ui()
self.set_value()
self.connect_signals()
def setup_ui(self):
pass
def set_value(self):
pass
def setup_complete_buttons(self, row_total) -> None:
"""
:param row_total: total of rows to edit
"""
self.main_layout.addWidget(self.cancel_btn, row_total, 0, 1, 1)
self.main_layout.addWidget(self.save_colors_btn, row_total, 3, 1, 1)
def connect_signals(self) -> None:
self.cancel_btn.clicked.connect(self.close)
self.save_colors_btn.clicked.connect(self.on_save_color)
def on_select_color(self, color_label: QtWidgets.QLabel):
"""
When clicking on Select Color button, Color Picker will pop up with
the default color is color_label's color.
User will select a color then save to update the selected color to
the color_label.
:param color_label: the label that display the color of the obj_type
"""
color = color_label.palette().window().color()
new_color = QtWidgets.QColorDialog.getColor(color)
if new_color.isValid():
display_color(color_label, new_color.name())
self.raise_()
import sys
import platform
import os
from PySide2 import QtWidgets
from PySide2.QtWidgets import QWidget
from sohstationviewer.view.db_config.value_color_helper.\
edit_value_color_dialog.edit_value_color_dialog_super_class import \
EditValueColorDialog, display_color
class LineDotDialog(EditValueColorDialog):
"""Dialog to edit color for Line/Dot Plot"""
def __init__(self, parent: QWidget, value_color_str: str):
"""
:param parent: the parent widget
:param value_color_str: string for value color to be saved in DB
"""
# Widget that allow user to add/edit line's color
self.select_line_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display line's color
self.line_color_label = QtWidgets.QLabel()
self.line_color_label.setFixedWidth(30)
self.line_color_label.setAutoFillBackground(True)
# check box to include dot in value_color_str or not
self.dot_include_chkbox = QtWidgets.QCheckBox('Included')
# Widget that allow user to add/edit dot's color
self.select_dot_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display dot's color
self.dot_color_label = QtWidgets.QLabel()
self.dot_color_label.setFixedWidth(30)
self.dot_color_label.setAutoFillBackground(True)
super(LineDotDialog, self).__init__(parent, value_color_str)
self.setWindowTitle("Edit Line/Dot Plotting's Colors")
self.on_click_include_dot()
def setup_ui(self) -> None:
self.main_layout.addWidget(QtWidgets.QLabel('Line Color'), 0, 1, 1, 1)
self.main_layout.addWidget(self.line_color_label, 0, 2, 1, 1)
self.main_layout.addWidget(self.select_line_color_btn, 0, 3, 1, 1)
self.main_layout.addWidget(self.dot_include_chkbox, 1, 0, 1, 1)
self.main_layout.addWidget(QtWidgets.QLabel('Dot Color'), 1, 1, 1, 1)
self.main_layout.addWidget(self.dot_color_label, 1, 2, 1, 1)
self.main_layout.addWidget(self.select_dot_color_btn, 1, 3, 1, 1)
self.setup_complete_buttons(2)
def connect_signals(self) -> None:
self.select_line_color_btn.clicked.connect(
lambda: self.on_select_color(self.line_color_label))
self.select_dot_color_btn.clicked.connect(
lambda: self.on_select_color(self.dot_color_label))
self.dot_include_chkbox.clicked.connect(self.on_click_include_dot)
super().connect_signals()
def on_click_include_dot(self):
"""
Enable/disable select color and show/hide color label according to
dot_include_chkbox is checked or unchecked.
"""
enabled = self.dot_include_chkbox.isChecked()
self.select_dot_color_btn.setEnabled(enabled)
self.dot_color_label.setHidden(not enabled)
def set_value(self):
"""
Change the corresponding color_labels's color according to the color
from value_color_str.
"""
self.dot_include_chkbox.setChecked(False)
if self.value_color_str == "":
return
vc_parts = self.value_color_str.split('|')
for vc_str in vc_parts:
obj_type, color = vc_str.split(':')
if obj_type == 'Line':
display_color(self.line_color_label, color)
if obj_type == 'Dot':
display_color(self.dot_color_label, color)
self.dot_include_chkbox.setChecked(True)
def on_save_color(self):
"""
Create value_color_str from GUI's info and close the GUI with color
is the hex color got from color_labels' color
"""
line_color = self.line_color_label.palette().window().color().name()
self.value_color_str = f"Line:{line_color}"
if self.dot_include_chkbox.isChecked():
dot_color = self.dot_color_label.palette().window().color().name()
self.value_color_str += f"|Dot:{dot_color}"
self.close()
if __name__ == '__main__':
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
test = LineDotDialog(None, "Line:#00FF00|Dot:#00FF00")
test.exec_()
print("result:", test.value_color_str)
sys.exit(app.exec_())
import sys
import platform
import os
from PySide2 import QtWidgets
from PySide2.QtWidgets import QWidget
from sohstationviewer.view.db_config.value_color_helper.\
edit_value_color_dialog.edit_value_color_dialog_super_class import \
EditValueColorDialog, display_color
class TriColorLinesDialog(EditValueColorDialog):
"""Dialog to edit color for triColorLines plot"""
def __init__(self, parent: QWidget, value_color_str: str):
"""
:param parent: the parent widget
:param value_color_str: string for value color to be saved in DB
"""
# Widget that allow user to add/edit value positive one's color
self.select_pos_one_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display positive one's color
self.pos_one_color_label = QtWidgets.QLabel()
self.pos_one_color_label.setFixedWidth(30)
self.pos_one_color_label.setAutoFillBackground(True)
# Widget that allow user to add/edit value zero's color
self.select_zero_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display down's color
self.zero_color_label = QtWidgets.QLabel()
self.zero_color_label.setFixedWidth(30)
self.zero_color_label.setAutoFillBackground(True)
# Widget that allow user to add/edit value positive one's color
self.select_neg_one_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display positive one's color
self.neg_one_color_label = QtWidgets.QLabel()
self.neg_one_color_label.setFixedWidth(30)
self.neg_one_color_label.setAutoFillBackground(True)
super(TriColorLinesDialog, self).__init__(parent, value_color_str)
self.setWindowTitle("Edit TriColor Plotting's Colors")
def setup_ui(self) -> None:
self.main_layout.addWidget(QtWidgets.QLabel('" 1" Color'), 0, 0, 1, 1)
self.main_layout.addWidget(self.pos_one_color_label, 0, 1, 1, 1)
self.main_layout.addWidget(self.select_pos_one_color_btn, 0, 2, 1, 1)
self.main_layout.addWidget(QtWidgets.QLabel('" 0" Color'), 1, 0, 1, 1)
self.main_layout.addWidget(self.zero_color_label, 1, 1, 1, 1)
self.main_layout.addWidget(self.select_zero_color_btn, 1, 2, 1, 1)
self.main_layout.addWidget(QtWidgets.QLabel('"-1" Color'), 2, 0, 1, 1)
self.main_layout.addWidget(self.neg_one_color_label, 2, 1, 1, 1)
self.main_layout.addWidget(self.select_neg_one_color_btn, 2, 2, 1, 1)
self.setup_complete_buttons(3)
def connect_signals(self) -> None:
self.select_pos_one_color_btn.clicked.connect(
lambda: self.on_select_color(self.pos_one_color_label))
self.select_zero_color_btn.clicked.connect(
lambda: self.on_select_color(self.zero_color_label))
self.select_neg_one_color_btn.clicked.connect(
lambda: self.on_select_color(self.neg_one_color_label))
super().connect_signals()
def set_value(self):
"""
Change the corresponding color_labels's color according to the color
from value_color_str.
"""
if self.value_color_str == "":
return
vc_parts = self.value_color_str.split('|')
for vc_str in vc_parts:
val, color = vc_str.split(':')
if val == '1':
display_color(self.pos_one_color_label, color)
if val == '0':
display_color(self.zero_color_label, color)
if val == '-1':
display_color(self.neg_one_color_label, color)
def on_save_color(self):
"""
Create value_color_str from GUI's info and close the GUI with color
is the hex color got from color_labels' color
"""
pos_one_color = self.pos_one_color_label.palette()\
.window().color().name()
zero_color = self.zero_color_label.palette().window().color().name()
neg_one_color = self.neg_one_color_label.palette() \
.window().color().name()
self.value_color_str = (f"-1:{neg_one_color}|0:{zero_color}"
f"|1:{pos_one_color}")
self.close()
if __name__ == '__main__':
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
test = TriColorLinesDialog(None, '-1:#FF00FF|0:#FF0000|1:#00FF00')
test.exec_()
print("result:", test.value_color_str)
sys.exit(app.exec_())
import sys
import platform
import os
from PySide2 import QtWidgets
from PySide2.QtWidgets import QWidget
from sohstationviewer.view.db_config.value_color_helper.\
edit_value_color_dialog.edit_value_color_dialog_super_class import \
EditValueColorDialog, display_color
class UpDownDialog(EditValueColorDialog):
"""Dialog to edit color for Up/Down Plot"""
def __init__(self, parent: QWidget, value_color_str: str):
"""
:param parent: the parent widget
:param value_color_str: string for value color to be saved in DB
"""
# Widget that allow user to add/edit up's color
self.select_up_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display up's color
self.up_color_label = QtWidgets.QLabel()
self.up_color_label.setFixedWidth(30)
self.up_color_label.setAutoFillBackground(True)
# Widget that allow user to add/edit down's color
self.select_down_color_btn = QtWidgets.QPushButton("Select Color")
# Widget to display down's color
self.down_color_label = QtWidgets.QLabel()
self.down_color_label.setFixedWidth(30)
self.down_color_label.setAutoFillBackground(True)
super(UpDownDialog, self).__init__(parent, value_color_str)
self.setWindowTitle("Edit Up/Down Plotting's Colors")
def setup_ui(self) -> None:
self.main_layout.addWidget(QtWidgets.QLabel('Up Color'), 0, 0, 1, 1)
self.main_layout.addWidget(self.up_color_label, 0, 1, 1, 1)
self.main_layout.addWidget(self.select_up_color_btn, 0, 2, 1, 1)
self.main_layout.addWidget(QtWidgets.QLabel('Down Color'), 1, 0, 1, 1)
self.main_layout.addWidget(self.down_color_label, 1, 1, 1, 1)
self.main_layout.addWidget(self.select_down_color_btn, 1, 2, 1, 1)
self.setup_complete_buttons(2)
def connect_signals(self) -> None:
self.select_up_color_btn.clicked.connect(
lambda: self.on_select_color(self.up_color_label))
self.select_down_color_btn.clicked.connect(
lambda: self.on_select_color(self.down_color_label))
super().connect_signals()
def set_value(self):
"""
Change the corresponding color_labels's color according to the color
from value_color_str.
"""
if self.value_color_str == "":
return
vc_parts = self.value_color_str.split('|')
for vc_str in vc_parts:
obj_type, color = vc_str.split(':')
if obj_type == 'Up':
display_color(self.up_color_label, color)
if obj_type == 'Down':
display_color(self.down_color_label, color)
def on_save_color(self):
"""
Create value_color_str from GUI's info and close the GUI with color
is the hex color got from color_labels' color
"""
up_color = self.up_color_label.palette().window().color().name()
down_color = self.down_color_label.palette().window().color().name()
self.value_color_str = f"Up:{up_color}|Down:{down_color}"
self.close()
if __name__ == '__main__':
os_name, version, *_ = platform.platform().split('-')
if os_name == 'macOS':
os.environ['QT_MAC_WANTS_LAYER'] = '1'
app = QtWidgets.QApplication(sys.argv)
test = UpDownDialog(None, 'Down:#FF0000|Up:#00FF00')
test.exec_()
print("result:", test.value_color_str)
sys.exit(app.exec_())
......@@ -36,8 +36,9 @@ from sohstationviewer.view.help_view import HelpBrowser
from sohstationviewer.view.ui.main_ui import UIMainWindow
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.view.util.functions import (
check_chan_wildcards_format, check_masspos,
)
check_chan_wildcards_format, check_masspos)
from sohstationviewer.view.util.check_file_size import check_folders_size
from sohstationviewer.view.channel_prefer_dialog import ChannelPreferDialog
from sohstationviewer.view.create_muti_buttons_dialog import (
create_multi_buttons_dialog
......@@ -47,7 +48,6 @@ from sohstationviewer.controller.processing import detect_data_type
from sohstationviewer.controller.util import (
display_tracking_info, rt130_find_cf_dass, check_data_sdata
)
from sohstationviewer.database.process_db import execute_db_dict, execute_db
from sohstationviewer.conf.constants import TM_FORMAT, ColorMode, CONFIG_PATH
......@@ -574,7 +574,12 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
if self.list_of_dir == []:
msg = "No directories have been selected."
raise Exception(msg)
if self.warn_big_file_sizes.isChecked():
# call check_folder_size() here b/c it requires list_of_dir and it
# is before the called for detect_data_type() which sometimes take
# quite a long time.
if not check_folders_size(self.list_of_dir, self.req_wf_chans):
raise Exception("Big size")
# Log files don't have a data type that can be detected, so we don't
# detect the data type if we are reading them.
if self.rt130_das_dict == {} and not self.log_checkbox.isChecked():
......@@ -680,6 +685,9 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.cancel_loading()
return
self.data_type == 'Unknown'
elif str(e) == "Big size":
self.cancel_loading()
return
else:
fmt = traceback.format_exc()
QtWidgets.QMessageBox.warning(
......
......@@ -219,6 +219,11 @@ class UIMainWindow(object):
# ========================== Option Menu =======================
"""
warn_big_file_sizes: option to check file sizes and give warning if
total file sizes are greater than constant.BIG_FILE_SIZE
"""
self.warn_big_file_sizes: Optional[QAction] = None
"""
mp_regular_color_action: set self.mass_pos_volt_range_opt to 'regular'
mp_trillium_color_action: set self.mass_pos_volt_range_opt to
'trillium'
......@@ -629,6 +634,13 @@ class UIMainWindow(object):
:param main_window: QMainWindow - main GUI for user to interact with
:param menu: QMenu - Options Menu
"""
self.warn_big_file_sizes = QAction(
'Warn big file sizes', main_window
)
self.warn_big_file_sizes.setCheckable(True)
menu.addAction(self.warn_big_file_sizes)
menu.addSeparator()
mp_coloring_menu = QMenu('MP Coloring:', main_window)
menu.addMenu(mp_coloring_menu)
mp_coloring_group = QActionGroup(main_window)
......
This diff is collapsed.
from tempfile import TemporaryDirectory, NamedTemporaryFile
import shutil
import os
from pathlib import Path
from unittest import TestCase
from sohstationviewer.view.util.check_file_size import _check_folders_size
from sohstationviewer.conf.constants import BIG_FILE_SIZE
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
'test_data')
NON_DATA_FILE = TEST_DATA_DIR.joinpath('Non-data-file/non_data_file')
MULTIPLEX_FILE = TEST_DATA_DIR.joinpath(
'Q330_multiplex/XX-3203_4-20221222183011')
NON_MULTIPLEX_LOW_SPR_FILE = TEST_DATA_DIR.joinpath(
'Q330-sample/day_vols_AX08/AX08.XA..VM1.2021.186')
NON_MULTIPLEX_HIGH_SPR_FILE = TEST_DATA_DIR.joinpath(
'Q330-sample/day_vols_AX08/AX08.XA..HHE.2021.186')
NON_MULTIPLEX_HIGH_N_LOW_SPR_SET = TEST_DATA_DIR.joinpath('Q330-sample')
RT130_FILE = TEST_DATA_DIR.joinpath(
'RT130-sample/2017149.92EB/2017150/92EB/1/010000015_0036EE80')
class TestGetDirSize(TestCase):
def test_less_or_equal_200_text_files(self):
number_of_text_files = 25
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_text_files):
files.append(NamedTemporaryFile(dir=directory))
expected_result = {'data_size': 0,
'text_count': 25,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
# Explicitly clean up the temporary files. If we don't do this,
# the temporary directory will clean up itself and delete the
# temporary files. Then, when the function returns, the references
# to these temporary files will attempt to clean up the files. This
# leads to exceptions being raised because the files being cleaned
# up does not exist anymore.
[file.close() for file in files]
def test_more_than_200_text_files(self):
number_of_text_files = 250
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_text_files):
files.append(NamedTemporaryFile(dir=directory))
expected_result = {'data_size': 0,
'text_count': 201, # stop when more than 200
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[file.close() for file in files]
def test_less_or_equal_200_binary_files(self):
number_of_binary_files = 25
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_binary_files):
new_file_path = Path(directory).joinpath(
f'{NON_DATA_FILE.name}_{i}')
shutil.copy(NON_DATA_FILE, new_file_path)
files.append(new_file_path)
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 25}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_200_binary_files(self):
number_of_binary_files = 250
with TemporaryDirectory() as directory:
files = []
for i in range(number_of_binary_files):
new_file_path = Path(directory).joinpath(
f'{NON_DATA_FILE.name}_{i}')
shutil.copy(NON_DATA_FILE, new_file_path)
files.append(new_file_path)
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 201} # stop when more than 200
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_less_or_equal_limit_mseed_multiplexed_files(self):
sample_file_size = os.path.getsize(MULTIPLEX_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
for i in range(3):
new_file_path = Path(directory).joinpath(
f'{MULTIPLEX_FILE.name}_{i}')
shutil.copy(MULTIPLEX_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_limit_mseed_multiplexed_files(self):
sample_file_size = os.path.getsize(MULTIPLEX_FILE)
expected_size = 0
count = 0
with TemporaryDirectory() as directory:
files = []
while 1:
new_file_path = Path(directory).joinpath(
f'{MULTIPLEX_FILE.name}_{count}')
shutil.copy(MULTIPLEX_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
if expected_size > BIG_FILE_SIZE:
break
count += 1
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_less_or_equal_limit_mseed_non_multiplexed_low_spr_files(self):
sample_file_size = os.path.getsize(NON_MULTIPLEX_LOW_SPR_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
for i in range(3):
new_file_path = Path(directory).joinpath(
f'{NON_MULTIPLEX_LOW_SPR_FILE.name}_{i}')
shutil.copy(NON_MULTIPLEX_LOW_SPR_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_less_or_equal_limit_mseed_non_multiplexed_high_spr_files(self):
sample_file_size = os.path.getsize(NON_MULTIPLEX_HIGH_SPR_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
for i in range(3):
new_file_path = Path(directory).joinpath(
f'{NON_MULTIPLEX_HIGH_SPR_FILE.name}_{i}')
shutil.copy(NON_MULTIPLEX_HIGH_SPR_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_limit_mseed_non_multiplexed_high_spr_files(self):
sample_file_size = os.path.getsize(NON_MULTIPLEX_HIGH_SPR_FILE)
expected_size = 0
count = 0
with TemporaryDirectory() as directory:
files = []
while 1:
new_file_path = Path(directory).joinpath(
f'{NON_MULTIPLEX_HIGH_SPR_FILE.name}_{count}')
shutil.copy(NON_MULTIPLEX_HIGH_SPR_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
if expected_size > BIG_FILE_SIZE:
break
count += 1
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], [])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_mseed_non_multiplexed_high_n_low_spr_files(self):
expected_result = {'data_size': 11251712,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([NON_MULTIPLEX_HIGH_N_LOW_SPR_SET], [])
self.assertEqual(ret, expected_result)
def test_less_or_equal_limit_rt130_files(self):
sample_file_size = os.path.getsize(RT130_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
new_data_stream_path = Path(directory).joinpath('1')
new_data_stream_path.mkdir(
parents=True, exist_ok=True)
for i in range(3):
new_file_path = new_data_stream_path.joinpath(
f'{RT130_FILE.name}_{i}')
shutil.copy(RT130_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], ['1'])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_more_than_limit_rt130_files(self):
sample_file_size = os.path.getsize(RT130_FILE)
expected_size = 0
count = 0
with TemporaryDirectory() as directory:
files = []
new_data_stream_path = Path(directory).joinpath('1')
new_data_stream_path.mkdir(
parents=True, exist_ok=True)
while 1:
new_file_path = new_data_stream_path.joinpath(
f'{RT130_FILE.name}_{count}')
shutil.copy(RT130_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
if expected_size > BIG_FILE_SIZE:
break
count += 1
expected_result = {'data_size': expected_size,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], ['1'])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_rt130_no_requested_datastream_files(self):
sample_file_size = os.path.getsize(RT130_FILE)
expected_size = 0
with TemporaryDirectory() as directory:
files = []
new_data_stream_path = Path(directory).joinpath('1')
new_data_stream_path.mkdir(
parents=True, exist_ok=True)
for i in range(3):
new_file_path = new_data_stream_path.joinpath(
f'{RT130_FILE.name}_{i}')
shutil.copy(RT130_FILE, new_file_path)
files.append(new_file_path)
expected_size += sample_file_size
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 0}
ret = _check_folders_size([directory], ['2'])
self.assertEqual(ret, expected_result)
[os.unlink(file) for file in files]
def test_empty_directory(self):
with TemporaryDirectory() as temp_dir:
expected_result = {'data_size': 0,
'text_count': 0,
'binary_count': 0}
result = _check_folders_size([temp_dir], ['*'])
self.assertEqual(result, expected_result)
def test_directory_does_not_exist(self):
empty_name_dir = ''
try:
_check_folders_size([empty_name_dir], [])
except Exception as e:
self.assertEqual(
str(e),
"'' isn't a valid directory"
)
non_existent_dir = 'directory does not exist'
try:
_check_folders_size([non_existent_dir], [])
except Exception as e:
self.assertEqual(
str(e),
"'directory does not exist' isn't a valid directory"
)