from pathlib import Path
from tempfile import TemporaryDirectory
from unittest import TestCase
from unittest.mock import patch

import numpy as np
from obspy.core import UTCDateTime

from sohstationviewer.conf import constants as const
from sohstationviewer.model.handling_data import (
    downsample,
    trim_downsample_WFChan,
    get_eachDay5MinList,
    get_trimTPSData,
    trim_waveform_data,
    downsample_waveform_chan
)

# Values of the size limits at import time. Tests that override a limit
# restore it from these.
ORIGINAL_CHAN_SIZE_LIMIT = const.CHAN_SIZE_LIMIT
ORIGINAL_RECAL_SIZE_LIMIT = const.RECAL_SIZE_LIMIT


class TestTrimWfData(TestCase):
    """Tests for trim_waveform_data.

    The fixture is 100 back-to-back traces, each spanning 100 epoch values,
    so the whole channel covers epochs 0 through 9999.
    """

    def setUp(self) -> None:
        self.channel_data = {}
        self.traces_info = []
        self.channel_data['tracesInfo'] = self.traces_info
        trace_size = 100
        for i in range(100):
            start_time = i * trace_size
            self.traces_info.append({
                'startTmEpoch': start_time,
                'endTmEpoch': start_time + trace_size - 1,
            })
        self.start_time = 2500
        self.end_time = 7500

    def test_data_is_trimmed_neither_start_nor_end_time_is_trace_start_or_end_time(self):  # noqa: E501
        """The requested range starts and ends in the middle of a trace."""
        self.start_time = 2444
        self.end_time = 7444
        trimmed_traces_list = trim_waveform_data(
            self.channel_data, self.start_time, self.end_time
        )

        # The first kept trace must straddle the requested start time.
        self.assertLessEqual(
            trimmed_traces_list[0]['startTmEpoch'], self.start_time)
        self.assertGreater(
            trimmed_traces_list[0]['endTmEpoch'], self.start_time)

        # Drop the two edge traces; everything left must lie strictly
        # inside the requested range.
        trimmed_traces_list.pop(0)
        trimmed_traces_list.pop()

        is_left_trimmed = all(
            trace['startTmEpoch'] > self.start_time
            for trace in trimmed_traces_list
        )
        is_right_trimmed = all(
            trace['endTmEpoch'] <= self.end_time
            for trace in trimmed_traces_list
        )
        self.assertTrue(is_left_trimmed and is_right_trimmed)

    def test_data_out_of_range(self):
        """A range entirely outside the data yields a falsy result."""
        # NOTE(review): this exercises trim_downsample_WFChan rather than
        # trim_waveform_data, unlike the other tests in this class --
        # confirm that is intended.
        with self.subTest('test_start_time_later_than_data_end_time'):
            self.start_time = 12500
            self.end_time = 17500
            self.assertFalse(
                trim_downsample_WFChan(self.channel_data, self.start_time,
                                       self.end_time, True)
            )
        with self.subTest('test_end_time_earlier_than_data_start_time'):
            self.start_time = -7500
            self.end_time = -2500
            self.assertFalse(
                trim_downsample_WFChan(self.channel_data, self.start_time,
                                       self.end_time, True)
            )

    def test_no_data(self):
        """An empty trace list makes trim_waveform_data raise IndexError."""
        self.channel_data['tracesInfo'] = []
        with self.assertRaises(IndexError):
            trim_waveform_data(
                self.channel_data, self.start_time, self.end_time
            )

    def test_end_time_earlier_than_start_time(self):
        """A reversed time range selects no traces."""
        self.start_time, self.end_time = self.end_time, self.start_time
        trimmed_traces_list = trim_waveform_data(
            self.channel_data, self.start_time, self.end_time
        )
        self.assertListEqual(trimmed_traces_list, [])

    def test_data_does_not_need_to_be_trimmed(self):
        """Ranges that reach past one or both ends of the data."""
        with self.subTest('test_start_time_earlier_than_trace_earliest_time'):
            self.start_time = -2500
            self.end_time = 7500
            trimmed_traces_list = trim_waveform_data(
                self.channel_data, self.start_time, self.end_time
            )
            self.assertEqual(len(trimmed_traces_list), 76)
        with self.subTest('test_end_time_later_than_trace_latest_time'):
            self.start_time = 2500
            self.end_time = 12500
            trimmed_traces_list = trim_waveform_data(
                self.channel_data, self.start_time, self.end_time
            )
            self.assertEqual(len(trimmed_traces_list), 75)
        with self.subTest('test_data_contained_in_time_range'):
            self.start_time = self.traces_info[0]['startTmEpoch']
            self.end_time = self.traces_info[-1]['endTmEpoch']
            trimmed_traces_list = trim_waveform_data(
                self.channel_data, self.start_time, self.end_time
            )
            self.assertEqual(len(trimmed_traces_list),
                             len(self.traces_info))


