from pathlib import Path
from tempfile import TemporaryDirectory

from unittest import TestCase
from unittest.mock import patch

import numpy as np
from obspy.core import UTCDateTime

from sohstationviewer.conf import constants as const
from sohstationviewer.model.handling_data import (
    downsample,
    trim_downsample_WFChan,
    get_eachDay5MinList,
    get_trimTPSData,
    trim_waveform_data,
)

ORIGINAL_CHAN_SIZE_LIMIT = const.CHAN_SIZE_LIMIT
ORIGINAL_RECAL_SIZE_LIMIT = const.RECAL_SIZE_LIMIT


class TestTrimWfData(TestCase):
    def setUp(self) -> None:
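        # Build 100 contiguous traces, each covering 100 seconds, so the
        # test data spans the epoch range [0, 9999]. The default trim
        # window [2500, 7500] lies strictly inside that range.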
        self.channel_data = {}
        self.traces_info = []
        self.channel_data['tracesInfo'] = self.traces_info

        for i in range(100):
            trace_size = 100
            start_time = i * trace_size
            trace = {}
            trace['startTmEpoch'] = start_time
            trace['endTmEpoch'] = start_time + trace_size - 1
            self.traces_info.append(trace)
        self.start_time = 2500
        self.end_time = 7500

    def test_data_is_trimmed_neither_start_nor_end_time_is_trace_start_or_end_time(self):  # noqa: E501
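        # Use trim times that fall strictly inside traces rather than on
        # trace boundaries (which are multiples of 100).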
        self.start_time = 2444
        self.end_time = 7444
        trimmed_traces_list = trim_waveform_data(
            self.channel_data, self.start_time, self.end_time
        )

        self.assertTrue(
            trimmed_traces_list[0]['startTmEpoch'] <= self.start_time
        )
        self.assertTrue(
            trimmed_traces_list[0]['endTmEpoch'] > self.start_time
        )
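        # Drop the two boundary traces so that only traces lying fully
        # inside the trim window remain.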
        trimmed_traces_list.pop(0)
        trimmed_traces_list.pop()
        is_left_trimmed = all(trace['startTmEpoch'] > self.start_time
                              for trace in trimmed_traces_list)
        is_right_trimmed = all(trace['endTmEpoch'] <= self.end_time
                               for trace in trimmed_traces_list)
        self.assertTrue(is_left_trimmed and is_right_trimmed)

    def test_data_out_of_range(self):
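        # Both requested windows lie entirely outside the available data
        # (epochs 0-9999), so the call should report failure.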
        with self.subTest('test_start_time_later_than_data_end_time'):
            self.start_time = 12500
            self.end_time = 17500
            self.assertFalse(
                trim_downsample_WFChan(self.channel_data, self.start_time,
                                       self.end_time, True)
            )
        with self.subTest('test_end_time_earlier_than_data_start_time'):
            self.start_time = -7500
            self.end_time = -2500
            self.assertFalse(
                trim_downsample_WFChan(self.channel_data, self.start_time,
                                       self.end_time, True)
            )

    def test_no_data(self):
        self.channel_data['tracesInfo'] = []
        with self.assertRaises(IndexError):
            trim_waveform_data(
                self.channel_data, self.start_time, self.end_time
            )

    def test_end_time_earlier_than_start_time(self):
        self.start_time, self.end_time = self.end_time, self.start_time
        trimmed_traces_list = trim_waveform_data(
            self.channel_data, self.start_time, self.end_time
        )
        self.assertListEqual(trimmed_traces_list, [])

    def test_data_does_not_need_to_be_trimmed(self):
        with self.subTest('test_start_time_earlier_than_trace_earliest_time'):
            self.start_time = -2500
            self.end_time = 7500
            trimmed_traces_list = trim_waveform_data(
                self.channel_data, self.start_time, self.end_time
            )
            self.assertEqual(len(trimmed_traces_list), 76)
        with self.subTest('test_end_time_later_than_trace_latest_time'):
            self.start_time = 2500
            self.end_time = 12500
            trimmed_traces_list = trim_waveform_data(
                self.channel_data, self.start_time, self.end_time
            )
            self.assertEqual(len(trimmed_traces_list), 75)
        with self.subTest('test_data_contained_in_time_range'):
            self.start_time = self.traces_info[0]['startTmEpoch']
            self.end_time = self.traces_info[-1]['endTmEpoch']
            trimmed_traces_list = trim_waveform_data(
                self.channel_data, self.start_time, self.end_time
            )
            self.assertEqual(len(trimmed_traces_list), len(self.traces_info))


class TestTrimDownsampleWfChan(TestCase):
    def no_file_memmap(self, file_path: Path, **kwargs):
        # The mocked data is identical to the mocked times. This has two
        # benefits:
        # - It is a lot easier to inspect what data remains after trimming
        #   and downsampling, since the remaining data is the same as the
        #   remaining times.
        # - It is a lot easier to reproducibly create a test data set.
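        # The trace index is encoded in the file name, so the returned
        # array covers exactly that trace's 100-second epoch range.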
        array_size = 100
        file_idx = int(file_path.name.split('-')[-1])
        start = file_idx * array_size
        end = start + array_size
        return np.arange(start, end)

    def setUp(self) -> None:
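        # Patch np.memmap so the times_f/data_f paths created below never
        # have to exist on disk; no_file_memmap builds the arrays from the
        # file names instead.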
        memmap_patcher = patch.object(np, 'memmap',
                                      side_effect=self.no_file_memmap)
        self.addCleanup(memmap_patcher.stop)
        memmap_patcher.start()

        self.channel_data = {}
        self.traces_info = []
        self.channel_data['tracesInfo'] = self.traces_info
        self.data_folder = TemporaryDirectory()
        self.addCleanup(self.data_folder.cleanup)
        for i in range(100):
            trace_size = 100
            start_time = i * trace_size
            trace = {}
            trace['startTmEpoch'] = start_time
            trace['endTmEpoch'] = start_time + trace_size - 1
            trace['size'] = trace_size

            times_file_name = Path(self.data_folder.name) / f'times-{i}'
            trace['times_f'] = times_file_name

            data_file_name = Path(self.data_folder.name) / f'data-{i}'
            trace['data_f'] = data_file_name

            self.traces_info.append(trace)
        self.start_time = 2500
        self.end_time = 7500

    def test_result_is_stored(self):
        trim_downsample_WFChan(self.channel_data, self.start_time,
                               self.end_time, True)
        self.assertIn('times', self.channel_data)
        self.assertGreater(len(self.channel_data['times']), 0)
        self.assertIn('data', self.channel_data)
        self.assertGreater(len(self.channel_data['data']), 0)

    @patch('sohstationviewer.model.handling_data.downsample', wraps=downsample)
    def test_data_is_downsampled(self, mock_downsample):
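        # Lower the channel size limit below the number of trimmed data
        # points so that downsample has to be called.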
        # Restore the original limit even if an assertion fails.
        self.addCleanup(
            setattr, const, 'CHAN_SIZE_LIMIT', ORIGINAL_CHAN_SIZE_LIMIT)
        const.CHAN_SIZE_LIMIT = 1000
        trim_downsample_WFChan(self.channel_data, self.start_time,
                               self.end_time, True)
        self.assertTrue(mock_downsample.called)

    def test_data_small_enough_after_first_trim_flag_is_set(self):
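        # The 'fulldata' flag marks the channel as fully processed, which
        # lets later calls skip recomputation (exercised by the next test).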
        trim_downsample_WFChan(self.channel_data, self.start_time,
                               self.end_time, True)
        self.assertIn('fulldata', self.channel_data)

    def test_no_additional_work_if_data_small_enough_after_first_trim(self):
        trim_downsample_WFChan(self.channel_data, self.start_time,
                               self.end_time, True)
        current_times = self.channel_data['times']
        current_data = self.channel_data['data']
        trim_downsample_WFChan(self.channel_data, self.start_time,
                               self.end_time, True)
        self.assertIs(current_times, self.channel_data['times'])
        self.assertIs(current_data, self.channel_data['data'])

    def test_data_too_large_after_first_time(self):
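        # With RECAL_SIZE_LIMIT lowered to 1, the data is still considered
        # too large after the first trim, so no result should be stored.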
        # Restore the original limit even if an assertion fails.
        self.addCleanup(
            setattr, const, 'RECAL_SIZE_LIMIT', ORIGINAL_RECAL_SIZE_LIMIT)
        const.RECAL_SIZE_LIMIT = 1
        trim_downsample_WFChan(self.channel_data, self.start_time,
                               self.end_time, False)
        self.assertNotIn('times', self.channel_data)
        self.assertNotIn('data', self.channel_data)


class TestGetTrimTpsData(TestCase):
    def no_file_memmap(self, file_path: Path, **kwargs):
        # The mocked data is identical to the mocked times. This has two
        # benefits:
        # - It is a lot easier to inspect what data remains after trimming
        #   and downsampling, since the remaining data is the same as the
        #   remaining times.
        # - It is a lot easier to reproducibly create a test data set.
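        # Files created in setUp are indexed 0-99; the extra traces added
        # by the multi-day tests encode their start epoch (>= SEC_DAY) in
        # the file name, so their arrays start from that epoch instead.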
        file_idx = int(file_path.name.split('-')[-1])
        if file_idx < const.SEC_DAY:
            array_size = 100
            start = file_idx * array_size
            end = start + array_size
        else:
            array_size = const.SEC_5M * 10
            start = file_idx
            end = start + array_size
        return np.arange(start, end)

    def setUp(self) -> None:
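        # Same shape of fixture as TestTrimDownsampleWfChan, plus a 1 Hz
        # sample rate and a list of each day's 5-minute intervals covering
        # the epoch range [0, 10000].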
        memmap_patcher = patch.object(np, 'memmap',
                                      side_effect=self.no_file_memmap)
        self.addCleanup(memmap_patcher.stop)
        memmap_patcher.start()

        self.channel_data = {'samplerate': 1}
        self.traces_info = []
        self.channel_data['tracesInfo'] = self.traces_info
        self.data_folder = TemporaryDirectory()
        self.addCleanup(self.data_folder.cleanup)
        for i in range(100):
            trace_size = 100
            start_time = i * trace_size
            trace = {}
            trace['startTmEpoch'] = start_time
            trace['endTmEpoch'] = start_time + trace_size - 1
            trace['size'] = trace_size

            times_file_name = Path(self.data_folder.name) / f'times-{i}'
            trace['times_f'] = times_file_name

            data_file_name = Path(self.data_folder.name) / f'data-{i}'
            trace['data_f'] = data_file_name

            self.traces_info.append(trace)

        data_start_time = 0
        data_end_time = 10000
        self.each_day_5_min_list = get_eachDay5MinList(data_start_time,
                                                       data_end_time)
        self.start_time = 2500
        self.end_time = 7500

    def test_data_out_of_range(self):
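        # Both requested windows lie entirely outside the available data,
        # so the call should report failure.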
        with self.subTest('test_start_time_later_than_data_end_time'):
            self.start_time = 12500
            self.end_time = 17500
            self.assertFalse(
                get_trimTPSData(self.channel_data, self.start_time,
                                self.end_time, self.each_day_5_min_list)
            )
        with self.subTest('test_end_time_earlier_than_data_start_time'):
            self.start_time = -7500
            self.end_time = -2500
            self.assertFalse(
                get_trimTPSData(self.channel_data, self.start_time,
                                self.end_time, self.each_day_5_min_list)
            )

    def test_result_is_stored_one_day_of_data(self):
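        # All traces created in setUp fall within the first day
        # (epochs 0-9999), so exactly one day of TPS data is expected.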
        num_day = 1
        get_trimTPSData(self.channel_data, self.start_time,
                        self.end_time, self.each_day_5_min_list)
        self.assertIn('tps_data', self.channel_data)
        self.assertEqual(len(self.channel_data['tps_data']), num_day)

    def test_result_is_stored_multiple_days_of_data(self):
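        # Append a trace starting on the evening of the second day so that
        # the data spans two days.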
        second_day_data_start_time = int(
            UTCDateTime(1970, 1, 2, 18, 0, 0).timestamp
        )
        self.end_time = UTCDateTime(1970, 1, 2, 6, 0, 0).timestamp

        trace_size = const.SEC_5M * 10
        trace = {}
        trace['startTmEpoch'] = second_day_data_start_time
        trace['endTmEpoch'] = second_day_data_start_time + trace_size - 1
        trace['size'] = trace_size

        times_file_name = Path(self.data_folder.name).joinpath(
            f'times-{second_day_data_start_time}'
        )
        trace['times_f'] = times_file_name

        data_file_name = Path(self.data_folder.name).joinpath(
            f'data-{second_day_data_start_time}'
        )
        trace['data_f'] = data_file_name

        self.traces_info.append(trace)

        self.each_day_5_min_list = get_eachDay5MinList(0, trace['endTmEpoch'])

        num_day = 2
        get_trimTPSData(self.channel_data, self.start_time,
                        self.end_time, self.each_day_5_min_list)
        self.assertIn('tps_data', self.channel_data)
        self.assertEqual(len(self.channel_data['tps_data']), num_day)

    def test_data_has_gaps(self):
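        # Append a trace late on the third day while leaving the second day
        # completely empty, creating a gap in the data.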
        third_day_data_start_time = int(
            UTCDateTime(1970, 1, 3, 18, 0, 0).timestamp
        )
        self.end_time = UTCDateTime(1970, 1, 2, 6, 0, 0).timestamp

        trace_size = const.SEC_5M * 10
        trace = {}
        trace['startTmEpoch'] = third_day_data_start_time
        trace['endTmEpoch'] = third_day_data_start_time + trace_size - 1
        trace['size'] = trace_size

        times_file_name = Path(self.data_folder.name).joinpath(
            f'times-{third_day_data_start_time}'
        )
        trace['times_f'] = times_file_name

        data_file_name = Path(self.data_folder.name).joinpath(
            f'data-{third_day_data_start_time}'
        )
        trace['data_f'] = data_file_name

        self.traces_info.append(trace)

        self.each_day_5_min_list = get_eachDay5MinList(0, trace['endTmEpoch'])

        num_day = 3
        get_trimTPSData(self.channel_data, self.start_time,
                        self.end_time, self.each_day_5_min_list)
        self.assertIn('tps_data', self.channel_data)
        self.assertEqual(len(self.channel_data['tps_data']), num_day)