Compare revisions: software_public/passoft/sohstationviewer
Showing 312 additions and 553 deletions
@@ -3,19 +3,15 @@ from pathlib import Path
from unittest import TestCase
from unittest.mock import patch
from contextlib import redirect_stdout
import io
from sohstationviewer.controller.processing import (
load_data,
read_mseed_channels,
detect_data_type,
get_data_type_from_file
)
from sohstationviewer.database.extract_data import get_signature_channels
from PySide2 import QtWidgets
from sohstationviewer.model.mseed.mseed import MSeed
from sohstationviewer.model.reftek.reftek import RT130
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.joinpath('test_data')
rt130_dir = TEST_DATA_DIR.joinpath('RT130-sample/2017149.92EB/2017150')
@@ -25,7 +21,7 @@ pegasus_dir = TEST_DATA_DIR.joinpath('Pegasus-sample/Pegasus_SVC4/soh')
multiplex_dir = TEST_DATA_DIR.joinpath('Q330_multiplex')
class TestLoadDataAndReadChannels(TestCase):
class TestReadChannels(TestCase):
"""Test suite for load_data and read_mseed_channels."""
def setUp(self) -> None:
@@ -39,142 +35,6 @@ class TestLoadDataAndReadChannels(TestCase):
# though, so we are setting it to a stub value.
self.mseed_dtype = 'MSeed'
def test_load_data_rt130_good_dir(self):
"""
Test basic functionality of load_data - the given directory can be
loaded without issues. Test RT130.
"""
self.assertIsInstance(
load_data('RT130', self.widget_stub, [rt130_dir], []),
RT130
)
def test_load_data_rt130_used(self):
with self.subTest("R130, no dir_list"):
self.assertIsInstance(
load_data('RT130', self.widget_stub, [], [rt130_dir]),
RT130
)
with self.subTest("R130, any dir_list"):
# should ignore dir_list
self.assertIsInstance(
load_data('RT130', self.widget_stub, ['_'], [rt130_dir]),
RT130
)
with self.subTest("R130, bad dir_list"):
self.assertIsNone(
load_data('RT130', self.widget_stub, [], ['_'])
)
with self.subTest("Q330"):
self.assertIsNone(
load_data('Q330', self.widget_stub, [], [rt130_dir])
)
def test_load_data_mseed_q330_good_data_dir(self):
"""
Test basic functionality of load_data - the given directory can be
loaded without issues. Test MSeed.
"""
self.assertIsInstance(
load_data(self.mseed_dtype, self.widget_stub, [q330_dir], []),
MSeed
)
self.assertIsInstance(
load_data(self.mseed_dtype, self.widget_stub, [centaur_dir], []),
MSeed
)
self.assertIsInstance(
load_data(self.mseed_dtype, self.widget_stub, [pegasus_dir], []),
MSeed
)
def test_load_data_no_dir(self):
"""Test basic functionality of load_data - no directory was given."""
no_dir_given = []
self.assertIsNone(load_data(
'RT130', self.widget_stub, no_dir_given, []))
self.assertIsNone(
load_data(
self.mseed_dtype, self.widget_stub, no_dir_given, []))
def test_load_data_dir_does_not_exist(self):
"""
Test basic functionality of load_data - the given directory does not
exist.
"""
empty_name_dir = ['']
non_existent_dir = ['dir_that_does_not_exist']
self.assertIsNone(
load_data('RT130', self.widget_stub, empty_name_dir, []))
self.assertIsNone(
load_data('RT130', self.widget_stub, non_existent_dir, []))
self.assertIsNone(
load_data(self.mseed_dtype, self.widget_stub, empty_name_dir, []))
self.assertIsNone(
load_data(
self.mseed_dtype, self.widget_stub, non_existent_dir, []))
def test_load_data_empty_dir(self):
"""
Test basic functionality of load_data - the given directory is empty.
"""
with TemporaryDirectory() as empty_dir:
self.assertIsNone(
load_data('RT130', self.widget_stub, [empty_dir], []))
self.assertIsNone(
load_data(self.mseed_dtype, self.widget_stub, [empty_dir], []))
def test_load_data_empty_data_dir(self):
"""
Test basic functionality of load_data - the given directory
contains a data folder but no data file.
"""
with TemporaryDirectory() as outer_dir:
with TemporaryDirectory(dir=outer_dir) as data_dir:
self.assertIsNone(
load_data('RT130', self.widget_stub, [data_dir], []))
self.assertIsNone(
load_data(
self.mseed_dtype, self.widget_stub, [outer_dir], []))
def test_load_data_data_type_mismatch(self):
"""
Test basic functionality of load_data - the data type given does not
match the type of the data contained in the given directory.
"""
self.assertIsNone(
load_data('RT130', self.widget_stub, [q330_dir], []))
self.assertIsNone(
load_data(self.mseed_dtype, self.widget_stub, [rt130_dir], []))
def test_load_data_data_traceback_error(self):
"""
Test basic functionality of load_data - when there is an error while
loading data, the traceback info is printed out.
"""
f = io.StringIO()
with redirect_stdout(f):
self.assertIsNone(load_data('RT130', None, [q330_dir], []))
output = f.getvalue()
self.assertIn(
f"Dir {q330_dir} "
f"can't be read due to error: Traceback",
output
)
with redirect_stdout(f):
self.assertIsNone(
load_data(self.mseed_dtype, None, [rt130_dir], []))
output = f.getvalue()
self.assertIn(
f"Dir {rt130_dir} "
f"can't be read due to error: Traceback",
output
)
def test_read_channels_mseed_dir(self):
"""
Test basic functionality of load_data - the given directory contains
@@ -306,40 +166,40 @@ class TestDetectDataType(TestCase):
Test basic functionality of detect_data_type - only one directory was
given and the data type it contains can be detected.
"""
expected_data_type = ('RT130', '_')
expected_data_type = ('RT130', False)
self.mock_get_data_type_from_file.return_value = expected_data_type
self.assertEqual(
detect_data_type([self.dir1.name]),
expected_data_type[0]
expected_data_type
)
def test_same_data_type_and_channel(self):
def test_same_data_type_not_multiplex(self):
"""
Test basic functionality of detect_data_type - the given directories
contain the same data type and the data is not multiplexed.
"""
expected_data_type = ('RT130', '_')
expected_data_type = ('RT130', False)
self.mock_get_data_type_from_file.return_value = expected_data_type
self.assertEqual(
detect_data_type([self.dir1.name, self.dir2.name]),
expected_data_type[0]
expected_data_type
)
def test_same_data_type_different_channel(self):
def test_same_data_type_multiplex(self):
"""
Test basic functionality of detect_data_type - the given directories
contain the same data type and the data is multiplexed.
"""
returned_data_types = [('Q330', 'OCF'), ('Q330', 'VEP')]
returned_data_types = [('Q330', True), ('Q330', True)]
self.mock_get_data_type_from_file.side_effect = returned_data_types
self.assertEqual(
detect_data_type([self.dir1.name, self.dir2.name]),
returned_data_types[0][0]
returned_data_types[0]
)
def test_different_data_types(self):
@@ -347,7 +207,7 @@
Test basic functionality of detect_data_type - the given directories
contain different data types.
"""
returned_data_types = [('RT130', '_'), ('Q330', 'VEP')]
returned_data_types = [('RT130', False), ('Q330', False)]
self.mock_get_data_type_from_file.side_effect = returned_data_types
with self.assertRaises(Exception) as context:
@@ -355,8 +215,8 @@
self.assertEqual(
str(context.exception),
f"There are more than one types of data detected:\n"
f"{self.dir1.name}: [RT130, _]\n"
f"{self.dir2.name}: [Q330, VEP]\n\n"
f"{self.dir1.name}: RT130, "
f"{self.dir2.name}: Q330\n\n"
f"Please have only data that related to each other.")
def test_unknown_data_type(self):
@@ -364,14 +224,28 @@
Test basic functionality of detect_data_type - can't detect any data
type.
"""
unknown_data_type = ('Unknown', '_')
unknown_data_type = ('Unknown', False)
self.mock_get_data_type_from_file.return_value = unknown_data_type
with self.assertRaises(Exception) as context:
detect_data_type([self.dir1.name])
self.assertEqual(
str(context.exception),
"There are no known data detected.\n\n"
"Do you want to cancel to select different folder(s)\n"
"Or continue to read any available mseed file?")
def test_multiplex_none(self):
"""
Test basic functionality of detect_data_type - multiplex is None,
meaning no channel was found for the data set.
"""
unknown_data_type = ('Unknown', None)
self.mock_get_data_type_from_file.return_value = unknown_data_type
with self.assertRaises(Exception) as context:
detect_data_type([self.dir1.name])
self.assertEqual(
str(context.exception),
"There are no known data detected.\n"
"Please select different folder(s).")
"No channel found for the data set")
class TestGetDataTypeFromFile(TestCase):
@@ -383,7 +257,7 @@
"""
rt130_file = Path(rt130_dir).joinpath(
'92EB/0/000000000_00000000')
expected_data_type = ('RT130', '_')
expected_data_type = ('RT130', False)
self.assertTupleEqual(
get_data_type_from_file(rt130_file, get_signature_channels()),
expected_data_type
@@ -395,8 +269,9 @@
data type contained in given file.
"""
test_file = NamedTemporaryFile()
self.assertIsNone(
get_data_type_from_file(test_file.name, get_signature_channels()))
ret = get_data_type_from_file(
Path(test_file.name), get_signature_channels())
self.assertEqual(ret, (None, False))
def test_mseed_data(self):
"""
@@ -408,9 +283,9 @@
'XX.3734.SOH.centaur-3_3734..20180817_000000.miniseed.miniseed')
pegasus_file = pegasus_dir.joinpath(
'2020/XX/KC01/VE1.D/XX.KC01..VE1.D.2020.129')
q330_data_type = ('Q330', 'VKI')
centaur_data_type = ('Centaur', 'GEL')
pegasus_data_type = ('Pegasus', 'VE1')
q330_data_type = ('Q330', False)
centaur_data_type = ('Centaur', True)
pegasus_data_type = ('Pegasus', False)
sig_chan = get_signature_channels()
@@ -426,10 +301,16 @@
Test basic functionality of get_data_type_from_file - given file does
not exist.
"""
empty_name_file = ''
non_existent_file = 'non_existent_dir'
with self.assertRaises(FileNotFoundError):
empty_name_file = Path('')
non_existent_file = Path('non_existent_dir')
with self.assertRaises(IsADirectoryError):
get_data_type_from_file(empty_name_file, get_signature_channels())
with self.assertRaises(FileNotFoundError):
get_data_type_from_file(non_existent_file,
get_signature_channels())
def test_non_data_binary_file(self):
binary_file = Path(__file__).resolve().parent.parent.parent.joinpath(
'images', 'home.png')
ret = get_data_type_from_file(binary_file, get_signature_channels())
self.assertIsNone(ret)
@@ -108,7 +108,12 @@ class TestRetrieveDataTimeFromDataDict(TestCase):
self.expected_data_time = {'STA1': [4, 9], 'STA2': [2, 8]}
def test_retrieve_data_time(self):
retrieve_data_time_from_data_dict(self.data_dict, self.data_time)
retrieve_data_time_from_data_dict(
'STA1', self.data_dict, self.data_time)
self.assertEqual(self.data_time,
{'STA1': self.expected_data_time['STA1']})
retrieve_data_time_from_data_dict(
'STA2', self.data_dict, self.data_time)
self.assertEqual(self.data_time,
self.expected_data_time)
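# A sketch of the per-station behavior the two calls above imply: the
# helper now takes a station id and records that station's overall
# [min start, max end] in data_time. An assumed simplification, not the
# project implementation:
def _retrieve_data_time_sketch(sta_id, data_dict, data_time):
    starts, ends = [], []
    for channel in data_dict[sta_id].values():
        for trace in channel['tracesInfo']:
            starts.append(trace['startTmEpoch'])
            ends.append(trace['endTmEpoch'])
    data_time[sta_id] = [min(starts), max(ends)]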
@@ -128,7 +133,13 @@ class TestRetrieveGapsFromDataDict(TestCase):
'STA2': [[1, 2], [4, 3], [2, 3], [1, 3], [3, 2]]}
def test_retrieve_gaps(self):
retrieve_gaps_from_data_dict(self.data_dict, self.gaps)
self.gaps['STA1'] = []
retrieve_gaps_from_data_dict('STA1', self.data_dict, self.gaps)
self.assertEqual(self.gaps,
{'STA1': self.expected_gaps['STA1']})
self.gaps['STA2'] = []
retrieve_gaps_from_data_dict('STA2', self.data_dict, self.gaps)
self.assertEqual(self.gaps,
self.expected_gaps)
@@ -136,7 +147,7 @@
class TestCombineData(TestCase):
def test_overlap_lt_gap_minimum(self):
# combine; not add to gap list
station_data_dict = {
data_dict = {'STA1': {
'CH1': {
'gaps': [],
'tracesInfo': [
@@ -149,30 +160,30 @@
'data': [1, -2, 1, 1],
'times': [13, 16, 18, 20]}
]}
}
}}
gap_minimum = 10
combine_data(station_data_dict, gap_minimum)
self.assertEqual(station_data_dict['CH1']['gaps'], [])
combine_data('STA1', data_dict, gap_minimum)
self.assertEqual(data_dict['STA1']['CH1']['gaps'], [])
self.assertEqual(
len(station_data_dict['CH1']['tracesInfo']),
len(data_dict['STA1']['CH1']['tracesInfo']),
1)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['startTmEpoch'],
data_dict['STA1']['CH1']['tracesInfo'][0]['startTmEpoch'],
5)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['endTmEpoch'],
data_dict['STA1']['CH1']['tracesInfo'][0]['endTmEpoch'],
20)
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['data'].tolist(),
data_dict['STA1']['CH1']['tracesInfo'][0]['data'].tolist(),
[1, 2, 2, -1, 1, -2, 1, 1])
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['times'].tolist(),
data_dict['STA1']['CH1']['tracesInfo'][0]['times'].tolist(),
[5, 8, 11, 15, 13, 16, 18, 20])
def test_overlap_gt_or_equal_gap_minimum(self):
# combine; add to gap list
station_data_dict = {
data_dict = {'STA1': {
'CH1': {
'gaps': [],
'tracesInfo': [
@@ -185,30 +196,30 @@
'data': [1, -2, 1, 1],
'times': [5, 11, 15, 20]}
]}
}
}}
gap_minimum = 10
combine_data(station_data_dict, gap_minimum)
self.assertEqual(station_data_dict['CH1']['gaps'], [[15, 5]])
combine_data('STA1', data_dict, gap_minimum)
self.assertEqual(data_dict['STA1']['CH1']['gaps'], [[15, 5]])
self.assertEqual(
len(station_data_dict['CH1']['tracesInfo']),
len(data_dict['STA1']['CH1']['tracesInfo']),
1)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['startTmEpoch'],
data_dict['STA1']['CH1']['tracesInfo'][0]['startTmEpoch'],
5)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['endTmEpoch'],
data_dict['STA1']['CH1']['tracesInfo'][0]['endTmEpoch'],
20)
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['data'].tolist(),
data_dict['STA1']['CH1']['tracesInfo'][0]['data'].tolist(),
[1, 2, 2, -1, 1, -2, 1, 1])
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['times'].tolist(),
data_dict['STA1']['CH1']['tracesInfo'][0]['times'].tolist(),
[5, 8, 11, 15, 5, 11, 15, 20])
def test_lt_gap_minimum(self):
# not combine; not add to gap list
station_data_dict = {
data_dict = {'STA1': {
'CH1': {
'gaps': [],
'tracesInfo': [
@@ -221,27 +232,27 @@
'data': [1, -2, 1, 1],
'times': [22, 26, 30, 34]}
]}
}
}}
gap_minimum = 10
combine_data(station_data_dict, gap_minimum)
self.assertEqual(station_data_dict['CH1']['gaps'], [])
combine_data('STA1', data_dict, gap_minimum)
self.assertEqual(data_dict['STA1']['CH1']['gaps'], [])
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['startTmEpoch'],
data_dict['STA1']['CH1']['tracesInfo'][0]['startTmEpoch'],
5)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['endTmEpoch'],
data_dict['STA1']['CH1']['tracesInfo'][0]['endTmEpoch'],
34)
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['data'].tolist(),
data_dict['STA1']['CH1']['tracesInfo'][0]['data'].tolist(),
[1, 2, 2, -1, 1, -2, 1, 1])
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['times'].tolist(),
data_dict['STA1']['CH1']['tracesInfo'][0]['times'].tolist(),
[5, 8, 11, 15, 22, 26, 30, 34])
def test_gap_gt_or_equal_gap_minimum(self):
# not combine; add to gap list
station_data_dict = {
data_dict = {'STA1': {
'CH1': {
'gaps': [],
'tracesInfo': [
@@ -254,22 +265,22 @@
'data': [1, -2, 1, 1],
'times': [25, 29, 33, 36, 40]}
]}
}
}}
gap_minimum = 10
combine_data(station_data_dict, gap_minimum)
self.assertEqual(station_data_dict['CH1']['gaps'], [[15, 25]])
combine_data('STA1', data_dict, gap_minimum)
self.assertEqual(data_dict['STA1']['CH1']['gaps'], [[15, 25]])
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['startTmEpoch'],
data_dict['STA1']['CH1']['tracesInfo'][0]['startTmEpoch'],
5)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['endTmEpoch'],
data_dict['STA1']['CH1']['tracesInfo'][0]['endTmEpoch'],
40)
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['data'].tolist(),
data_dict['STA1']['CH1']['tracesInfo'][0]['data'].tolist(),
[1, 2, 2, -1, 1, -2, 1, 1])
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['times'].tolist(),
data_dict['STA1']['CH1']['tracesInfo'][0]['times'].tolist(),
[5, 8, 11, 15, 25, 29, 33, 36, 40])
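# Taken together, the four cases above pin down a boundary rule between
# consecutive traces: the signed delta between the next trace's start and
# the previous trace's end is recorded as a gap only when its magnitude
# reaches gap_minimum (negative delta = overlap, positive delta = gap).
# A toy restatement of that assumed rule, not the project implementation:
def _boundary_gap_sketch(prev_end, next_start, gap_minimum):
    delta = next_start - prev_end
    if abs(delta) >= gap_minimum:
        return [prev_end, next_start]  # e.g. [15, 5] or [15, 25] above
    return None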
@@ -286,7 +297,7 @@ class TestApplyConvertFactorToDataDict(TestCase):
'get_convert_factor')
def test_convert_factor(self, mock_get_convert_factor):
mock_get_convert_factor.return_value = 0.1
apply_convert_factor_to_data_dict(self.data_dict, 'Q330')
apply_convert_factor_to_data_dict('STA1', self.data_dict, 'Q330')
self.assertEqual(
self.data_dict['STA1']['CH1']['tracesInfo'][0]['data'].tolist(),
self.expected_data)
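# The mocked factor of 0.1 and the expected data imply plain element-wise
# scaling of each trace's samples; a one-line sketch of that assumption:
def _apply_factor_sketch(data, factor):
    return [sample * factor for sample in data]  # e.g. factor 0.1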
@@ -230,8 +230,6 @@ class TestMSeed(TestCase):
1)
def test_existing_time_range(self):
import os
print(os.getcwd())
# check if data_time is from the given range, end time may get
# a little greater than read_end according to record's end time
args = {
...
@@ -58,7 +58,7 @@ class TestExtractData(unittest.TestCase):
with self.subTest("RT130 Seismic"):
expected_result = {'channel': 'DS2',
'plotType': 'linesSRate',
'height': 4,
'height': 8,
'unit': '',
'linkedChan': None,
'convertFactor': 1,
@@ -71,7 +71,7 @@
with self.subTest("MSeed Seismic"):
expected_result = {'channel': 'LHE',
'plotType': 'linesSRate',
'height': 4,
'height': 8,
'unit': '',
'linkedChan': None,
'convertFactor': 1,
...
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional, Dict, Union, List
from typing import Dict, Union, List
from unittest import TestCase
from unittest.mock import patch
@@ -8,17 +8,12 @@ from unittest.mock import patch
from obspy.core import UTCDateTime
import numpy as np
import sohstationviewer.view.plotting.time_power_squared_processor
from sohstationviewer.conf import constants as const
from sohstationviewer.model.handling_data import (
trim_downsample_chan_with_spr_less_or_equal_1,
trim_downsample_wf_chan,
trim_waveform_data,
downsample_waveform_data,
get_start_5mins_of_diff_days,
)
from sohstationviewer.view.plotting.time_power_squared_processor import (
TimePowerSquaredProcessor,
)
from sohstationviewer.model.downsampler import downsample, chunk_minmax
@@ -610,337 +605,3 @@
self.end_time, False)
self.assertTrue(mock_trim.called)
self.assertTrue(mock_downsample.called)
class TestGetTrimTpsData(TestCase):
def no_file_memmap(self, file_path: Path, *args, **kwargs):
"""
A mock of numpy.memmap. Reduce test run time significantly by making
sure that data access happens in memory and not on disk.
This method does not actually load the data stored on disk. Instead, it
constructs the array of data using the name of the given file. To do
so, this method requires the file name to be in the format
<prefix>_<index>. This method then constructs an array of
self.trace_size consecutive integers starting at
<index> * self.trace_size.
:param file_path: the path to a file used to construct the data array.
:param args: dummy arguments to make the API similar to numpy.memmap.
:param kwargs: dummy arguments to make the API similar to numpy.memmap.
:return: a numpy array constructed using file_path's name.
"""
file_idx = int(file_path.name.split('_')[-1])
start = file_idx * self.trace_size
end = start + self.trace_size
return np.arange(start, end)
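# Worked example of the mapping above (self.trace_size is 1000 in setUp):
# Path('<tmp>/data_3') -> file_idx 3 -> returns np.arange(3000, 4000).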
def add_trace(self, start_time: float, idx: Optional[int] = None):
"""
Add a trace to the stored list of traces.
:param start_time: the start time of the trace to be added.
:param idx: the index to insert the trace into. If None, the new trace
will be appended to the list of traces
"""
trace = {}
trace['startTmEpoch'] = start_time
trace['endTmEpoch'] = start_time + self.trace_size - 1
trace['size'] = self.trace_size
file_idx = start_time // self.trace_size
times_file_name = Path(self.data_folder.name) / f'times_{file_idx}'
trace['times_f'] = times_file_name
data_file_name = Path(self.data_folder.name) / f'data_{file_idx}'
trace['data_f'] = data_file_name
if idx is not None:
self.traces_info.insert(idx, trace)
else:
self.traces_info.append(trace)
def setUp(self) -> None:
"""Set up text fixtures."""
memmap_patcher = patch.object(np, 'memmap',
side_effect=self.no_file_memmap)
self.addCleanup(memmap_patcher.stop)
memmap_patcher.start()
# Channel ID is only used when communicating with the main window.
# Seeing as we are testing the processing step here, we don't really
# need it.
channel_id = ''
self.channel_data: ChannelData = {'samplerate': 1}
self.traces_info = []
self.channel_data['tracesInfo'] = self.traces_info
self.data_folder = TemporaryDirectory()
self.trace_size = 1000
for i in range(100):
start_time = i * self.trace_size
self.add_trace(start_time)
self.start_time = 25000
self.end_time = 75000
self.start_5mins_of_diff_days = get_start_5mins_of_diff_days(
self.start_time, self.end_time)
self.tps_processor = TimePowerSquaredProcessor(
channel_id, self.channel_data, self.start_time, self.end_time,
self.start_5mins_of_diff_days
)
local_TimePowerSquaredProcessor = (sohstationviewer.view.plotting.
time_power_squared_processor.
TimePowerSquaredProcessor)
# If object obj is an instance of class A, then the method call obj.method1()
# translates to A.method1(obj) in Python. So, in order to mock method1 for
# obj, we mock it for the class A.
@patch.object(local_TimePowerSquaredProcessor, 'trim_waveform_data')
def test_data_is_trimmed(self, mock_trim_waveform_data):
"""Test that the data is trimmed."""
self.tps_processor.run()
self.assertTrue(mock_trim_waveform_data.called)
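# A self-contained illustration of the class-level patching note above,
# using a toy class unrelated to this codebase:
def test_patching_class_affects_instance_calls(self):
    """Toy demonstration: patching the class affects instance calls."""
    class Toy:
        def method1(self):
            return 'real'
    obj = Toy()
    with patch.object(Toy, 'method1', return_value='mocked'):
        self.assertEqual(obj.method1(), 'mocked')
    self.assertEqual(obj.method1(), 'real')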
def test_appropriate_amount_of_5_mins_skipped(self):
"""Test that the trimmed part of the data is skipped over."""
self.tps_processor.run()
with self.subTest('test_skip_before_start_time'):
first_unskipped_idx = 83
skipped_tps_arr = (
self.channel_data['tps_data'][0][:first_unskipped_idx]
)
self.assertTrue((skipped_tps_arr == 0).all())
with self.subTest('test_skip_after_end_time'):
last_unskipped_idx = 252
skipped_tps_arr = (
self.channel_data['tps_data'][0][last_unskipped_idx + 1:]
)
self.assertTrue((skipped_tps_arr == 0).all())
def test_result_is_stored(self):
"""Test that the result of the TPS calculation is stored."""
self.tps_processor.run()
self.assertTrue('tps_data' in self.channel_data)
def test_formula_is_correct(self):
"""Test that the TPS calculation uses the correct formula."""
self.tps_processor.start_time = 50000
self.tps_processor.end_time = 52000
self.tps_processor.run()
first_unskipped_idx = 166
last_unskipped_idx = 175
tps_data = self.channel_data['tps_data'][0]
unskipped_tps_arr = (
tps_data[first_unskipped_idx:last_unskipped_idx + 1]
)
expected = np.array([
2.51497985e+09, 2.54515955e+09, 2.57551925e+09, 0.00000000e+00,
1.96222188e+09, 2.64705855e+09, 2.67801825e+09, 2.03969638e+09,
2.75095755e+09, 2.78251725e+09
])
self.assertTrue(np.allclose(unskipped_tps_arr, expected))
def test_one_tps_array_for_each_day_one_day_of_data(self):
"""
Test that there is one TPS array for each day of data.
Test the case where there is only one day of data.
"""
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 1)
def test_one_tps_array_for_each_day_multiple_days_of_data(self):
"""
Test that there is one TPS array for each day of data.
Test the case where there are more than one day of data.
"""
# Currently, the data time goes from 0 to 100000, which is enough to
# cover two days (the start of the second positive day in epoch time is
# 86400). Thus, we only have to set the end time to the data end time
# to have two days of data.
self.tps_processor.end_time = 100000
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 2)
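# Plain restatement of the day arithmetic in the comment above: one epoch
# day is 24 * 60 * 60 == 86400 seconds and 100000 > 86400, so the data
# reaches into a second epoch day and two TPS arrays result.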
def test_data_has_gap_to_the_right_data_same_day_before_gap(self):
"""
Test that gaps in the data are skipped in TPS calculation by checking
that the elements in the TPS array corresponding to the gaps are
0.
Test the case where there are gaps to the right of the data and the
traces directly next to the gaps are in the same day.
"""
# Remove traces that go from 1000 to 24999 (traces 2 to 25) in order to
# create a gap on the right side of the data.
self.traces_info = [trace
for i, trace in enumerate(self.traces_info)
if not 0 < i < 25]
self.channel_data['tracesInfo'] = self.traces_info
with self.subTest('test_start_time_in_gap'):
self.tps_processor.start_time = 15000
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 1)
tps_gap = slice(0, 50)
tps_data_in_gap = self.channel_data['tps_data'][0][tps_gap]
tps_data_in_gap_contains_zero = np.allclose(
tps_data_in_gap, np.zeros(tps_data_in_gap.size)
)
self.assertTrue(tps_data_in_gap_contains_zero)
with self.subTest('test_start_time_cover_all_traces'):
self.tps_processor.start_time = 500
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 1)
tps_gap = slice(2, 83)
tps_data_in_gap = self.channel_data['tps_data'][0][tps_gap]
tps_data_in_gap_contains_zero = np.allclose(
tps_data_in_gap, np.zeros(tps_data_in_gap.size)
)
self.assertTrue(tps_data_in_gap_contains_zero)
def test_data_has_gap_to_the_left_data_same_day_after_gap(self):
"""
Test that gaps in the data are skipped in TPS calculation by checking
that the elements in the TPS array corresponding to the gaps are
0.
Test the case where there are gaps to the left of the data and the
traces directly next to the gaps are in the same day.
"""
# Data end time is 100000, so we want a trace that starts after 100001
trace_start_time = 125000
self.add_trace(trace_start_time)
with self.subTest('test_end_time_in_gap'):
# Subject to change after Issue #37 is fixed
self.tps_processor.end_time = 110000
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 2)
tps_gaps = (slice(45, 128), slice(131, None))
tps_data_in_gaps = np.concatenate(
[self.channel_data['tps_data'][1][gap] for gap in tps_gaps]
)
tps_data_in_gaps_contains_zero = np.allclose(
tps_data_in_gaps, np.zeros(tps_data_in_gaps.size)
)
self.assertTrue(tps_data_in_gaps_contains_zero)
with self.subTest('test_end_time_cover_all_traces'):
self.tps_processor.end_time = trace_start_time + 50
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 2)
tps_gaps = (slice(45, 128), slice(131, None))
tps_data_in_gaps = np.concatenate(
[self.channel_data['tps_data'][1][gap] for gap in tps_gaps]
)
tps_data_in_gaps_contains_zero = np.allclose(
tps_data_in_gaps, np.zeros(tps_data_in_gaps.size)
)
self.assertTrue(tps_data_in_gaps_contains_zero)
def test_data_has_gap_to_the_right_data_different_day_before_gap(self):
"""
Test that gaps in the data are skipped in TPS calculation by checking
that the elements in the TPS array corresponding to the gaps are
0.
Test the case where there are gaps to the right of the data and the
traces directly next to the gaps are in different days.
"""
trace_start_time = -50000
self.add_trace(trace_start_time, idx=0)
with self.subTest('test_start_time_in_gap'):
self.tps_processor.start_time = -25000
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 2)
tps_gap = slice(const.NO_5M_DAY)
tps_data_in_gap = self.channel_data['tps_data'][0][tps_gap]
tps_data_in_gap_contains_zero = np.allclose(
tps_data_in_gap, np.zeros(tps_data_in_gap.size)
)
self.assertTrue(tps_data_in_gap_contains_zero)
with self.subTest('test_start_time_cover_all_traces'):
self.tps_processor.start_time = -60000
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 2)
tps_gaps = (slice(0, 121), slice(124, None))
tps_data_in_gaps = np.concatenate(
[self.channel_data['tps_data'][0][gap] for gap in tps_gaps]
)
tps_data_in_gaps_contains_zero = np.allclose(
tps_data_in_gaps, np.zeros(tps_data_in_gaps.size)
)
self.assertTrue(tps_data_in_gaps_contains_zero)
def test_data_has_gap_to_the_left_data_different_day_after_gap(self):
"""
Test that gaps in the data are skipped in TPS calculation by checking
that the elements in the TPS array corresponding to the gaps are
0.
Test the case where there are gaps to the left of the data and the
traces directly next to the gaps are in different days.
"""
# The setup portion of this test suite only create traces in the first
# positive day in epoch time. So, in order to guarantee there is a gap
# in the TPS array, we skip the second positive day. The start of the
# third positive day in epoch time is 172800, so we want a trace that
# starts after 172801.
trace_start_time = 173100
self.add_trace(trace_start_time)
with self.subTest('test_end_time_same_day_as_second_to_last_trace'):
# Subject to change after Issue #37 is fixed
self.tps_processor.end_time = 125000
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
with self.assertRaises(IndexError):
self.tps_processor.run()
with self.subTest('test_end_time_cover_all_traces'):
self.tps_processor.end_time = trace_start_time + 50
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 3)
tps_gap_day_2 = slice(45, None)
tps_gap_day_3 = slice(4, None)
tps_data_in_gaps = np.hstack(
(
self.channel_data['tps_data'][1][tps_gap_day_2],
self.channel_data['tps_data'][2][tps_gap_day_3]
)
)
tps_data_in_gaps_contains_zero = np.allclose(
tps_data_in_gaps, np.zeros(tps_data_in_gaps.size)
)
self.assertTrue(tps_data_in_gaps_contains_zero)
@@ -223,7 +223,8 @@ class MockMSeed(MSeed):
class TestGetGPSChannelPrefix(TestCase):
def setUp(self) -> None:
self.mseed_obj = MockMSeed()
self.mseed_obj.channels = set()
self.mseed_obj.selected_key = 'STA'
self.mseed_obj.soh_data = {'STA': {}}
def test_pegasus_data_type(self):
data_type = 'Pegasus'
@@ -239,14 +240,16 @@
def test_unknown_data_type_pegasus_gps_channels(self):
data_type = 'Unknown'
self.mseed_obj.channels = {'VNS', 'VLA', 'VLO', 'VEL'}
self.mseed_obj.soh_data = {
'STA': {'VNS': {}, 'VLA': {}, 'VEL': {}, 'VLO': {}}}
expected = 'V'
result = get_gps_channel_prefix(self.mseed_obj, data_type)
self.assertEqual(expected, result)
def test_unknown_data_type_centaur_gps_channels(self):
data_type = 'Unknown'
self.mseed_obj.channels = {'GNS', 'GLA', 'GLO', 'GEL'}
self.mseed_obj.soh_data = {
'STA': {'GNS': {}, 'GLA': {}, 'GEL': {}, 'GLO': {}}}
expected = 'G'
result = get_gps_channel_prefix(self.mseed_obj, data_type)
self.assertEqual(expected, result)
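# Taken together, the two tests above imply prefix detection from a
# station's SOH channel names; a toy version of that assumed rule, not
# the project implementation:
def _gps_prefix_sketch(channel_names):
    if {'VNS', 'VLA', 'VLO', 'VEL'} <= set(channel_names):
        return 'V'  # Pegasus-style GPS channels
    if {'GNS', 'GLA', 'GLO', 'GEL'} <= set(channel_names):
        return 'G'  # Centaur-style GPS channels
    return None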
...
@@ -97,7 +97,6 @@ class TestParseGpsPoint(unittest.TestCase):
gps_point = parse_gps_point_rt130(self.good_gps_line,
self.gps_year)
result = gps_point.longitude
print(result)
expected = -106.92038611111111
self.assertTrue(math.isclose(result, expected))
...
from unittest import TestCase
from unittest.mock import patch
from obspy.core import UTCDateTime
import numpy as np
from sohstationviewer.view.plotting.plotting_widget.plotting_processor_helper \
import downsample, chunk_minmax
ZERO_EPOCH_TIME = UTCDateTime(1970, 1, 1, 0, 0, 0).timestamp
class TestDownsample(TestCase):
# FROM test_handling_data_trim_downsample.TestDownsample
def setUp(self) -> None:
patcher = patch('sohstationviewer.view.plotting.plotting_widget.'
'plotting_processor_helper.chunk_minmax')
self.addCleanup(patcher.stop)
self.mock_chunk_minmax = patcher.start()
self.times = np.arange(1000)
self.data = np.arange(1000)
self.log_idx = np.arange(1000)
def test_first_downsample_step_remove_enough_points(self):
req_points = 999
downsample(self.times, self.data, rq_points=req_points)
self.assertFalse(self.mock_chunk_minmax.called)
def test_first_downsample_step_remove_enough_points_with_logidx(self):
req_points = 999
downsample(self.times, self.data, self.log_idx, rq_points=req_points)
self.assertFalse(self.mock_chunk_minmax.called)
def test_second_downsample_step_required(self):
req_points = 1
downsample(self.times, self.data, rq_points=req_points)
self.assertTrue(self.mock_chunk_minmax.called)
times, data, _, rq_points = self.mock_chunk_minmax.call_args[0]
self.assertIsNot(times, self.times)
self.assertIsNot(data, self.data)
self.assertEqual(rq_points, req_points)
def test_second_downsample_step_required_with_logidx(self):
req_points = 1
downsample(self.times, self.data, self.log_idx, rq_points=req_points)
self.assertTrue(self.mock_chunk_minmax.called)
times, data, log_idx, rq_points = self.mock_chunk_minmax.call_args[0]
self.assertIsNot(times, self.times)
self.assertIsNot(data, self.data)
self.assertIsNot(log_idx, self.log_idx)
self.assertEqual(rq_points, req_points)
def test_requested_points_greater_than_data_size(self):
req_points = 10000
times, data, _ = downsample(
self.times, self.data, rq_points=req_points)
self.assertFalse(self.mock_chunk_minmax.called)
# Check that we did not do any processing on the times and data arrays.
# This ensures that we don't do two unneeded copy operations.
self.assertIs(times, self.times)
self.assertIs(data, self.data)
def test_requested_points_greater_than_data_size_with_logidx(self):
req_points = 10000
times, data, log_idx = downsample(
self.times, self.data, self.log_idx, rq_points=req_points)
self.assertFalse(self.mock_chunk_minmax.called)
# Check that we did not do any processing on the times and data arrays.
# This ensures that we don't do two unneeded copy operations.
self.assertIs(times, self.times)
self.assertIs(data, self.data)
self.assertIs(log_idx, self.log_idx)
def test_requested_points_is_zero(self):
req_points = 0
downsample(self.times, self.data, rq_points=req_points)
self.assertTrue(self.mock_chunk_minmax.called)
times, data, _, rq_points = self.mock_chunk_minmax.call_args[0]
self.assertIsNot(times, self.times)
self.assertIsNot(data, self.data)
self.assertEqual(rq_points, req_points)
def test_requested_points_is_zero_with_logidx(self):
req_points = 0
downsample(self.times, self.data, self.log_idx, rq_points=req_points)
self.assertTrue(self.mock_chunk_minmax.called)
times, data, log_idx, rq_points = self.mock_chunk_minmax.call_args[0]
self.assertIsNot(times, self.times)
self.assertIsNot(data, self.data)
self.assertIsNot(log_idx, self.log_idx)
self.assertEqual(rq_points, req_points)
def test_empty_times_and_data(self):
req_points = 1000
self.times = np.empty((0, 0))
self.data = np.empty((0, 0))
times, data, _ = downsample(
self.times, self.data, rq_points=req_points)
self.assertFalse(self.mock_chunk_minmax.called)
# Check that we did not do any processing on the times and data arrays.
# This ensures that we don't do two unneeded copy operations.
self.assertIs(times, self.times)
self.assertIs(data, self.data)
def test_empty_times_and_data_with_logidx(self):
req_points = 1000
self.times = np.empty((0, 0))
self.data = np.empty((0, 0))
self.log_idx = np.empty((0, 0))
times, data, log_idx = downsample(
self.times, self.data, self.log_idx, rq_points=req_points)
self.assertFalse(self.mock_chunk_minmax.called)
# Check that we did not do any processing on the times and data arrays.
# This ensures that we don't do two unneeded copy operations.
self.assertIs(times, self.times)
self.assertIs(data, self.data)
self.assertIs(log_idx, self.log_idx)
class TestChunkMinmax(TestCase):
# FROM test_handling_data_trim_downsample.TestChunkMinmax
def setUp(self):
self.times = np.arange(1000)
self.data = np.arange(1000)
self.log_idx = np.arange(1000)
def test_data_size_is_multiple_of_requested_points(self):
req_points = 100
times, data, log_idx = chunk_minmax(
self.times, self.data, self.log_idx, req_points)
self.assertEqual(times.size, req_points)
self.assertEqual(data.size, req_points)
self.assertEqual(log_idx.size, req_points)
@patch('sohstationviewer.model.downsampler.downsample', wraps=downsample)
def test_data_size_is_not_multiple_of_requested_points(
self, mock_downsample):
req_points = 102
chunk_minmax(self.times, self.data, self.log_idx, req_points)
self.assertTrue(mock_downsample.called)
def test_requested_points_too_small(self):
small_req_points_list = [0, 1]
for req_points in small_req_points_list:
with self.subTest(f'test_requested_points_is_{req_points}'):
times, data, log_idx = chunk_minmax(
self.times, self.data, self.log_idx, rq_points=req_points)
self.assertEqual(times.size, 0)
self.assertEqual(data.size, 0)
self.assertEqual(log_idx.size, 0)
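# A toy sketch of the min/max chunk downsampling these tests exercise:
# split the data into rq_points // 2 chunks and keep each chunk's minimum
# and maximum so plots preserve extremes. A simplified assumption, not the
# project implementation (np is the module-level numpy import above):
def _chunk_minmax_sketch(times, data, rq_points):
    n_chunks = rq_points // 2
    if n_chunks == 0:
        return times[:0], data[:0]  # too few points requested
    if data.size <= rq_points:
        return times, data  # nothing to thin out
    chunk = data.size // n_chunks
    keep = []
    for i in range(n_chunks):
        seg = data[i * chunk:(i + 1) * chunk]
        keep.extend(sorted((i * chunk + seg.argmin(),
                            i * chunk + seg.argmax())))
    idx = np.array(keep)
    return times[idx], data[idx]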
import math
from unittest import TestCase
import numpy as np
from obspy import UTCDateTime
from sohstationviewer.model.handling_data import (
get_start_5mins_of_diff_days, find_tps_tm_idx
from sohstationviewer.view.plotting.time_power_squared_helper import (
get_start_5mins_of_diff_days, find_tps_tm_idx,
get_tps_for_discontinuous_data
)
from sohstationviewer.conf import constants as const
class TestGetEachDay5MinList(TestCase):
# FROM handling_data_calc_time
def test_start_in_midle_end_exact(self):
"""
Start in the middle of a day and end at the exact end of a day
@@ -55,6 +60,7 @@ class TestGetEachDay5MinList(TestCase):
class TestFindTPSTmIdx(TestCase):
# FROM handling_data_calc_time
@classmethod
def setUpClass(cls) -> None:
start = UTCDateTime("2012-09-07T12:15:00").timestamp
@@ -83,3 +89,53 @@ class TestFindTPSTmIdx(TestCase):
tm = UTCDateTime("2012-09-09T00:00:00").timestamp
start_tps_tm_idx = find_tps_tm_idx(tm, self.start_5mins_of_diff_days)
self.assertEqual(start_tps_tm_idx, (287, -1))
class TestGetTPSForDiscontinuousData(TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.day_begin = UTCDateTime("2021-07-05T00:00:00").timestamp
cls.start = UTCDateTime("2021-07-05T22:59:28.340").timestamp
cls.end = UTCDateTime("2021-07-06T3:59:51.870").timestamp
cls.start_5mins_of_diff_days = get_start_5mins_of_diff_days(
cls.start, cls.end
)
def test_more_than_10_minute_apart(self):
# check for empty blocks in between the tps data
times = np.arange(self.start, self.end, 60*60) # 60m apart
data = np.random.uniform(-1000, 1000, times.size)
channel_data = {'tracesInfo': [{'times': times, 'data': data}]}
tps = get_tps_for_discontinuous_data(
channel_data, self.start_5mins_of_diff_days)
self.assertEqual(len(tps), 2)
expected_first_index = \
math.ceil((self.start - self.day_begin)/const.SEC_5M) - 1
day0_indexes = np.where(tps[0] != 0)[0]
day1_indexes = np.where(tps[1] != 0)[0]
self.assertEqual(day0_indexes[0], expected_first_index)
# successive filled blocks are 60/5 = 12 indices apart
self.assertTrue(np.all(np.diff(day0_indexes) == 60/5))
self.assertTrue(np.all(np.diff(day1_indexes) == 60/5))
def test_less_than_10_minute_apart(self):
# the data points are spaced apart, but at less than 10 minutes apart
# the function fills up the empty space between them
times = np.arange(self.start, self.end, 9*60) # 9m apart
data = np.random.uniform(-1000, 1000, times.size)
channel_data = {'tracesInfo': [{'times': times, 'data': data}]}
tps = get_tps_for_discontinuous_data(
channel_data, self.start_5mins_of_diff_days)
self.assertEqual(len(tps), 2)
expected_first_index = \
math.ceil((self.start - self.day_begin)/const.SEC_5M) - 1
day0_indexes = np.where(tps[0] != 0)[0]
day1_indexes = np.where(tps[1] != 0)[0]
self.assertEqual(day0_indexes[0], expected_first_index)
# adjacent blocks: successive filled indices differ by 1
self.assertTrue(np.all(np.diff(day0_indexes) == 1))
self.assertTrue(np.all(np.diff(day1_indexes) == 1))
# last block of day0 has value
self.assertIn(const.NO_5M_DAY - 1, day0_indexes)
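# Worked instance of the 5-minute index arithmetic used above, assuming
# const.SEC_5M == 300 and const.NO_5M_DAY == 288 (both values are
# assumptions about the constants module):
#   start - day_begin = 22*3600 + 59*60 + 28.340 = 82768.340 seconds
#   math.ceil(82768.340 / 300) - 1 = 276 - 1 = 275
# so expected_first_index is 275, and 24 * 60 * 60 / 300 = 288 five-minute
# blocks cover a day, making NO_5M_DAY - 1 == 287 the last index of day0.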