class TestDownsampleWaveformData(TestCase):
    """Tests for downsample_waveform_chan.

    np.memmap is patched so that no trace file has to exist on disk.
    """

    def no_file_memmap(self, file_path: Path, **kwargs):
        """Stand-in for np.memmap that fabricates a trace's content.

        The trace index is parsed from the trailing '-<idx>' of the file
        name; the returned array is the 100 consecutive integers starting
        at idx * 100. Any keyword arguments are ignored.
        """
        # Data will look the same as times. This has two benefits:
        # - It is a lot easier to inspect what data remains after trimming
        # and downsampling, seeing as the remaining data would be the same
        # as the remaining times.
        # - It is a lot easier to reproducibly create a test data set.
        array_size = 100
        file_idx = int(file_path.name.split('-')[-1])
        start = file_idx * array_size
        end = start + array_size
        return np.arange(start, end)

    def setUp(self) -> None:
        memmap_patcher = patch.object(np, 'memmap',
                                      side_effect=self.no_file_memmap)
        self.addCleanup(memmap_patcher.stop)
        memmap_patcher.start()

        self.channel_data = {}
        self.traces_info = []
        self.channel_data['tracesInfo'] = self.traces_info
        self.data_folder = TemporaryDirectory()
        # Remove the temporary directory once the test is done.
        self.addCleanup(self.data_folder.cleanup)
        folder = Path(self.data_folder.name)
        trace_size = 100
        for i in range(100):
            start_time = i * trace_size
            self.traces_info.append({
                'startTmEpoch': start_time,
                'endTmEpoch': start_time + trace_size - 1,
                'size': trace_size,
                'times_f': folder / f'times-{i}',
                'data_f': folder / f'data-{i}',
            })
        self.start_time = 2550
        self.end_time = 7550
        self.trimmed_traces_list = trim_waveform_data(
            self.channel_data, self.start_time, self.end_time
        )

    @patch('sohstationviewer.model.handling_data.downsample', wraps=downsample)
    def test_data_is_downsampled(self, mock_downsample):
        """With a lowered CHAN_SIZE_LIMIT, downsample gets called."""
        # Register the restore before overriding so the limit is reset even
        # when the call or the assertion below fails.
        self.addCleanup(setattr, const, 'CHAN_SIZE_LIMIT',
                        ORIGINAL_CHAN_SIZE_LIMIT)
        const.CHAN_SIZE_LIMIT = 1000
        downsample_waveform_chan(self.channel_data, self.trimmed_traces_list,
                                 self.start_time, self.end_time, True)
        self.assertTrue(mock_downsample.called)

    def test_all_traces_handled(self):
        """One times array and one data array come back per trimmed trace."""
        downsampled_times, downsampled_data = downsample_waveform_chan(
            self.channel_data, self.trimmed_traces_list,
            self.start_time, self.end_time, True
        )
        self.assertEqual(len(downsampled_times), 51)
        self.assertEqual(len(downsampled_data), 51)

    def test_downsampling_not_needed(self):
        """Edge traces are cut to the range; inner traces are untouched."""
        downsampled_times, downsampled_data = downsample_waveform_chan(
            self.channel_data, self.trimmed_traces_list,
            self.start_time, self.end_time, True
        )
        # Take the edge traces out before either subtest runs, so that the
        # "intermediate" subtest only ever sees interior traces, whatever
        # the outcome of the first subtest.
        first_times = downsampled_times.pop(0)
        last_times = downsampled_times.pop()
        first_data = downsampled_data.pop(0)
        last_data = downsampled_data.pop()

        with self.subTest('test_data_points_outside_time_range_removed'):
            self.assertEqual(first_times.size, 50)
            self.assertEqual(last_times.size, 50)
            self.assertEqual(first_data.size, 50)
            self.assertEqual(last_data.size, 50)

        with self.subTest('test_intermediate_data_points_not_removed'):
            self.assertTrue(
                all(times.size == 100 for times in downsampled_times)
            )
            self.assertTrue(
                all(data.size == 100 for data in downsampled_data)
            )

    def test_trace_list_empty(self):
        """An empty trimmed-trace list produces empty outputs."""
        self.trimmed_traces_list = []
        downsampled_times, downsampled_data = downsample_waveform_chan(
            self.channel_data, self.trimmed_traces_list,
            self.start_time, self.end_time, True
        )
        self.assertListEqual(downsampled_times, [])
        self.assertListEqual(downsampled_data, [])

    def test_end_time_earlier_than_start_time(self):
        """A reversed time range produces empty outputs."""
        self.start_time, self.end_time = self.end_time, self.start_time
        downsampled_times, downsampled_data = downsample_waveform_chan(
            self.channel_data, self.trimmed_traces_list,
            self.start_time, self.end_time, True
        )
        self.assertListEqual(downsampled_times, [])
        self.assertListEqual(downsampled_data, [])


