Skip to content
Snippets Groups Projects
Commit f78cd62d authored by Kien Le's avatar Kien Le
Browse files

Add tests for get_trimTPSData

parent a963374a
No related branches found
No related tags found
1 merge request!33Add tests and refactor trim_downsample_WFChan
@@ -5,11 +5,14 @@ from unittest import TestCase
 from unittest.mock import patch
 import numpy as np
+from obspy.core import UTCDateTime
 from sohstationviewer.conf import constants as const
 from sohstationviewer.model.handling_data import (
     downsample,
     trim_downsample_WFChan,
+    get_eachDay5MinList,
+    get_trimTPSData,
 )
 ORIGINAL_CHAN_SIZE_LIMIT = const.CHAN_SIZE_LIMIT
@@ -121,3 +124,142 @@ class TestTrimDownsampleWfChan(TestCase):
         self.assertTrue('times' not in self.channel_data)
         self.assertTrue('data' not in self.channel_data)
         const.RECAL_SIZE_LIMIT = ORIGINAL_RECAL_SIZE_LIMIT
class TestGetTrimTpsData(TestCase):
    """Tests for handling_data.get_trimTPSData."""

    def no_file_memmap(self, file_path: Path, **kwargs) -> np.ndarray:
        """
        Side effect for the np.memmap patch: fabricate deterministic array
        content from the file name instead of reading the file system.

        Data will look the same as times. This has two benefits:
          - It is a lot easier to inspect what data remains after trimming
            and downsampling, seeing as the remaining data would be the same
            as the remaining times.
          - It is a lot easier to reproducibly create a test data set.

        :param file_path: path whose trailing '-<idx>' encodes the content
        :return: an increasing integer array derived from the file index
        """
        file_idx = int(file_path.name.split('-')[-1])
        if file_idx < const.SEC_DAY:
            # Small traces created in setUp: the index is the trace number.
            array_size = 100
            start = file_idx * array_size
        else:
            # Large traces added by individual tests: the index is the
            # trace's epoch start time.
            array_size = const.SEC_5M * 10
            start = file_idx
        end = start + array_size
        return np.arange(start, end)

    def setUp(self) -> None:
        """Patch np.memmap and build one day of fake 1-Hz trace metadata."""
        memmap_patcher = patch.object(np, 'memmap',
                                      side_effect=self.no_file_memmap)
        self.addCleanup(memmap_patcher.stop)
        memmap_patcher.start()

        self.channel_data = {'samplerate': 1}
        self.traces_info = []
        self.channel_data['tracesInfo'] = self.traces_info
        self.data_folder = TemporaryDirectory()
        # Fix: the original never removed the temporary directory, leaking
        # one directory per test run.
        self.addCleanup(self.data_folder.cleanup)
        for i in range(100):
            trace_size = 100
            start_time = i * trace_size
            trace = {
                'startTmEpoch': start_time,
                'endTmEpoch': start_time + trace_size - 1,
                'size': trace_size,
                # File names encode the trace index so no_file_memmap can
                # regenerate matching times/data arrays.
                'times_f': Path(self.data_folder.name) / f'times-{i}',
                'data_f': Path(self.data_folder.name) / f'data-{i}',
            }
            self.traces_info.append(trace)

        data_start_time = 0
        data_end_time = 10000
        self.each_day_5_min_list = get_eachDay5MinList(data_start_time,
                                                       data_end_time)
        self.start_time = 2500
        self.end_time = 7500

    def _add_large_trace(self, trace_start_time: int) -> dict:
        """
        Append one SEC_5M * 10-second trace starting at trace_start_time.

        The file names encode the start time so no_file_memmap can
        regenerate the matching data (start time >= SEC_DAY takes the
        large-trace branch there).

        :param trace_start_time: epoch start time of the trace
        :return: the trace-info dict that was appended
        """
        trace_size = const.SEC_5M * 10
        folder = Path(self.data_folder.name)
        trace = {
            'startTmEpoch': trace_start_time,
            'endTmEpoch': trace_start_time + trace_size - 1,
            'size': trace_size,
            'times_f': folder / f'times-{trace_start_time}',
            'data_f': folder / f'data-{trace_start_time}',
        }
        self.traces_info.append(trace)
        return trace

    def test_data_out_of_range(self):
        """A request window that misses the data entirely returns False."""
        with self.subTest('test_start_time_later_than_data_end_time'):
            self.start_time = 12500
            self.end_time = 17500
            self.assertFalse(
                get_trimTPSData(self.channel_data, self.start_time,
                                self.end_time, self.each_day_5_min_list)
            )
        with self.subTest('test_end_time_earlier_than_data_start_time'):
            self.start_time = -7500
            self.end_time = -2500
            self.assertFalse(
                get_trimTPSData(self.channel_data, self.start_time,
                                self.end_time, self.each_day_5_min_list)
            )

    def test_result_is_stored_one_day_of_data(self):
        """One day of data yields one per-day entry in tps_data."""
        num_day = 1
        get_trimTPSData(self.channel_data, self.start_time,
                        self.end_time, self.each_day_5_min_list)
        self.assertTrue('tps_data' in self.channel_data)
        self.assertEqual(len(self.channel_data['tps_data']), num_day)

    def test_result_is_stored_multiple_days_of_data(self):
        """Data spanning two consecutive days yields two entries."""
        second_day_data_start_time = int(
            UTCDateTime(1970, 1, 2, 18, 0, 0).timestamp
        )
        self.end_time = UTCDateTime(1970, 1, 2, 6, 0, 0).timestamp
        trace = self._add_large_trace(second_day_data_start_time)
        self.each_day_5_min_list = get_eachDay5MinList(0, trace['endTmEpoch'])
        num_day = 2
        get_trimTPSData(self.channel_data, self.start_time,
                        self.end_time, self.each_day_5_min_list)
        self.assertTrue('tps_data' in self.channel_data)
        self.assertEqual(len(self.channel_data['tps_data']), num_day)

    def test_data_has_gaps(self):
        """A gap (no data on day 2) still produces one entry per day."""
        third_day_data_start_time = int(
            UTCDateTime(1970, 1, 3, 18, 0, 0).timestamp
        )
        self.end_time = UTCDateTime(1970, 1, 2, 6, 0, 0).timestamp
        trace = self._add_large_trace(third_day_data_start_time)
        self.each_day_5_min_list = get_eachDay5MinList(0, trace['endTmEpoch'])
        num_day = 3
        get_trimTPSData(self.channel_data, self.start_time,
                        self.end_time, self.each_day_5_min_list)
        self.assertTrue('tps_data' in self.channel_data)
        self.assertEqual(len(self.channel_data['tps_data']), num_day)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment