from pathlib import Path
from tempfile import TemporaryDirectory
from unittest import TestCase
from unittest.mock import patch

import numpy as np

from sohstationviewer.conf import constants as const
from sohstationviewer.model.handling_data import (
    downsample,
    trim_downsample_WFChan,
)

ORIGINAL_CHAN_SIZE_LIMIT = const.CHAN_SIZE_LIMIT
ORIGINAL_RECAL_SIZE_LIMIT = const.RECAL_SIZE_LIMIT


class TestTrimDownsampleWfChan(TestCase):
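    """
    Test suite for handling_data.trim_downsample_WFChan.
    """
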
    def no_file_memmap(self, file_path: Path, **kwargs):
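        """
        Stand-in for numpy.memmap that generates an array in memory instead
        of reading one from disk, so no real data files are needed.
        """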
        # Data will look the same as times. This has two benefits:
        # - It is a lot easier to inspect what data remains after trimming
        # and downsampling, seeing as the remaining data would be the same
        # as the remaining times.
        # - It is a lot easier to reproducibly create a test data set.
        array_size = 100
        file_idx = int(file_path.name.split('-')[-1])
        start = file_idx * array_size
        end = start + array_size
        return np.arange(start, end)

    def setUp(self) -> None:
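        """
        Patch numpy.memmap and build a channel containing 100 traces of
        synthetic data covering the epoch times 0 through 9999.
        """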
        memmap_patcher = patch.object(np, 'memmap',
                                      side_effect=self.no_file_memmap)
        self.addCleanup(memmap_patcher.stop)
        memmap_patcher.start()

        self.traces_info = []
        self.channel_data = {'tracesInfo': self.traces_info}
        self.data_folder = TemporaryDirectory()
        self.addCleanup(self.data_folder.cleanup)
        data_folder_path = Path(self.data_folder.name)
        for i in range(100):
            trace_size = 100
            start_time = i * trace_size
            trace = {
                'startTmEpoch': start_time,
                'endTmEpoch': start_time + trace_size - 1,
                'size': trace_size,
                'times_f': data_folder_path / f'times-{i}',
                'data_f': data_folder_path / f'data-{i}',
            }
            self.traces_info.append(trace)
        self.start_time = 2500
        self.end_time = 7500

    def test_data_out_of_range(self):
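        """
        Test that trim_downsample_WFChan returns False when the requested
        time range does not overlap the data.
        """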
        with self.subTest('test_start_time_later_than_data_end_time'):
            self.start_time = 12500
            self.end_time = 17500
            self.assertFalse(
                trim_downsample_WFChan(self.channel_data, self.start_time,
                                       self.end_time, True)
            )
        with self.subTest('test_end_time_earlier_than_data_start_time'):
            self.start_time = -7500
            self.end_time = -2500
            self.assertFalse(
                trim_downsample_WFChan(self.channel_data, self.start_time,
                                       self.end_time, True)
            )

    def test_result_is_stored(self):
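        """
        Test that the trimmed times and data are stored in the channel data
        dictionary.
        """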
        trim_downsample_WFChan(self.channel_data, self.start_time,
                               self.end_time, True)
        self.assertIn('times', self.channel_data)
        self.assertGreater(len(self.channel_data['times']), 0)
        self.assertIn('data', self.channel_data)
        self.assertGreater(len(self.channel_data['data']), 0)

    def test_data_is_trimmed(self):
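        """
        Test that every remaining time (and, because the mock data mirrors
        the times, every remaining data point) lies within the requested
        time range.
        """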
        trim_downsample_WFChan(self.channel_data, self.start_time,
                               self.end_time, True)
        is_times_trimmed = ((self.channel_data['times'] >= self.start_time) &
                            (self.channel_data['times'] <= self.end_time))
        self.assertTrue(is_times_trimmed.all())
        is_data_trimmed = ((self.channel_data['data'] >= self.start_time) &
                           (self.channel_data['data'] <= self.end_time))
        self.assertTrue(is_data_trimmed.all())

    @patch('sohstationviewer.model.handling_data.downsample',
           wraps=downsample)
    def test_data_is_downsampled(self, mock_downsample):
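        """
        Test that downsample is called when the trimmed data is larger than
        CHAN_SIZE_LIMIT.
        """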
        const.CHAN_SIZE_LIMIT = 1000
        try:
            trim_downsample_WFChan(self.channel_data, self.start_time,
                                   self.end_time, True)
            self.assertTrue(mock_downsample.called)
        finally:
            const.CHAN_SIZE_LIMIT = ORIGINAL_CHAN_SIZE_LIMIT

    def test_data_small_enough_after_first_trim_flag_is_set(self):
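        """
        Test that the 'fulldata' flag is set when the data is small enough
        after the first trim.
        """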
        trim_downsample_WFChan(self.channel_data, self.start_time,
                               self.end_time, True)
        self.assertIn('fulldata', self.channel_data)

    def test_no_additional_work_if_data_small_enough_after_first_trim(self):
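        """
        Test that a second call reuses the stored times and data instead of
        recomputing them when the data was already small enough after the
        first trim.
        """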
        trim_downsample_WFChan(self.channel_data, self.start_time,
                               self.end_time, True)
        current_times = self.channel_data['times']
        current_data = self.channel_data['data']
        trim_downsample_WFChan(self.channel_data, self.start_time,
                               self.end_time, True)
        self.assertIs(current_times, self.channel_data['times'])
        self.assertIs(current_data, self.channel_data['data'])

    def test_data_too_large_after_first_time(self):
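        """
        Test that no times or data are stored when the trimmed data is still
        larger than RECAL_SIZE_LIMIT.
        """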
        const.RECAL_SIZE_LIMIT = 1
        try:
            trim_downsample_WFChan(self.channel_data, self.start_time,
                                   self.end_time, False)
            self.assertNotIn('times', self.channel_data)
            self.assertNotIn('data', self.channel_data)
        finally:
            const.RECAL_SIZE_LIMIT = ORIGINAL_RECAL_SIZE_LIMIT