class TestTrimDownsampleWfChan(TestCase):
    """Tests for trim_downsample_WFChan (trim and downsample combined)."""

    # NOTE(review): unlike the other fixtures in this module, np.memmap is
    # not patched here and the times-*/data-* files are never created --
    # confirm trim_downsample_WFChan does not need to read them.

    def setUp(self) -> None:
        self.channel_data = {}
        self.traces_info = []
        self.channel_data['tracesInfo'] = self.traces_info
        self.data_folder = TemporaryDirectory()
        # Remove the temporary directory once the test is done.
        self.addCleanup(self.data_folder.cleanup)
        folder = Path(self.data_folder.name)
        trace_size = 100
        for i in range(100):
            start_time = i * trace_size
            self.traces_info.append({
                'startTmEpoch': start_time,
                'endTmEpoch': start_time + trace_size - 1,
                'size': trace_size,
                'times_f': folder / f'times-{i}',
                'data_f': folder / f'data-{i}',
            })
        self.start_time = 2500
        self.end_time = 7500

    def test_result_is_stored(self):
        """Non-empty 'times' and 'data' are written into channel_data."""
        trim_downsample_WFChan(self.channel_data, self.start_time,
                               self.end_time, True)
        self.assertIn('times', self.channel_data)
        self.assertGreater(len(self.channel_data['times']), 0)
        self.assertIn('data', self.channel_data)
        self.assertGreater(len(self.channel_data['data']), 0)

    def test_data_small_enough_after_first_trim_flag_is_set(self):
        """The 'fulldata' key is added to channel_data."""
        trim_downsample_WFChan(self.channel_data, self.start_time,
                               self.end_time, True)
        self.assertIn('fulldata', self.channel_data)

    def test_no_additional_work_if_data_small_enough_after_first_trim(self):
        """A second call leaves the stored 'times'/'data' objects in place."""
        trim_downsample_WFChan(self.channel_data, self.start_time,
                               self.end_time, True)
        current_times = self.channel_data['times']
        current_data = self.channel_data['data']
        trim_downsample_WFChan(self.channel_data, self.start_time,
                               self.end_time, True)
        # Identity, not equality: the second call must not rebuild them.
        self.assertIs(current_times, self.channel_data['times'])
        self.assertIs(current_data, self.channel_data['data'])

    def test_data_too_large_after_trimming(self):
        """With RECAL_SIZE_LIMIT at 1, no 'times'/'data' get stored."""
        # Register the restore before overriding so the limit is reset even
        # when the call or an assertion below fails.
        self.addCleanup(setattr, const, 'RECAL_SIZE_LIMIT',
                        ORIGINAL_RECAL_SIZE_LIMIT)
        const.RECAL_SIZE_LIMIT = 1
        trim_downsample_WFChan(self.channel_data, self.start_time,
                               self.end_time, False)
        self.assertNotIn('times', self.channel_data)
        self.assertNotIn('data', self.channel_data)


