""" Test suites for the functions defined in handling_data All random test data are named in the format sample_* and are generated using the builtin random library with a seed of 22589824271860044. This seed is obtained by converting the string PASSCAL to bytes and converting the resulting bytes to an integer with big endian ordering.""" from pathlib import Path import tempfile from math import isclose from copy import copy as shallow_copy import tracemalloc from unittest import TestCase, expectedFailure from unittest.mock import patch from obspy.core import Trace from obspy.core import UTCDateTime from obspy.core import read as read_ms import numpy as np from sohstationviewer.model.handling_data import ( readSOHMSeed, readSOHTrace, readMPTrace, readWaveformTrace, readWaveformMSeed, readWaveformReftek, readASCII, readText, saveData2File, checkChan, checkWFChan, checkSOHChan, sortData, squash_gaps, downsample, chunk_minmax, trim_downsample_SOHChan, get_eachDay5MinList, ) from sohstationviewer.model.reftek.from_rt2ms.core import Reftek130 from sohstationviewer.conf.constants import SEC_5M, SEC_DAY, NO_5M_DAY TEST_DATA_DIR = Path(__file__).parent.parent.joinpath('test_data') class TestHandlingData(TestCase): @classmethod def setUpClass(cls) -> None: rt130_dir = TEST_DATA_DIR.joinpath( 'RT130-sample/2017149.92EB/2017150/92EB') cls.rt130_soh_file = rt130_dir.joinpath('9/054910000_013EE8A0') rt130_soh = Reftek130.from_file(cls.rt130_soh_file) cls.rt130_soh_stream = Reftek130.to_stream(rt130_soh) cls.rt130_soh_trace = cls.rt130_soh_stream[0] cls.rt130_waveform_file = rt130_dir.joinpath('9/054910000_013EE8A0') rt130_waveform = Reftek130.from_file(cls.rt130_waveform_file) cls.rt130_waveform_stream = Reftek130.to_stream(rt130_waveform) cls.rt130_waveform_trace = cls.rt130_waveform_stream[0] q330_dir = TEST_DATA_DIR.joinpath('Q330-sample/day_vols_AX08') cls.q330_soh_file = q330_dir.joinpath('AX08.XA..VKI.2021.186') cls.mseed_soh_stream = read_ms(cls.q330_soh_file) cls.mseed_soh_trace = cls.mseed_soh_stream[0] cls.q330_waveform_file = q330_dir.joinpath('AX08.XA..LHE.2021.186') cls.mseed_waveform_stream = read_ms(cls.q330_waveform_file) cls.mseed_waveform_trace = cls.mseed_waveform_stream[0] # @expectedFailure # def test_read_sohmseed(self): # self.fail() def test_read_soh_trace_processed_trace_have_all_needed_info(self): processed_trace = readSOHTrace(self.mseed_soh_trace) with self.subTest('test_processed_trace_have_all_needed_info'): expected_key_list = [ 'chanID', 'samplerate', 'startTmEpoch', 'endTmEpoch', 'times', 'data' ] self.assertTrue( all(key in processed_trace for key in expected_key_list), ) def test_read_soh_trace_times_calculated_correctly(self): processed_trace = readSOHTrace(self.mseed_soh_trace) if isclose(processed_trace['startTmEpoch'], 0, abs_tol=0.0001): self.assertAlmostEqual(processed_trace['times'][0], 0) else: self.assertNotAlmostEqual(processed_trace['times'][0], 0) @patch('sohstationviewer.model.handling_data.readSOHTrace') def test_read_mp_trace(self, mock_read_soh_trace): mock_read_soh_trace.return_value = { # Contain five cases: # + Small positive # + Large positive # + Small negative # + Large negative # + Zero 'data': np.array([1, 27272, -2, -23526, 0]) } expected = np.array([0, 8.3, 0, -7.2, 0]) processed_trace = readMPTrace(self.rt130_soh_trace) self.assertTrue( np.array_equal(processed_trace['data'], expected) ) @patch('sohstationviewer.model.handling_data.saveData2File') def test_read_waveform_trace(self, mock_save_data_2_file): station_id = self.rt130_soh_trace.stats['station'] channel_id = self.rt130_soh_trace.stats['channel'] # The function itself only cares about the length of this list so we # stub it out. traces_info = [dict() for _ in range(4)] tmp_dir = tempfile.TemporaryDirectory() processed_trace = readWaveformTrace( self.rt130_soh_trace, station_id, channel_id, traces_info, tmp_dir.name ) expected_key_list = [ 'samplerate', 'startTmEpoch', 'endTmEpoch', 'size', 'times_f', 'data_f', ] self.assertTrue( all(key in processed_trace for key in expected_key_list), ) self.assertTrue(mock_save_data_2_file.called) # @skip # def test_squash_gaps(self): # self.fail() # # @skip # def test_downsample(self): # self.fail() # # @skip # def test_constant_rate(self): # self.fail() # # @skip # def test_chunk_minmax(self): # self.fail() # # @skip # def test_trim_downsample_sohchan(self): # self.fail() # # @skip # def test_trim_downsample_wfchan(self): # self.fail() # # @skip # def test_get_each_day5min_list(self): # self.fail() # # @skip # def test_get_trim_tpsdata(self): # self.fail() # # @skip # def test_find_tpstm(self): # self.fail() class TestReadWaveformMSeed(TestCase): @classmethod def setUpClass(cls) -> None: q330_dir = TEST_DATA_DIR.joinpath('Q330-sample/day_vols_AX08') cls.q330_waveform_file = q330_dir.joinpath('AX08.XA..LHE.2021.186') cls.mseed_waveform_stream = read_ms(cls.q330_waveform_file) cls.mseed_waveform_trace = cls.mseed_waveform_stream[0] def setUp(self) -> None: self.station_id = self.mseed_waveform_trace.stats['station'] self.channel_id = self.mseed_waveform_trace.stats['channel'] # This list is only ever written to, so we can keep it empty self.traces_info = [] self.data_time = [0, 0] self.temp_dir = tempfile.TemporaryDirectory() patcher = patch( 'sohstationviewer.model.handling_data.readWaveformTrace') self.addCleanup(patcher.stop) self.mock_read_waveform_trace = patcher.start() self.mock_read_waveform_trace.return_value = { 'startTmEpoch': 0, 'endTmEpoch': 0, } def tearDown(self) -> None: self.temp_dir.cleanup() def test_all_traces_are_processed(self): readWaveformMSeed( self.q330_waveform_file, self.q330_waveform_file.name, self.station_id, self.channel_id, self.traces_info, self.data_time, self.temp_dir.name) self.assertEqual(len(self.traces_info), len(self.mseed_waveform_stream)) def test_readWaveformTrace_called(self): readWaveformMSeed( self.q330_waveform_file, self.q330_waveform_file.name, self.station_id, self.channel_id, self.traces_info, self.data_time, self.temp_dir.name) self.assertTrue(self.mock_read_waveform_trace.called) def test_start_data_time_earlier_than_earliest_start_time(self): self.mock_read_waveform_trace.return_value = { 'startTmEpoch': 51251, 'endTmEpoch': 2623623, } start_time = 0 self.data_time = [start_time, 1625532949] readWaveformMSeed( self.q330_waveform_file, self.q330_waveform_file.name, self.station_id, self.channel_id, self.traces_info, self.data_time, self.temp_dir.name) self.assertEqual(self.data_time[0], start_time) def test_end_data_time_later_than_latest_start_time(self): # End time set to be the last second of 9999 so as to be later than the # end time of any test data. self.mock_read_waveform_trace.return_value = { 'startTmEpoch': 51251, 'endTmEpoch': 2623623, } end_time = 253402326000 self.data_time = [1625443222, end_time] readWaveformMSeed( self.q330_waveform_file, self.q330_waveform_file.name, self.station_id, self.channel_id, self.traces_info, self.data_time, self.temp_dir.name) self.assertEqual(self.data_time[1], end_time) def test_data_time_is_between_earliest_start_and_latest_end_time(self): self.mock_read_waveform_trace.return_value = { 'startTmEpoch': 51251, 'endTmEpoch': 2623623, } start_time = 512579 end_time = 2623616 self.data_time = [start_time, end_time] expected_updated_data_time = [51251, 2623623] readWaveformMSeed( self.q330_waveform_file, self.q330_waveform_file.name, self.station_id, self.channel_id, self.traces_info, self.data_time, self.temp_dir.name) self.assertEqual(self.data_time, expected_updated_data_time) class TestReadWaveformReftek(TestCase): @classmethod def setUpClass(cls) -> None: rt130_dir = TEST_DATA_DIR.joinpath( 'RT130-sample/2017149.92EB/2017150/92EB') cls.rt130_waveform_file = rt130_dir.joinpath('1/000000015_0036EE80') cls.rt130_waveform = Reftek130.from_file(cls.rt130_waveform_file) cls.rt130_waveform_stream = Reftek130.to_stream(cls.rt130_waveform) cls.rt130_waveform_trace = cls.rt130_waveform_stream[0] def setUp(self) -> None: # The first element of the key, the unit ID, should be a character # string. However, the unit ID is stored as a byte string in the data # stream. While having the unit ID as a byte string should not cause # any problem with the test, we convert it to a character string anyway # to be consistent with how readWaveformReftek is called. self.key = ( self.rt130_waveform_trace.stats['unit_id'].decode('utf-8'), self.rt130_waveform_trace.stats['experiment_number'] ) self.read_data = {} self.data_time = [0, 0] self.temp_dir = tempfile.TemporaryDirectory() patcher = patch( 'sohstationviewer.model.handling_data.readWaveformTrace') self.addCleanup(patcher.stop) self.mock_read_waveform_trace = patcher.start() self.mock_read_waveform_trace.return_value = { 'startTmEpoch': 0, 'endTmEpoch': 0, } def tearDown(self) -> None: self.temp_dir.cleanup() def test_all_traces_are_processed(self): readWaveformReftek(self.rt130_waveform, self.key, self.read_data, self.data_time, self.temp_dir.name) num_traces_read = 0 for channel_data in self.read_data.values(): num_traces_read += len(channel_data['tracesInfo']) self.assertTrue(num_traces_read, len(self.rt130_waveform_stream)) def test_read_data_updated_with_all_channels(self): readWaveformReftek(self.rt130_waveform, self.key, self.read_data, self.data_time, self.temp_dir.name) self.assertTupleEqual(tuple(self.read_data.keys()), ('DS1-1', 'DS1-2', 'DS1-3')) def test_read_data_existing_channel_appended_to(self): self.read_data = { 'DS1-1': {'tracesInfo': [{'startTmEpoch': 0, 'endTmEpoch': 0}], 'samplerate': 40.0} } readWaveformReftek(self.rt130_waveform, self.key, self.read_data, self.data_time, self.temp_dir.name) self.assertEqual(len(self.read_data['DS1-1']['tracesInfo']), 2) def test_readWaveformTrace_called(self): readWaveformReftek(self.rt130_waveform, self.key, self.read_data, self.data_time, self.temp_dir.name) self.assertTrue(self.mock_read_waveform_trace.called) def test_start_data_time_earlier_than_earliest_start_time(self): self.mock_read_waveform_trace.return_value = { 'startTmEpoch': 51251, 'endTmEpoch': 2623623, } start_time = 0 self.data_time = [start_time, 1625532949] readWaveformReftek(self.rt130_waveform, self.key, self.read_data, self.data_time, self.temp_dir.name) self.assertEqual(self.data_time[0], start_time) def test_end_data_time_later_than_latest_start_time(self): # End time set to be the last second of 9999 so as to be later than the # end time of any test data. self.mock_read_waveform_trace.return_value = { 'startTmEpoch': 51251, 'endTmEpoch': 2623623, } end_time = 253402326000 self.data_time = [1625443222, end_time] readWaveformReftek(self.rt130_waveform, self.key, self.read_data, self.data_time, self.temp_dir.name) self.assertEqual(self.data_time[1], end_time) def test_data_time_is_between_earliest_start_and_latest_end_time(self): self.mock_read_waveform_trace.return_value = { 'startTmEpoch': 51251, 'endTmEpoch': 2623623, } start_time = 512579 end_time = 2623616 self.data_time = [start_time, end_time] expected_updated_data_time = [51251, 2623623] readWaveformReftek(self.rt130_waveform, self.key, self.read_data, self.data_time, self.temp_dir.name) self.assertEqual(self.data_time, expected_updated_data_time) class TestReadASCII(TestCase): @classmethod def setUpClass(cls) -> None: q330_dir = TEST_DATA_DIR.joinpath('Q330-sample/day_vols_AX08') cls.q330_log_file = q330_dir.joinpath('AX08.XA..LOG.2021.186') cls.mseed_ascii_stream = read_ms(cls.q330_log_file) cls.mseed_ascii_trace = cls.mseed_ascii_stream[0] def track_info(self, msg, msg_type): self.info_tracked.append((msg, msg_type)) def setUp(self) -> None: self.station_id = self.mseed_ascii_trace.stats.station self.channel_id = self.mseed_ascii_trace.stats.channel self.log_data = {} self.info_tracked = [] def test_station_and_channel_inserted_into_log_data(self): readASCII(self.q330_log_file, None, self.station_id, self.channel_id, self.mseed_ascii_trace, self.log_data, self.track_info) self.assertTrue(self.station_id in self.log_data) self.assertTrue(self.channel_id in self.log_data[self.station_id]) def test_trace_contains_log_data(self): returned_file = readASCII(self.q330_log_file, None, self.station_id, self.channel_id, self.mseed_ascii_trace, self.log_data, self.track_info) self.assertIsNone(returned_file) log_string = self.log_data[self.station_id][self.channel_id][0] log_lines = log_string.split('\n') # Q330 uses \r\n as the new line character, so we need to remove the \r # after splitting the log string based on \n log_lines = [line.strip('\r') for line in log_lines] # The first four elements of log_lines will be there whether anything # is read from the file, so we know that something has been read from # the file when log_lines have more than four elements. self.assertGreater(len(log_lines), 4) # Check that we are not reading in gibberish self.assertEqual( log_lines[4], 'Quanterra Packet Baler Model 14 Restart. Version 2.26' ) @expectedFailure def test_trace_does_not_contain_log_data(self): raise NotImplementedError( self.test_trace_does_not_contain_log_data.__qualname__) # We are only reassigning the data reference of the trace and are not # modifying the stored data. As a result, we only need a shallow copy # of the trace. trace = shallow_copy(self.mseed_ascii_trace) trace.data = np.array([]) returned_file = readASCII(self.q330_log_file, None, self.station_id, self.channel_id, trace, self.log_data, self.track_info) self.assertIsNotNone(returned_file) log_header = ( '\n\n**** STATE OF HEALTH: From:2021-07-05T03:37:40.120000Z ' 'To:2021-07-05T03:37:40.120000Z\n' ) log_info = self.mseed_ascii_trace.data.tobytes().decode() log_string = log_header + log_info print(self.log_data[self.station_id][self.channel_id][0]) self.assertEqual(log_string, self.log_data[self.station_id][self.channel_id][0]) class TestReadText(TestCase): def setUp(self): self.text_file = tempfile.NamedTemporaryFile(mode='w+t') self.text_file.write('Test text') self.text_file.flush() self.text_file_name = Path(self.text_file.name).name self.non_text_file = TEST_DATA_DIR.joinpath('Q330-sample/' 'day_vols_AX08/' 'AX08.XA..HHE.2021.186') self.text_logs = [] def tearDown(self) -> None: self.text_file.close() def test_log_appended_to(self): readText(self.text_file.name, self.text_file.name, self.text_logs) self.assertGreater(len(self.text_logs), 0) def test_text_file(self): readText(self.text_file.name, Path(self.text_file.name).name, self.text_logs) self.assertEqual( self.text_logs[0], f'\n\n** STATE OF HEALTH: {Path(self.text_file.name).name}\nTest text') # noqa: E501 def test_non_text_file(self): with self.assertRaises(Exception): readText(self.non_text_file, self.non_text_file.name, self.text_logs) def test_non_existent_file(self): non_existent_file = TEST_DATA_DIR.joinpath('non_existent_file') with self.assertRaises(FileNotFoundError): readText(str(non_existent_file), non_existent_file.name, self.text_logs) class TestSaveData2File(TestCase): def setUp(self) -> None: self.temp_dir = tempfile.TemporaryDirectory() self.time_data = 'timedata' self.station_id = 'station' self.channel_id = 'channel' self.trace_idx = 0 self.trace_size = 100 tracemalloc.start() # We do not need the metadata of a data set (stored in Trace.stats), so # we can create a trace without reading any data set. This approach # allows us to create a trace that contains a large amount of data # without having to store a large data set in the GitLab server. self.trace = Trace(np.ones(self.trace_size)) # The current implementation stores the data in a memmap that contains # 64-bit integers, and so each data point has a size of 8 bytes in the # memmap self.data_point_size = 8 def tearDown(self) -> None: self.temp_dir.cleanup() tracemalloc.stop() def test_file_created(self): mem_file_name = saveData2File(self.temp_dir.name, self.time_data, self.station_id, self.channel_id, self.trace, self.trace_idx, self.trace_size) self.assertTrue(mem_file_name.is_file()) expected_mem_file_name = "station-channel-timedata-0" self.assertEqual(mem_file_name.name, expected_mem_file_name) memmap_size = self.data_point_size * self.trace_size self.assertEqual(mem_file_name.stat().st_size, memmap_size) @patch.object(np, 'memmap') def test_memmap_called(self, mock_memmap): saveData2File(self.temp_dir.name, self.time_data, self.station_id, self.channel_id, self.trace, self.trace_idx, self.trace_size) self.assertTrue(mock_memmap.called) @expectedFailure def test_memory_freed(self): # This test works by calculating the amount of memory freed after # saveData2File is called. Because there are operations happening # between the memory being freed and the calculation, the calculated # amount of memory won't be exact. Fixing this would involve removing # all the aforementioned intermediate operations, which is (very # likely) impossible and does not provide much value due to the actual # deviation being small (consistently at most 1000 bytes). # The difference between the calculated amount of freed memory and the # size of the memmap is around 1000 bytes, we want data that is at # least an order of magnitude bigger to ensure that this difference is # obvious in our calculation. self.trace_size = 10000 self.trace = Trace(np.ones(self.trace_size)) memmap_size = self.trace_size * self.data_point_size current_mem_used = tracemalloc.get_traced_memory()[0] saveData2File(self.temp_dir.name, self.time_data, self.station_id, self.channel_id, self.trace, self.trace_idx, self.trace_size) freed_mem = current_mem_used - tracemalloc.get_traced_memory()[0] freed_mem_max_error = 1000 self.assertTrue(isclose(memmap_size, freed_mem, abs_tol=freed_mem_max_error)) def test_empty_trace(self): self.trace_size = 0 self.trace = Trace(np.ones(self.trace_size)) with self.assertRaises(ValueError): saveData2File(self.temp_dir.name, self.time_data, self.station_id, self.channel_id, self.trace, self.trace_idx, self.trace_size) class TestCheckChan(TestCase): def setUp(self) -> None: self.req_soh_chans = ['LCE', 'LCQ'] self.req_wf_chans = ['LHE', 'HHE'] check_soh_chan_patcher = patch( 'sohstationviewer.model.handling_data.checkSOHChan', wraps=checkSOHChan ) self.addCleanup(check_soh_chan_patcher.stop) self.mock_check_soh_chan = check_soh_chan_patcher.start() check_wf_chan_patcher = patch( 'sohstationviewer.model.handling_data.checkWFChan', wraps=checkWFChan ) self.addCleanup(check_wf_chan_patcher.stop) self.mock_check_wf_chan = check_wf_chan_patcher.start() def test_channel_is_waveform_and_is_requested(self): waveform_channel = 'LHE' ret = checkChan(waveform_channel, self.req_soh_chans, self.req_wf_chans) self.assertEqual(ret, 'WF') self.assertTrue(self.mock_check_wf_chan.called) def test_channel_is_waveform_but_not_requested(self): waveform_channel = 'HH1' ret = checkChan(waveform_channel, self.req_soh_chans, self.req_wf_chans) self.assertFalse(ret) self.assertTrue(self.mock_check_wf_chan.called) def test_channel_is_soh_and_is_requested(self): with self.subTest('test_normal_channel'): soh_channel = 'LCE' ret = checkChan(soh_channel, self.req_soh_chans, self.req_wf_chans) self.assertEqual(ret, 'SOH') self.assertTrue(self.mock_check_soh_chan.called) self.mock_check_soh_chan.reset_mock() with self.subTest('test_mass_position_channel'): soh_channel = 'VM1' ret = checkChan(soh_channel, self.req_soh_chans, self.req_wf_chans) self.assertEqual(ret, 'SOH') self.assertTrue(self.mock_check_soh_chan.called) def test_channel_is_soh_but_not_requested(self): soh_channel = 'VKI' ret = checkChan(soh_channel, self.req_soh_chans, self.req_wf_chans) self.assertFalse(ret) self.assertTrue(self.mock_check_soh_chan.called) class TestCheckSohChan(TestCase): def setUp(self) -> None: self.req_soh_chans = ['LCE', 'LCQ', 'EX?'] self.sample_channel_ids = ['ODV', 'QGA', 'NF4', 'OLY', 'UZM'] def test_all_channels_requested(self): self.req_soh_chans = [] for channel_id in self.sample_channel_ids: self.assertTrue(checkSOHChan(channel_id, self.req_soh_chans)) def test_channel_is_requested(self): with self.subTest('test_normal_channels'): channel_id = 'LCE' self.assertTrue(checkSOHChan(channel_id, self.req_soh_chans)) with self.subTest('test_mass_position_channels'): base_channel_id = 'VM' channel_suffixes = ['0', '1', '2', '3', '4', '5', '6'] for suffix in channel_suffixes: channel_id = base_channel_id + suffix self.assertTrue(checkSOHChan(channel_id, self.req_soh_chans)) with self.subTest('test_external_soh_channels'): base_channel_id = 'EX' channel_suffixes = ['1', '2', '3'] for suffix in channel_suffixes: channel_id = base_channel_id + suffix self.assertTrue(checkSOHChan(channel_id, self.req_soh_chans)) def test_channel_not_requested(self): for channel_id in self.sample_channel_ids: self.assertFalse(checkSOHChan(channel_id, self.req_soh_chans)) class TestCheckWfChan(TestCase): def setUp(self) -> None: self.req_wf_chans = ['LHE', 'HHE'] self.sample_channel_ids = ['LHE', 'HHE', 'AL2', 'MNZ', 'VNN'] def test_all_channels_requested(self): self.req_wf_chans = ['*'] with self.subTest('test_waveform_channel'): for channel_id in self.sample_channel_ids: self.assertTupleEqual( checkWFChan(channel_id, self.req_wf_chans), ('WF', True) ) with self.subTest('test_non_waveform_channel'): channel_id = 'Not a waveform channel' self.assertTupleEqual( checkWFChan(channel_id, self.req_wf_chans), ('', True) ) def test_channel_is_requested(self): channel_id = 'LHE' self.assertTupleEqual( checkWFChan(channel_id, self.req_wf_chans), ('WF', True) ) def test_channel_not_requested(self): with self.subTest('test_waveform_channel'): channel_id = 'AHE' self.assertTupleEqual( checkWFChan(channel_id, self.req_wf_chans), ('WF', False) ) with self.subTest('test_non_waveform_channel'): channel_id = 'Not a waveform channel' self.assertTupleEqual( checkWFChan(channel_id, self.req_wf_chans), ('', False) ) class TestSortData(TestCase): def setUp(self): station_id = 'station' channel_id = 'channel' self.data_dict = {} self.data_dict[station_id] = {} station_data = self.data_dict[station_id] station_data['readData'] = {} read_data = station_data['readData'] read_data[channel_id] = {} channel_data = read_data[channel_id] channel_data['tracesInfo'] = [] self.traces_list: list = channel_data['tracesInfo'] self.sample_timestamps = [593960929, 336078110, 159498833, 77413117, 359137083, 445180140, 280423288, 13462065, 546898731, 219269289, 224077281, 111636543] @expectedFailure def test_unsorted_data_is_sorted(self): for timestamp in self.sample_timestamps: self.traces_list.append({'startTmEpoch': timestamp}) self.assertDictEqual(sortData(self.data_dict), self.data_dict) @expectedFailure def test_sorted_data_stays_the_same(self): self.sample_timestamps.sort() for timestamp in self.sample_timestamps: self.traces_list.append({'startTmEpoch': timestamp}) self.assertDictEqual(sortData(self.data_dict), self.data_dict) class TestSquashGaps(TestCase): def test_two_gaps_squashed_chronological_order(self): """ Test squash_gaps - there are two gaps that can be merged and the gaps are in chronological order with regards to their start time. Each subtest case includes a gap that can't be squashed into either of the other two gaps. """ with self.subTest('test_gaps_overlap_in_middle'): gaps = [[-1000, -1000], [0, 75], [50, 100]] expected = [[-1000, -1000], [0, 100]] self.assertListEqual(squash_gaps(gaps), expected) with self.subTest('test_one_gap_contained_in_another_gap'): gaps = [[-1000, -1000], [0, 100], [25, 75]] expected = [[-1000, -1000], [0, 100]] self.assertListEqual(squash_gaps(gaps), expected) with self.subTest('test_gaps_are_the_same'): gaps = [[-1000, -1000], [0, 100], [0, 100]] expected = [[-1000, -1000], [0, 100]] self.assertListEqual(squash_gaps(gaps), expected) with self.subTest('test_gaps_overlap_at_end_points'): gaps = [[-1000, -1000], [0, 50], [50, 100]] expected = [[-1000, -1000], [0, 100]] self.assertListEqual(squash_gaps(gaps), expected) def test_two_gaps_can_be_squashed_reverse_chronological_order(self): """ Test squash_gaps - there are two gaps that can be merged and the gaps are in reverse chronological order with regards to their start time. Each subtest case includes a gap that can't be squashed into either of the other two gaps. """ with self.subTest('test_gaps_overlap_in_middle'): gaps = [[1000, 1000], [50, 100], [0, 75]] expected = [[1000, 1000], [0, 100]] self.assertListEqual(squash_gaps(gaps), expected) with self.subTest('test_one_gap_contained_in_another_gap'): gaps = [[1000, 1000], [25, 75], [0, 100]] expected = [[1000, 1000], [0, 100]] self.assertListEqual(squash_gaps(gaps), expected) with self.subTest('test_gaps_are_the_same'): gaps = [[1000, 1000], [0, 100], [0, 100]] expected = [[1000, 1000], [0, 100]] self.assertListEqual(squash_gaps(gaps), expected) with self.subTest('test_gaps_overlap_at_end_points'): gaps = [[1000, 1000], [50, 100], [0, 50]] expected = [[1000, 1000], [0, 100]] self.assertListEqual(squash_gaps(gaps), expected) def test_no_gap_can_be_squashed_chronological_order(self): gaps = [[1, 100], [101, 200], [201, 300], [301, 400]] self.assertListEqual(squash_gaps(gaps), gaps) def test_no_gap_can_be_squashed_reverse_chronological_order(self): gaps = [[301, 400], [201, 300], [101, 200], [1, 100]] self.assertListEqual(squash_gaps(gaps), gaps) def test_no_gap_in_data(self): gaps = [] self.assertListEqual(squash_gaps(gaps), []) class TestDownsample(TestCase): def setUp(self) -> None: patcher = patch('sohstationviewer.model.handling_data.chunk_minmax') self.addCleanup(patcher.stop) self.mock_chunk_minmax = patcher.start() self.times = np.arange(1000) self.data = np.arange(1000) def test_first_downsample_step_remove_enough_points(self): req_points = 999 downsample(self.times, self.data, req_points) self.assertFalse(self.mock_chunk_minmax.called) def test_second_downsample_step_required(self): req_points = 1 downsample(self.times, self.data, req_points) self.assertTrue(self.mock_chunk_minmax.called) times, data, _ = self.mock_chunk_minmax.call_args[0] self.assertIsNot(times, self.times) self.assertIsNot(data, self.data) def test_requested_points_greater_than_data_size(self): req_points = 10000 times, data, = downsample(self.times, self.data, req_points) self.assertFalse(self.mock_chunk_minmax.called) # Check that we did not do any processing on the times and data arrays. # This ensures that we don't do two unneeded copy operations. self.assertIs(times, self.times) self.assertIs(data, self.data) def test_requested_points_is_zero(self): req_points = 0 downsample(self.times, self.data, req_points) self.assertTrue(self.mock_chunk_minmax.called) times, data, _ = self.mock_chunk_minmax.call_args[0] self.assertIsNot(times, self.times) self.assertIsNot(data, self.data) def test_empty_times_and_data(self): req_points = 1000 self.times = np.empty((0, 0)) self.data = np.empty((0, 0)) times, data = downsample(self.times, self.data, req_points) self.assertFalse(self.mock_chunk_minmax.called) # Check that we did not do any processing on the times and data arrays. # This ensures that we don't do two unneeded copy operations. self.assertIs(times, self.times) self.assertIs(data, self.data) class TestChunkMinmax(TestCase): def setUp(self): self.times = np.arange(1000) self.data = np.arange(1000) def test_data_size_is_multiple_of_requested_points(self): req_points = 100 times, data = chunk_minmax(self.times, self.data, req_points) self.assertEqual(times.size, req_points) self.assertEqual(data.size, req_points) @patch('sohstationviewer.model.handling_data.downsample', wraps=downsample) def test_data_size_is_not_multiple_of_requested_points(self, mock_downsample): req_points = 102 chunk_minmax(self.times, self.data, req_points) self.assertTrue(mock_downsample.called) def test_requested_points_too_small(self): small_req_points_list = [0, 1] for req_points in small_req_points_list: with self.subTest(f'test_requested_points_is_{req_points}'): times, data = chunk_minmax(self.times, self.data, req_points) self.assertEqual(times.size, 0) self.assertEqual(data.size, 0) class TestTrimDownsampleSohChan(TestCase): @staticmethod def downsample(times, data, req_points): return times, data def setUp(self) -> None: self.channel_info = {} self.org_trace = { 'times': np.arange(1000), 'data': np.arange(1000) } self.channel_info['orgTrace'] = self.org_trace self.start_time = 250 self.end_time = 750 self.first_time = False patcher = patch('sohstationviewer.model.handling_data.downsample') self.addCleanup(patcher.stop) self.mock_downsample = patcher.start() self.mock_downsample.side_effect = self.downsample def num_points_outside_time_range(self, start_time, end_time): return len([data_point for data_point in self.org_trace['times'] if not start_time <= data_point <= end_time]) def test_mseed_start_time_later_than_times_data(self): self.start_time = 250 self.end_time = 1250 trim_downsample_SOHChan(self.channel_info, self.start_time, self.end_time, self.first_time) self.assertGreaterEqual(self.channel_info['times'].min(), self.start_time) self.assertEqual( self.org_trace['times'].size - self.channel_info['times'].size, self.num_points_outside_time_range(self.start_time, self.end_time) ) def test_mseed_end_time_earlier_than_times_data(self): self.start_time = -250 self.end_time = 750 trim_downsample_SOHChan(self.channel_info, self.start_time, self.end_time, self.first_time) self.assertLessEqual(self.channel_info['times'].max(), self.end_time) self.assertEqual( self.org_trace['times'].size - self.channel_info['times'].size, self.num_points_outside_time_range(self.start_time, self.end_time) ) def test_mseed_start_time_earlier_than_times_data(self): self.start_time = -250 self.end_time = 750 trim_downsample_SOHChan(self.channel_info, self.start_time, self.end_time, self.first_time) self.assertEqual(self.channel_info['times'].min(), self.channel_info['orgTrace']['times'].min()) self.assertEqual( self.org_trace['times'].size - self.channel_info['times'].size, self.num_points_outside_time_range(self.start_time, self.end_time) ) def test_mseed_end_time_later_than_times_data(self): self.start_time = 250 self.end_time = 1250 trim_downsample_SOHChan(self.channel_info, self.start_time, self.end_time, self.first_time) self.assertEqual(self.channel_info['times'].max(), self.channel_info['orgTrace']['times'].max()) self.assertEqual( self.org_trace['times'].size - self.channel_info['times'].size, self.num_points_outside_time_range(self.start_time, self.end_time) ) def test_mseed_times_data_contained_in_time_range(self): self.start_time = -250 self.end_time = 1250 trim_downsample_SOHChan(self.channel_info, self.start_time, self.end_time, self.first_time) np.testing.assert_array_equal(self.channel_info['times'], self.org_trace['times']) def test_mseed_time_range_is_the_same_as_times_data(self): self.start_time = 0 self.end_time = 999 trim_downsample_SOHChan(self.channel_info, self.start_time, self.end_time, self.first_time) np.testing.assert_array_equal(self.channel_info['times'], self.org_trace['times']) def test_mseed_time_range_does_not_overlap_times_data(self): self.start_time = 2000 self.end_time = 3000 trim_downsample_SOHChan(self.channel_info, self.start_time, self.end_time, self.first_time) self.assertEqual(self.channel_info['times'].size, 0) self.assertEqual(self.channel_info['data'].size, 0) def test_mseed_data_is_downsampled(self): trim_downsample_SOHChan(self.channel_info, self.start_time, self.end_time, self.first_time) self.assertTrue(self.mock_downsample.called) def test_mseed_processed_data_is_stored_in_appropriate_location(self): trim_downsample_SOHChan(self.channel_info, self.start_time, self.end_time, self.first_time) expected_keys = ('orgTrace', 'times', 'data') self.assertTupleEqual(tuple(self.channel_info.keys()), expected_keys) @expectedFailure def test_reftek(self): self.fail() class TestTrimDownsampleWfChan(TestCase): def setUp(self) -> None: pass @expectedFailure def test(self): raise NotImplementedError(type(self)) class TestGetEachDay5MinList(TestCase): @staticmethod def calc_next_day_start_time(time: float): utc_time = UTCDateTime(time) start_of_day_utc_time = utc_time.replace(second=0, minute=0, hour=0) start_of_next_day_time = start_of_day_utc_time.timestamp + SEC_DAY return start_of_next_day_time def test_one_five_minute_list_each_day_times_are_same_day(self): num_of_days = 1 with self.subTest('test_positive_time'): start_time = UTCDateTime(1970, 1, 2, 4, 12, 52).timestamp end_time = UTCDateTime(1970, 1, 2, 15, 24, 43).timestamp five_min_lists = get_eachDay5MinList(start_time, end_time) self.assertEqual(len(five_min_lists), num_of_days) with self.subTest('test_negative_time'): start_time = UTCDateTime(1969, 10, 21, 4, 12, 52).timestamp end_time = UTCDateTime(1969, 10, 21, 15, 24, 43).timestamp five_min_lists = get_eachDay5MinList(start_time, end_time) self.assertEqual(len(five_min_lists), num_of_days) def test_one_five_minute_list_each_day_times_not_start_of_day(self): num_of_days = 5 test_names = ( 'test_positive_start_time', 'test_negative_start_time_and_positive_end_time', 'test_negative_start_time_and_negative_end_time' ) test_inputs = ( ( UTCDateTime(1970, 1, 1, 0, 2, 31).timestamp, UTCDateTime(1970, 1, 5, 4, 2, 52).timestamp ), ( UTCDateTime(1969, 12, 30, 4, 2, 52).timestamp, UTCDateTime(1970, 1, 3, 4, 2, 52).timestamp ), ( UTCDateTime(1969, 12, 20, 4, 2, 52).timestamp, UTCDateTime(1969, 12, 24, 4, 2, 52).timestamp ), ) for test_name, time_pair in zip(test_names, test_inputs): with self.subTest(test_name): start_time, end_time = time_pair five_min_lists = get_eachDay5MinList(start_time, end_time) self.assertEqual(len(five_min_lists), num_of_days) def test_one_five_minute_list_each_day_start_time_is_start_of_day(self): num_of_days = 5 with self.subTest('test_zero_start_time'): start_time = 0 end_time = UTCDateTime(1970, 1, 5, 12, 12, 12).timestamp five_min_lists = get_eachDay5MinList(start_time, end_time) self.assertEqual(len(five_min_lists), num_of_days) with self.subTest('test_positive_start_time'): start_time = UTCDateTime(1970, 1, 2, 0, 0, 0).timestamp end_time = UTCDateTime(1970, 1, 6, 12, 12, 12).timestamp five_min_lists = get_eachDay5MinList(start_time, end_time) self.assertEqual(len(five_min_lists), num_of_days) with self.subTest('test_negative_start_time'): start_time = UTCDateTime(1969, 12, 24, 0, 0, 0).timestamp end_time = UTCDateTime(1969, 12, 28, 12, 12, 12).timestamp five_min_lists = get_eachDay5MinList(start_time, end_time) self.assertEqual(len(five_min_lists), num_of_days) def test_one_five_minute_list_each_day_end_time_is_start_of_day(self): num_of_days = 4 with self.subTest('test_zero_end_time'): start_time = UTCDateTime(1969, 12, 28, 12, 12, 12).timestamp end_time = 0 five_min_lists = get_eachDay5MinList(start_time, end_time) self.assertEqual(len(five_min_lists), num_of_days) with self.subTest('test_positive_end_time'): start_time = UTCDateTime(1970, 1, 2, 12, 12, 12).timestamp end_time = UTCDateTime(1970, 1, 6, 0, 0, 0).timestamp five_min_lists = get_eachDay5MinList(start_time, end_time) self.assertEqual(len(five_min_lists), num_of_days) with self.subTest('test_negative_end_time'): start_time = UTCDateTime(1969, 12, 16, 12, 12, 12).timestamp end_time = UTCDateTime(1969, 12, 20, 0, 0, 0).timestamp five_min_lists = get_eachDay5MinList(start_time, end_time) self.assertEqual(len(five_min_lists), num_of_days) def test_each_day_contains_288_five_minutes_interval(self): test_names = ( 'test_positive_start_time', 'test_negative_start_time_and_positive_end_time', 'test_negative_start_time_and_negative_end_time' ) test_inputs = ( ( UTCDateTime(1970, 1, 1, 0, 2, 31).timestamp, UTCDateTime(1970, 1, 5, 4, 2, 52).timestamp ), ( UTCDateTime(1969, 12, 30, 4, 2, 52).timestamp, UTCDateTime(1970, 1, 3, 4, 2, 52).timestamp ), ( UTCDateTime(1969, 12, 20, 4, 2, 52).timestamp, UTCDateTime(1969, 12, 24, 4, 2, 52).timestamp ), ) for test_name, time_pair in zip(test_names, test_inputs): with self.subTest(test_name): start_time, end_time = time_pair five_min_lists = get_eachDay5MinList(start_time, end_time) self.assertTrue(all(day_5_min_list.size == NO_5M_DAY for day_5_min_list in five_min_lists)) def test_end_minutes_correct_start_day_equal_to_end_day(self): test_names = ( 'test_positive_time', 'test_negative_time' ) test_inputs = ( ( UTCDateTime(1970, 1, 1, 5, 21, 25).timestamp, UTCDateTime(1970, 1, 1, 6, 6, 12).timestamp ), ( UTCDateTime(1969, 12, 20, 1, 12, 26).timestamp, UTCDateTime(1969, 12, 20, 6, 7, 51).timestamp ), ) for test_name, time_pair in zip(test_names, test_inputs): with self.subTest(test_name): start_time, end_time = time_pair five_min_lists = get_eachDay5MinList(start_time, end_time) self.assertEqual( self.calc_next_day_start_time(start_time) - SEC_5M, five_min_lists[0][-1] ) def test_end_minutes_correct_start_day_and_end_day_differ_by_one_day(self): test_names = ( 'test_positive_start_time', 'test_negative_start_time_and_positive_end_time', 'test_negative_start_time_and_negative_end_time' ) test_inputs = ( ( UTCDateTime(1970, 1, 1, 5, 21, 25).timestamp, UTCDateTime(1970, 1, 2, 4, 6, 12).timestamp ), ( UTCDateTime(1969, 12, 31, 1, 12, 26).timestamp, UTCDateTime(1970, 1, 1, 6, 7, 51).timestamp ), ( UTCDateTime(1969, 12, 30, 11, 25, 9).timestamp, UTCDateTime(1969, 12, 31, 21, 5, 51).timestamp ), ) for test_name, time_pair in zip(test_names, test_inputs): with self.subTest(test_name): start_time, end_time = time_pair five_min_lists = get_eachDay5MinList(start_time, end_time) self.assertEqual( self.calc_next_day_start_time(start_time) - SEC_5M, five_min_lists[0][-1] ) self.assertEqual( self.calc_next_day_start_time(end_time) - SEC_5M, five_min_lists[1][-1] ) def test_end_minutes_correct_start_day_and_end_day_differ_by_multiple_days(self): # noqa test_names = ( 'test_positive_start_time', 'test_negative_start_time_and_positive_end_time', 'test_negative_start_time_and_negative_end_time' ) test_inputs = ( ( UTCDateTime(1970, 1, 1, 5, 21, 25).timestamp, UTCDateTime(1970, 1, 5, 4, 6, 12).timestamp ), ( UTCDateTime(1969, 12, 31, 1, 12, 26).timestamp, UTCDateTime(1970, 1, 4, 4, 7, 51).timestamp ), ( UTCDateTime(1969, 12, 20, 11, 25, 9).timestamp, UTCDateTime(1969, 12, 24, 21, 5, 51).timestamp ), ) for test_name, time_pair in zip(test_names, test_inputs): with self.subTest(test_name): start_time, end_time = time_pair five_min_lists = get_eachDay5MinList(start_time, end_time) curr_day_time = start_time day_after_last_start = self.calc_next_day_start_time(end_time) day_from_start = 0 while curr_day_time < day_after_last_start: self.assertEqual( self.calc_next_day_start_time(curr_day_time) - SEC_5M, five_min_lists[day_from_start][-1] ) curr_day_time += SEC_DAY day_from_start += 1 def test_each_interval_is_five_minutes_apart(self): start_time = UTCDateTime(1969, 12, 25, 12, 12, 12).timestamp end_time = UTCDateTime(1970, 1, 5, 12, 12, 12).timestamp five_min_lists = get_eachDay5MinList(start_time, end_time) a = np.apply_along_axis(lambda arr: np.absolute(arr[1:] - arr[:-1]), 1, five_min_lists) self.assertTrue(all(np.ravel(a) == SEC_5M)) def test_start_time_is_equal_to_end_time(self): with self.subTest('test_zero_time'): input_time = 0 with self.assertRaises(Exception): get_eachDay5MinList(input_time, input_time) with self.subTest('test_positive_time'): input_time = UTCDateTime(1970, 2, 1, 12, 25, 51).timestamp five_min_lists = get_eachDay5MinList(input_time, input_time) self.assertEqual(len(five_min_lists), 1) self.assertEqual( self.calc_next_day_start_time(input_time) - SEC_5M, five_min_lists[0][-1] ) with self.subTest('test_negative_time'): input_time = UTCDateTime(1969, 11, 7, 12, 25, 51).timestamp five_min_lists = get_eachDay5MinList(input_time, input_time) self.assertEqual(len(five_min_lists), 1) self.assertEqual( self.calc_next_day_start_time(input_time) - SEC_5M, five_min_lists[0][-1] ) def test_end_time_earlier_than_start_time(self): start_time = UTCDateTime(9999, 12, 31, 23, 59, 59).timestamp end_time = UTCDateTime(1, 1, 1, 0, 0).timestamp with self.assertRaises(Exception): get_eachDay5MinList(start_time, end_time) class TestGetTrimTpsData(TestCase): def setUp(self) -> None: pass @expectedFailure def test(self): raise NotImplementedError(type(self))