Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest import TestCase
from unittest.mock import patch
import numpy as np
from sohstationviewer.conf import constants as const
from sohstationviewer.model.handling_data import (
downsample,
trim_downsample_WFChan,
)
ORIGINAL_CHAN_SIZE_LIMIT = const.CHAN_SIZE_LIMIT
ORIGINAL_RECAL_SIZE_LIMIT = const.RECAL_SIZE_LIMIT
class TestTrimDownsampleWfChan(TestCase):
def no_file_memmap(self, file_path: Path, **kwargs):
# Data will look the same as times. This has two benefits:
# - It is a lot easier to inspect what data remains after trimming
# and downsampling, seeing as the remaining data would be the same
# as the remaining times.
# - It is a lot easier to reproducibly create a test data set.
array_size = 100
file_idx = int(file_path.name.split('-')[-1])
start = file_idx * array_size
end = start + array_size
return np.arange(start, end)
def setUp(self) -> None:
memmap_patcher = patch.object(np, 'memmap',
side_effect=self.no_file_memmap)
self.addCleanup(memmap_patcher.stop)
memmap_patcher.start()
self.channel_data = {}
self.traces_info = []
self.channel_data['tracesInfo'] = self.traces_info
self.data_folder = TemporaryDirectory()
for i in range(100):
trace_size = 100
start_time = i * trace_size
trace = {}
trace['startTmEpoch'] = start_time
trace['endTmEpoch'] = start_time + trace_size - 1
trace['size'] = trace_size
times_file_name = Path(self.data_folder.name) / f'times-{i}'
trace['times_f'] = times_file_name
data_file_name = Path(self.data_folder.name) / f'data-{i}'
trace['data_f'] = data_file_name
self.traces_info.append(trace)
self.start_time = 2500
self.end_time = 7500
def test_data_out_of_range(self):
with self.subTest('test_start_time_later_than_data_end_time'):
self.start_time = 12500
self.end_time = 17500
self.assertFalse(
trim_downsample_WFChan(self.channel_data, self.start_time,
self.end_time, True)
)
with self.subTest('test_end_time_earlier_than_data_start_time'):
self.start_time = -7500
self.end_time = -2500
self.assertFalse(
trim_downsample_WFChan(self.channel_data, self.start_time,
self.end_time, True)
)
def test_result_is_stored(self):
trim_downsample_WFChan(self.channel_data, self.start_time,
self.end_time, True)
self.assertTrue('times' in self.channel_data)
self.assertGreater(len(self.channel_data['times']), 0)
self.assertTrue('data' in self.channel_data)
self.assertGreater(len(self.channel_data['data']), 0)
def test_data_is_trimmed(self):
trim_downsample_WFChan(self.channel_data, self.start_time,
self.end_time, True)
is_times_trimmed = ((self.channel_data['times'] >= self.start_time) &
(self.channel_data['times'] <= self.end_time))
self.assertTrue(is_times_trimmed.all())
is_data_trimmed = ((self.channel_data['data'] >= self.start_time) &
(self.channel_data['data'] <= self.end_time))
self.assertTrue(is_data_trimmed.all())
@patch('sohstationviewer.model.handling_data.downsample', wraps=downsample)
def test_data_is_downsampled(self, mock_downsample):
const.CHAN_SIZE_LIMIT = 1000
trim_downsample_WFChan(self.channel_data, self.start_time,
self.end_time, True)
self.assertTrue(mock_downsample.called)
const.CHAN_SIZE_LIMIT = ORIGINAL_CHAN_SIZE_LIMIT
def test_data_small_enough_after_first_trim_flag_is_set(self):
trim_downsample_WFChan(self.channel_data, self.start_time,
self.end_time, True)
self.assertTrue('fulldata' in self.channel_data)
def test_no_additional_work_if_data_small_enough_after_first_trim(self):
trim_downsample_WFChan(self.channel_data, self.start_time,
self.end_time, True)
current_times = self.channel_data['times']
current_data = self.channel_data['data']
trim_downsample_WFChan(self.channel_data, self.start_time,
self.end_time, True)
self.assertIs(current_times, self.channel_data['times'])
self.assertIs(current_data, self.channel_data['data'])
def test_data_too_large_after_first_time(self):
const.RECAL_SIZE_LIMIT = 1
trim_downsample_WFChan(self.channel_data, self.start_time,
self.end_time, False)
self.assertTrue('times' not in self.channel_data)
self.assertTrue('data' not in self.channel_data)
const.RECAL_SIZE_LIMIT = ORIGINAL_RECAL_SIZE_LIMIT