class TestGetTrimTpsData(TestCase):
    """Tests for get_trimTPSData.

    np.memmap is patched so that no trace file has to exist on disk.
    """

    def no_file_memmap(self, file_path: Path, **kwargs):
        """Stand-in for np.memmap that fabricates a trace's content.

        The number after the last '-' in the file name selects the content:
        below const.SEC_DAY it is treated as a trace index (the traces made
        in setUp) and yields the 100 integers starting at idx * 100;
        otherwise it is treated as the starting value itself (the extra
        traces some tests append) and yields const.SEC_5M * 10 consecutive
        integers from there. Any keyword arguments are ignored.
        """
        # Data will look the same as times. This has two benefits:
        # - It is a lot easier to inspect what data remains after trimming
        # and downsampling, seeing as the remaining data would be the same
        # as the remaining times.
        # - It is a lot easier to reproducibly create a test data set.
        file_idx = int(file_path.name.split('-')[-1])
        if file_idx < const.SEC_DAY:
            array_size = 100
            start = file_idx * array_size
            end = start + array_size
        else:
            array_size = const.SEC_5M * 10
            start = file_idx
            end = start + array_size
        return np.arange(start, end)

    def setUp(self) -> None:
        memmap_patcher = patch.object(np, 'memmap',
                                      side_effect=self.no_file_memmap)
        self.addCleanup(memmap_patcher.stop)
        memmap_patcher.start()

        self.channel_data = {'samplerate': 1}
        self.traces_info = []
        self.channel_data['tracesInfo'] = self.traces_info
        self.data_folder = TemporaryDirectory()
        # Remove the temporary directory once the test is done.
        self.addCleanup(self.data_folder.cleanup)
        folder = Path(self.data_folder.name)
        trace_size = 100
        for i in range(100):
            start_time = i * trace_size
            self.traces_info.append({
                'startTmEpoch': start_time,
                'endTmEpoch': start_time + trace_size - 1,
                'size': trace_size,
                'times_f': folder / f'times-{i}',
                'data_f': folder / f'data-{i}',
            })
        data_start_time = 0
        data_end_time = 10000
        self.each_day_5_min_list = get_eachDay5MinList(data_start_time,
                                                       data_end_time)
        self.start_time = 2500
        self.end_time = 7500

    def _append_long_trace(self, trace_start_time: int) -> dict:
        """Append a trace of const.SEC_5M * 10 points and return it.

        The trace's file names end in its start time, which is what
        no_file_memmap keys on.
        """
        trace_size = const.SEC_5M * 10
        folder = Path(self.data_folder.name)
        trace = {
            'startTmEpoch': trace_start_time,
            'endTmEpoch': trace_start_time + trace_size - 1,
            'size': trace_size,
            'times_f': folder.joinpath(f'times-{trace_start_time}'),
            'data_f': folder.joinpath(f'data-{trace_start_time}'),
        }
        self.traces_info.append(trace)
        return trace

    def test_data_out_of_range(self):
        """A range entirely outside the data yields a falsy result."""
        with self.subTest('test_start_time_later_than_data_end_time'):
            self.start_time = 12500
            self.end_time = 17500
            self.assertFalse(
                get_trimTPSData(self.channel_data, self.start_time,
                                self.end_time, self.each_day_5_min_list)
            )
        with self.subTest('test_end_time_earlier_than_data_start_time'):
            self.start_time = -7500
            self.end_time = -2500
            self.assertFalse(
                get_trimTPSData(self.channel_data, self.start_time,
                                self.end_time, self.each_day_5_min_list)
            )

    def test_result_is_stored_one_day_of_data(self):
        """'tps_data' is stored with one entry for the single day."""
        num_day = 1
        get_trimTPSData(self.channel_data, self.start_time,
                        self.end_time, self.each_day_5_min_list)
        self.assertIn('tps_data', self.channel_data)
        self.assertEqual(len(self.channel_data['tps_data']), num_day)

    def test_result_is_stored_multiple_days_of_data(self):
        """'tps_data' has two entries when a second-day trace is added."""
        second_day_data_start_time = int(
            UTCDateTime(1970, 1, 2, 18, 0, 0).timestamp
        )
        self.end_time = UTCDateTime(1970, 1, 2, 6, 0, 0).timestamp
        trace = self._append_long_trace(second_day_data_start_time)
        self.each_day_5_min_list = get_eachDay5MinList(0, trace['endTmEpoch'])

        num_day = 2
        get_trimTPSData(self.channel_data, self.start_time,
                        self.end_time, self.each_day_5_min_list)
        self.assertIn('tps_data', self.channel_data)
        self.assertEqual(len(self.channel_data['tps_data']), num_day)

    def test_data_has_gaps(self):
        """'tps_data' has three entries when the added trace is on day 3.

        No trace is added for 1970-01-02, so the trace list has a gap.
        """
        third_day_data_start_time = int(
            UTCDateTime(1970, 1, 3, 18, 0, 0).timestamp
        )
        self.end_time = UTCDateTime(1970, 1, 2, 6, 0, 0).timestamp
        trace = self._append_long_trace(third_day_data_start_time)
        self.each_day_5_min_list = get_eachDay5MinList(0, trace['endTmEpoch'])

        num_day = 3
        get_trimTPSData(self.channel_data, self.start_time,
                        self.end_time, self.each_day_5_min_list)
        self.assertIn('tps_data', self.channel_data)
        self.assertEqual(len(self.channel_data['tps_data']), num_day)