Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target: software_public/passoft/sohstationviewer

Showing with 1327 additions and 400 deletions
import numpy as np
from unittest import TestCase
from unittest.mock import patch
from sohstationviewer.model.general_data.general_data_helper import (
_check_related_gaps, squash_gaps, sort_data,
retrieve_data_time_from_data_dict, retrieve_gaps_from_data_dict,
combine_data, apply_convert_factor_to_data_dict
)
class TestCheckRelatedGaps(TestCase):
# FROM test_handling_data_rearrange_data.TestCheckRelatedGaps
@classmethod
def setUpClass(cls) -> None:
cls.checked_indexes = []
def test_minmax1_inside_minmax2(self):
self.assertTrue(
_check_related_gaps(3, 4, 1, 5, 1, self.checked_indexes))
self.assertIn(1, self.checked_indexes)
def test_minmax2_inside_minmax1(self):
self.assertTrue(
_check_related_gaps(1, 5, 3, 4, 2, self.checked_indexes))
self.assertIn(2, self.checked_indexes)
def test_end_minmax1_overlap_start_minmax2(self):
self.assertTrue(
_check_related_gaps(1, 4, 3, 5, 3, self.checked_indexes))
self.assertIn(3, self.checked_indexes)
def test_end_minmax2_overlap_start_minmax1(self):
self.assertTrue(
_check_related_gaps(3, 5, 1, 4, 4, self.checked_indexes))
self.assertIn(4, self.checked_indexes)
def test_minmax1_less_than_minmax2(self):
self.assertFalse(
_check_related_gaps(1, 3, 4, 6, 5, self.checked_indexes))
self.assertNotIn(5, self.checked_indexes)
def test_minmax1_greater_than_minmax2(self):
self.assertFalse(
_check_related_gaps(6, 6, 1, 3, 5, self.checked_indexes))
self.assertNotIn(5, self.checked_indexes)
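# A minimal sketch of the interval rule the six cases above exercise,
# assuming _check_related_gaps treats each pair of arguments as a
# [min, max] range and records the given index when the ranges overlap
# (this helper is purely illustrative, not the real implementation):
def _ranges_overlap(min1: float, max1: float, min2: float, max2: float) -> bool:
    # two gaps are related when their ranges share at least one point
    return max(min1, min2) <= min(max1, max2)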
class TestSquashGaps(TestCase):
# FROM test_handling_data_rearrange_data.TestSquashGaps
def setUp(self) -> None:
self.normal_gaps = [[4, 7], [4, 6], [5, 6], [3, 7], [5, 8]]
self.overlap_gaps = [[17, 14], [16, 14], [16, 15], [17, 13], [18, 15]]
self.mixed_gaps = []
for i in range(len(self.normal_gaps)):
self.mixed_gaps.append(self.normal_gaps[i])
self.mixed_gaps.append(self.overlap_gaps[i])
def test_normal_gaps(self):
gaps = squash_gaps(self.normal_gaps)
self.assertEqual(gaps, [[3, 8]])
def test_overlap_gaps(self):
gaps = squash_gaps(self.overlap_gaps)
self.assertEqual(gaps, [[18, 13]])
def test_mixed_gaps(self):
gaps = squash_gaps(self.mixed_gaps)
self.assertEqual(gaps, [[3, 8], [18, 13]])
class TestSortData(TestCase):
# FROM test_handling_data_rearrange_data.TestSortData
def setUp(self) -> None:
self.station_data_dict = {
'CH1': {'tracesInfo': [{'startTmEpoch': 7},
{'startTmEpoch': 1},
{'startTmEpoch': 5},
{'startTmEpoch': 3}]},
'CH2': {'tracesInfo': [{'startTmEpoch': 2},
{'startTmEpoch': 8},
{'startTmEpoch': 6},
{'startTmEpoch': 4}]}
}
def test_sort_data(self):
sort_data(self.station_data_dict)
self.assertEqual(
self.station_data_dict,
{'CH1': {'tracesInfo': [{'startTmEpoch': 1}, {'startTmEpoch': 3},
{'startTmEpoch': 5}, {'startTmEpoch': 7}]},
'CH2': {'tracesInfo': [{'startTmEpoch': 2}, {'startTmEpoch': 4},
{'startTmEpoch': 6}, {'startTmEpoch': 8}]}}
)
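# sort_data's observable effect, per the assertion above (a sketch of the
# rule, not the actual implementation): each channel's tracesInfo list is
# ordered in place by its traces' 'startTmEpoch'.
def _sort_traces_sketch(station_data_dict: dict) -> None:
    for channel in station_data_dict.values():
        channel['tracesInfo'].sort(key=lambda tr: tr['startTmEpoch'])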
class TestRetrieveDataTimeFromDataDict(TestCase):
def setUp(self) -> None:
self.data_dict = {
'STA1': {'CH1': {'startTmEpoch': 4, 'endTmEpoch': 6},
'CH2': {'startTmEpoch': 5, 'endTmEpoch': 9}
},
'STA2': {'CH1': {'startTmEpoch': 2, 'endTmEpoch': 4},
'CH2': {'startTmEpoch': 6, 'endTmEpoch': 8}
}
}
self.data_time = {}
self.expected_data_time = {'STA1': [4, 9], 'STA2': [2, 8]}
def test_retrieve_data_time(self):
retrieve_data_time_from_data_dict(
'STA1', self.data_dict, self.data_time)
self.assertEqual(self.data_time,
{'STA1': self.expected_data_time['STA1']})
retrieve_data_time_from_data_dict(
'STA2', self.data_dict, self.data_time)
self.assertEqual(self.data_time,
self.expected_data_time)
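# The expected values above encode a station-wide envelope. A hedged sketch,
# assuming retrieve_data_time_from_data_dict keeps the earliest start and
# the latest end across a station's channels:
def _station_envelope_sketch(channels: dict) -> list:
    return [min(c['startTmEpoch'] for c in channels.values()),
            max(c['endTmEpoch'] for c in channels.values())]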
class TestRetrieveGapsFromDataDict(TestCase):
def setUp(self) -> None:
self.data_dict = {
'STA1': {'CH1': {'gaps': [[1, 2], [4, 3]]},
'CH2': {'gaps': []}
},
'STA2': {'CH1': {'gaps': [[1, 2], [4, 3], [2, 3]]},
'CH2': {'gaps': [[1, 3], [3, 2]]}
},
}
self.gaps = {}
self.expected_gaps = {'STA1': [[1, 2], [4, 3]],
'STA2': [[1, 2], [4, 3], [2, 3], [1, 3], [3, 2]]}
def test_retrieve_gaps(self):
self.gaps['STA1'] = []
retrieve_gaps_from_data_dict('STA1', self.data_dict, self.gaps)
self.assertEqual(self.gaps,
{'STA1': self.expected_gaps['STA1']})
self.gaps['STA2'] = []
retrieve_gaps_from_data_dict('STA2', self.data_dict, self.gaps)
self.assertEqual(self.gaps,
self.expected_gaps)
class TestCombineData(TestCase):
def test_overlap_lt_gap_minimum(self):
# combine; not add to gap list
data_dict = {'STA1': {
'CH1': {
'gaps': [],
'tracesInfo': [
{'startTmEpoch': 5,
'endTmEpoch': 15,
'data': [1, 2, 2, -1],
'times': [5, 8, 11, 15]},
{'startTmEpoch': 13, # delta = 2 < 10
'endTmEpoch': 20,
'data': [1, -2, 1, 1],
'times': [13, 16, 18, 20]}
]}
}}
gap_minimum = 10
combine_data('STA1', data_dict, gap_minimum)
self.assertEqual(data_dict['STA1']['CH1']['gaps'], [])
self.assertEqual(
len(data_dict['STA1']['CH1']['tracesInfo']),
1)
self.assertEqual(
data_dict['STA1']['CH1']['tracesInfo'][0]['startTmEpoch'],
5)
self.assertEqual(
data_dict['STA1']['CH1']['tracesInfo'][0]['endTmEpoch'],
20)
self.assertListEqual(
data_dict['STA1']['CH1']['tracesInfo'][0]['data'].tolist(),
[1, 2, 2, -1, 1, -2, 1, 1])
self.assertListEqual(
data_dict['STA1']['CH1']['tracesInfo'][0]['times'].tolist(),
[5, 8, 11, 15, 13, 16, 18, 20])
def test_overlap_gt_or_equal_gap_minimum(self):
# combine; add to gap list
data_dict = {'STA1': {
'CH1': {
'gaps': [],
'tracesInfo': [
{'startTmEpoch': 5,
'endTmEpoch': 15,
'data': [1, 2, 2, -1],
'times': [5, 8, 11, 15]},
{'startTmEpoch': 5, # delta = 10 >= 10
'endTmEpoch': 20,
'data': [1, -2, 1, 1],
'times': [5, 11, 15, 20]}
]}
}}
gap_minimum = 10
combine_data('STA1', data_dict, gap_minimum)
self.assertEqual(data_dict['STA1']['CH1']['gaps'], [[15, 5]])
self.assertEqual(
len(data_dict['STA1']['CH1']['tracesInfo']),
1)
self.assertEqual(
data_dict['STA1']['CH1']['tracesInfo'][0]['startTmEpoch'],
5)
self.assertEqual(
data_dict['STA1']['CH1']['tracesInfo'][0]['endTmEpoch'],
20)
self.assertListEqual(
data_dict['STA1']['CH1']['tracesInfo'][0]['data'].tolist(),
[1, 2, 2, -1, 1, -2, 1, 1])
self.assertListEqual(
data_dict['STA1']['CH1']['tracesInfo'][0]['times'].tolist(),
[5, 8, 11, 15, 5, 11, 15, 20])
def test_lt_gap_minimum(self):
# not combine; not add to gap list
data_dict = {'STA1': {
'CH1': {
'gaps': [],
'tracesInfo': [
{'startTmEpoch': 5,
'endTmEpoch': 15,
'data': [1, 2, 2, -1],
'times': [5, 8, 11, 15]},
{'startTmEpoch': 22, # delta = 7 > 6, < 10
'endTmEpoch': 34,
'data': [1, -2, 1, 1],
'times': [22, 26, 30, 34]}
]}
}}
gap_minimum = 10
combine_data('STA1', data_dict, gap_minimum)
self.assertEqual(data_dict['STA1']['CH1']['gaps'], [])
self.assertEqual(
data_dict['STA1']['CH1']['tracesInfo'][0]['startTmEpoch'],
5)
self.assertEqual(
data_dict['STA1']['CH1']['tracesInfo'][0]['endTmEpoch'],
34)
self.assertListEqual(
data_dict['STA1']['CH1']['tracesInfo'][0]['data'].tolist(),
[1, 2, 2, -1, 1, -2, 1, 1])
self.assertListEqual(
data_dict['STA1']['CH1']['tracesInfo'][0]['times'].tolist(),
[5, 8, 11, 15, 22, 26, 30, 34])
def test_gap_gt_or_equal_gap_minimum(self):
# not combine; add to gap list
data_dict = {'STA1': {
'CH1': {
'gaps': [],
'tracesInfo': [
{'startTmEpoch': 5,
'endTmEpoch': 15,
'data': [1, 2, 2, -1],
'times': [5, 8, 11, 15]},
{'startTmEpoch': 25, # delta = 10 >= 10
'endTmEpoch': 40,
'data': [1, -2, 1, 1],
'times': [25, 29, 33, 36, 40]}
]}
}}
gap_minimum = 10
combine_data('STA1', data_dict, gap_minimum)
self.assertEqual(data_dict['STA1']['CH1']['gaps'], [[15, 25]])
self.assertEqual(
data_dict['STA1']['CH1']['tracesInfo'][0]['startTmEpoch'],
5)
self.assertEqual(
data_dict['STA1']['CH1']['tracesInfo'][0]['endTmEpoch'],
40)
self.assertListEqual(
data_dict['STA1']['CH1']['tracesInfo'][0]['data'].tolist(),
[1, 2, 2, -1, 1, -2, 1, 1])
self.assertListEqual(
data_dict['STA1']['CH1']['tracesInfo'][0]['times'].tolist(),
[5, 8, 11, 15, 25, 29, 33, 36, 40])
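# A hedged sketch of the gap rule the four cases above pin down: a gap
# [previous_end, next_start] is recorded only when the distance between one
# trace's end and the next trace's start (whether a positive gap or an
# overlap) reaches gap_minimum; smaller deltas record nothing. In all four
# cases the assertions show the sample arrays concatenated into
# tracesInfo[0] regardless.
def _should_record_gap_sketch(prev_end: float, next_start: float,
                              gap_minimum) -> bool:
    # gap_minimum=None disables gap detection (see test_not_detect_gap below)
    return (gap_minimum is not None
            and abs(next_start - prev_end) >= gap_minimum)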
class TestApplyConvertFactorToDataDict(TestCase):
def setUp(self) -> None:
self.data_dict = {
'STA1': {
'CH1': {'tracesInfo': [{'data': np.array([1, 2, 2, -1])}]}
}
}
self.expected_data = [0.1, 0.2, 0.2, -0.1]
@patch('sohstationviewer.model.general_data.general_data_helper.'
'get_convert_factor')
def test_convert_factor(self, mock_get_convert_factor):
mock_get_convert_factor.return_value = 0.1
apply_convert_factor_to_data_dict('STA1', self.data_dict, 'Q330')
self.assertEqual(
self.data_dict['STA1']['CH1']['tracesInfo'][0]['data'].tolist(),
self.expected_data)
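# The contract under test, sketched under the assumption that
# get_convert_factor(channel, data_type) yields one scalar per channel:
# every trace's data array is scaled by that factor (0.1 here turns
# [1, 2, 2, -1] into [0.1, 0.2, 0.2, -0.1]).
def _apply_factor_sketch(trace: dict, factor: float) -> None:
    trace['data'] = np.asarray(trace['data']) * factor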
from unittest import TestCase
from pathlib import Path
from sohstationviewer.model.mseed_data.mseed import MSeed
from sohstationviewer.model.general_data.general_data import \
ProcessingDataError
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
'test_data')
pegasus_data = TEST_DATA_DIR.joinpath("Pegasus-sample")
q330_data = TEST_DATA_DIR.joinpath("Q330-sample")
blockettes_data = TEST_DATA_DIR.joinpath("Q330_unimplemented_ascii_block")
multiplex_data = TEST_DATA_DIR.joinpath("Q330_multiplex")
centaur_data = TEST_DATA_DIR.joinpath("Centaur-sample")
class TestMSeed(TestCase):
def test_path_not_exist(self):
# raise an exception when the path does not exist
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': '_',
'on_unittest': True
}
with self.assertRaises(ProcessingDataError) as context:
MSeed(**args)
self.assertEqual(
str(context.exception),
"Path '_' not exist"
)
def test_read_text_only(self):
# no station is recognized, so the text is added under the 'TEXT' key in log_data
args = {
'data_type': 'Pegasus',
'is_multiplex': False,
'folder': pegasus_data,
'req_soh_chans': ['_'],
'on_unittest': True
}
obj = MSeed(**args)
self.assertEqual(list(obj.log_data.keys()), ['TEXT'])
self.assertEqual(len(obj.log_data['TEXT']), 2)
self.assertEqual(
obj.log_data['TEXT'][0][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.130'
'\n2020-05-09 00:00:09.839 UTC: I(TimingThread): timing unce')
self.assertEqual(
obj.log_data['TEXT'][1][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.129'
'\n2020-05-08 22:55:45.390 UTC: I(Initializations): Firmware')
def test_read_text_with_soh(self):
# the text gets its station from the soh data and is added to log_data under the 'TXT' channel
args = {
'data_type': 'Pegasus',
'is_multiplex': False,
'folder': pegasus_data,
'req_soh_chans': ['VE1'],
'on_unittest': True
}
obj = MSeed(**args)
self.assertEqual(list(obj.log_data.keys()), ['TEXT', 'KC01'])
self.assertEqual(len(obj.log_data['TEXT']), 0)
self.assertEqual(list(obj.log_data['KC01'].keys()), ['TXT'])
self.assertEqual(len(obj.log_data['KC01']['TXT']), 2)
self.assertEqual(
obj.log_data['KC01']['TXT'][0][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.130'
'\n2020-05-09 00:00:09.839 UTC: I(TimingThread): timing unce')
self.assertEqual(
obj.log_data['KC01']['TXT'][1][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.129'
'\n2020-05-08 22:55:45.390 UTC: I(Initializations): Firmware')
def test_read_text_with_waveform(self):
# the text gets its station from the waveform data and is added to
# log_data under the 'TXT' channel
args = {
'data_type': 'Pegasus',
'is_multiplex': False,
'folder': pegasus_data,
'req_wf_chans': ['HH1'],
'req_soh_chans': ['_'],
'on_unittest': True
}
obj = MSeed(**args)
self.assertEqual(list(obj.log_data.keys()), ['TEXT', 'KC01'])
self.assertEqual(len(obj.log_data['TEXT']), 0)
self.assertEqual(list(obj.log_data['KC01'].keys()), ['TXT'])
self.assertEqual(len(obj.log_data['KC01']['TXT']), 2)
self.assertEqual(
obj.log_data['KC01']['TXT'][0][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.130'
'\n2020-05-09 00:00:09.839 UTC: I(TimingThread): timing unce')
self.assertEqual(
obj.log_data['KC01']['TXT'][1][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.129'
'\n2020-05-08 22:55:45.390 UTC: I(Initializations): Firmware')
def test_read_ascii(self):
# info is text wrapped in the mseed format
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': q330_data,
'req_soh_chans': ['LOG'],
}
obj = MSeed(**args)
self.assertEqual(list(obj.log_data.keys()), ['TEXT', 'AX08'])
self.assertEqual(list(obj.log_data['AX08'].keys()), ['LOG'])
self.assertEqual(obj.log_data['TEXT'], [])
self.assertEqual(len(obj.log_data['AX08']['LOG']), 16)
self.assertEqual(
obj.log_data['AX08']['LOG'][0][:100],
'\n\nSTATE OF HEALTH: From:1625456260.12 To:1625456260.12\n\r'
'\nQuanterra Packet Baler Model 14 Restart. V'
)
self.assertEqual(
obj.log_data['AX08']['LOG'][1][:100],
'\n\nSTATE OF HEALTH: From:1625456366.62 To:1625456366.62'
'\nReducing Status Polling Interval\r\n[2021-07-0'
)
def test_read_blockettes_info(self):
# info in blockette 500
args = {
'data_type': 'Q330',
'is_multiplex': True,
'folder': blockettes_data,
'req_soh_chans': ['ACE'],
}
obj = MSeed(**args)
self.assertEqual(list(obj.log_data.keys()), ['TEXT', '3203'])
self.assertEqual(list(obj.log_data['3203'].keys()), ['ACE'])
self.assertEqual(obj.log_data['TEXT'], [])
self.assertEqual(len(obj.log_data['3203']['ACE']), 1)
self.assertEqual(
obj.log_data['3203']['ACE'][0][:100],
'\n\nSTATE OF HEALTH: From:1671729287.00014 To:1671729287.0'
'\n===========\nVCO correction: 53.7109375\nTim'
)
def test_not_is_multiplex_read_channel(self):
# is_multiplex = False => reading stops at the first channel that does not
# match the request, so the channel 'EL1' is read but not to completion
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': multiplex_data,
'req_soh_chans': [],
'req_wf_chans': ['EL1']
}
obj = MSeed(**args)
self.assertEqual(list(obj.waveform_data.keys()), ['3203'])
self.assertEqual(list(obj.waveform_data['3203'].keys()), ['EL1'])
self.assertEqual(obj.waveform_data['3203']['EL1']['samplerate'], 200)
self.assertEqual(obj.waveform_data['3203']['EL1']['startTmEpoch'],
1671730004.145029)
self.assertEqual(obj.waveform_data['3203']['EL1']['endTmEpoch'],
1671730013.805)
self.assertEqual(obj.waveform_data['3203']['EL1']['size'], 1932)
self.assertEqual(obj.waveform_data['3203']['EL1']['gaps'], [])
self.assertEqual(len(obj.waveform_data['3203']['EL1']['tracesInfo']),
1)
def test_is_multiplex_read_channel(self):
# is_multiplex = True => read every record
args = {
'data_type': 'Q330',
'is_multiplex': True,
'folder': multiplex_data,
'req_soh_chans': [],
'req_wf_chans': ['EL1']
}
obj = MSeed(**args)
self.assertEqual(list(obj.waveform_data.keys()), ['3203'])
self.assertEqual(list(obj.waveform_data['3203'].keys()), ['EL1'])
self.assertEqual(obj.waveform_data['3203']['EL1']['samplerate'], 200)
self.assertEqual(obj.waveform_data['3203']['EL1']['startTmEpoch'],
1671730004.145029)
self.assertEqual(obj.waveform_data['3203']['EL1']['endTmEpoch'],
1671730720.4348998)
self.assertEqual(obj.waveform_data['3203']['EL1']['size'], 143258)
self.assertEqual(obj.waveform_data['3203']['EL1']['gaps'], [])
self.assertEqual(len(obj.waveform_data['3203']['EL1']['tracesInfo']),
1)
def test_not_is_multiplex_selected_channel_in_middle(self):
# the selected channel is never reached because an earlier record fails the
# requirement when is_multiplex = False
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': multiplex_data,
'req_soh_chans': [],
'req_wf_chans': ['EL2']
}
obj = MSeed(**args)
self.assertEqual(list(obj.waveform_data.keys()), [])
def test_is_multiplex_selected_channel_in_middle(self):
# is_multiplex = True => the selected channel will be read
args = {
'data_type': 'Q330',
'is_multiplex': True,
'folder': multiplex_data,
'req_soh_chans': [],
'req_wf_chans': ['EL2']
}
obj = MSeed(**args)
self.assertEqual(list(obj.waveform_data.keys()), ['3203'])
self.assertEqual(list(obj.waveform_data['3203'].keys()), ['EL2'])
self.assertEqual(obj.waveform_data['3203']['EL2']['samplerate'], 200)
self.assertEqual(obj.waveform_data['3203']['EL2']['startTmEpoch'],
1671730004.3100293)
self.assertEqual(obj.waveform_data['3203']['EL2']['endTmEpoch'],
1671730720.5549)
self.assertEqual(obj.waveform_data['3203']['EL2']['size'], 143249)
self.assertEqual(obj.waveform_data['3203']['EL2']['gaps'], [])
self.assertEqual(len(obj.waveform_data['3203']['EL2']['tracesInfo']),
1)
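# A sketch of the reading strategy the four multiplex cases above imply
# (the record/channel names here are hypothetical, not MSeed's real API):
# with is_multiplex=False the reader assumes one channel per file and stops
# at the first record whose channel fails the request filters, so 'EL2',
# which only appears after other channels' records, is never reached; with
# is_multiplex=True every record in the file is examined.
def _scan_records_sketch(records, wanted_channels, is_multiplex):
    kept = []
    for rec in records:
        if rec['channel'] not in wanted_channels:
            if not is_multiplex:
                break  # one-channel-per-file assumption: stop early
            continue  # multiplexed file: keep scanning
        kept.append(rec)
    return kept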
def test_existing_time_range(self):
# check that data_time comes from the given range; the end time may be
# slightly greater than read_end, depending on the last record's end time
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': q330_data,
'req_soh_chans': [],
'read_start': 1625456018.0,
'read_end': 1625505627.9998999
}
obj = MSeed(**args)
self.assertEqual(obj.keys, ['AX08'])
self.assertEqual(list(obj.soh_data['AX08'].keys()), ['VKI'])
self.assertEqual(list(obj.mass_pos_data['AX08'].keys()), [])
self.assertEqual(list(obj.waveform_data['AX08'].keys()), [])
self.assertEqual(obj.data_time['AX08'], [1625446018.0, 1625510338.0])
def test_non_existing_time_range(self):
# if the given time range falls outside the data's time range, no station is created
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': q330_data,
'req_soh_chans': [],
'read_start': 1625356018.0,
'read_end': 1625405627.9998999
}
obj = MSeed(**args)
self.assertEqual(obj.keys, [])
self.assertEqual(obj.soh_data, {})
self.assertEqual(obj.mass_pos_data, {})
self.assertEqual(obj.waveform_data, {})
self.assertEqual(obj.data_time, {})
def test_read_waveform(self):
# data used for tps is similar to waveform data but is not separated at gaps
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': q330_data,
'req_soh_chans': [],
'req_wf_chans': ['LHE']
}
obj = MSeed(**args)
self.assertEqual(list(obj.waveform_data.keys()), ['AX08'])
self.assertEqual(list(obj.waveform_data['AX08'].keys()), ['LHE'])
self.assertEqual(obj.waveform_data['AX08']['LHE']['samplerate'], 1)
self.assertEqual(obj.waveform_data['AX08']['LHE']['startTmEpoch'],
1625445156.000001)
self.assertEqual(obj.waveform_data['AX08']['LHE']['endTmEpoch'],
1625532950.0)
self.assertEqual(obj.waveform_data['AX08']['LHE']['size'], 87794)
self.assertEqual(obj.waveform_data['AX08']['LHE']['gaps'], [])
self.assertEqual(len(obj.waveform_data['AX08']['LHE']['tracesInfo']),
1)
def test_read_mass_pos_channel(self):
# mass-position channels are read when one or both of the include_mp...
# flags are True
args = {
'data_type': 'Q330',
'is_multiplex': True,
'folder': q330_data,
'req_soh_chans': [],
'req_wf_chans': [],
'include_mp123zne': True
}
obj = MSeed(**args)
self.assertEqual(list(obj.mass_pos_data.keys()), ['AX08'])
self.assertEqual(list(obj.mass_pos_data['AX08'].keys()), ['VM1'])
self.assertEqual(obj.mass_pos_data['AX08']['VM1']['samplerate'], 0.1)
self.assertEqual(obj.mass_pos_data['AX08']['VM1']['startTmEpoch'],
1625444970.0)
self.assertEqual(obj.mass_pos_data['AX08']['VM1']['endTmEpoch'],
1625574580.0)
self.assertEqual(obj.mass_pos_data['AX08']['VM1']['size'], 12961)
self.assertEqual(obj.mass_pos_data['AX08']['VM1']['gaps'], [])
self.assertEqual(len(obj.mass_pos_data['AX08']['VM1']['tracesInfo']),
1)
def test_gap(self):
# gaps will be detected when gap_minimum is set
args = {
'data_type': 'Centaur',
'is_multiplex': True,
'folder': centaur_data,
'req_soh_chans': [],
'gap_minimum': 60
}
obj = MSeed(**args)
self.assertEqual(list(obj.soh_data.keys()), ['3734'])
self.assertEqual(sorted(list(obj.soh_data['3734'].keys())),
['EX1', 'EX2', 'EX3', 'GAN', 'GEL', 'GLA', 'GLO',
'GNS', 'GPL', 'GST', 'LCE', 'LCQ', 'VCO', 'VDT',
'VEC', 'VEI', 'VPB'])
self.assertAlmostEqual(obj.soh_data['3734']['EX1']['samplerate'],
0.0166, 3)
self.assertEqual(obj.soh_data['3734']['EX1']['startTmEpoch'],
1534512840.0)
self.assertEqual(obj.soh_data['3734']['EX1']['endTmEpoch'],
1534550400.0)
self.assertEqual(obj.soh_data['3734']['EX1']['size'], 597)
self.assertEqual(obj.gaps['3734'], [[1534521420.0, 1534524000.0]])
def test_not_detect_gap(self):
# if gap_minimum isn't set but gaps exist, the data is still separated, but
# no gap is added to the gap list
args = {
'data_type': 'Centaur',
'is_multiplex': True,
'folder': centaur_data,
'req_soh_chans': [],
'gap_minimum': None
}
obj = MSeed(**args)
self.assertEqual(list(obj.soh_data.keys()), ['3734'])
self.assertEqual(sorted(list(obj.soh_data['3734'].keys())),
['EX1', 'EX2', 'EX3', 'GAN', 'GEL', 'GLA', 'GLO',
'GNS', 'GPL', 'GST', 'LCE', 'LCQ', 'VCO', 'VDT',
'VEC', 'VEI', 'VPB'])
self.assertAlmostEqual(obj.soh_data['3734']['EX1']['samplerate'],
0.0166, 3)
self.assertEqual(obj.soh_data['3734']['EX1']['startTmEpoch'],
1534512840.0)
self.assertEqual(obj.soh_data['3734']['EX1']['endTmEpoch'],
1534550400.0)
self.assertEqual(obj.soh_data['3734']['EX1']['size'], 597)
self.assertEqual(obj.gaps['3734'], []) # no gaps
from unittest import TestCase
from pathlib import Path
from sohstationviewer.model.mseed_data.mseed_helper import (
retrieve_nets_from_data_dict, read_text
)
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
'test_data')
text_file = TEST_DATA_DIR.joinpath(
"Pegasus-sample/Pegasus_SVC4/logs/2020/XX/KC01/XX.KC01...D.2020.129")
binary_file = TEST_DATA_DIR.joinpath(
"Pegasus-sample/Pegasus_SVC4/soh/2020/XX/KC01/VDT.D/"
"XX.KC01..VDT.D.2020.129")
class TestReadText(TestCase):
def test_text_file(self):
ret = read_text(text_file)
expected_ret = (
"\n\n** STATE OF HEALTH: XX.KC01...D.2020.129"
"\n2020-05-08 22:55:45.390 UTC: I(Initializations): Firmware")
self.assertEqual(ret[:100], expected_ret)
def test_binary_file(self):
ret = read_text(binary_file)
self.assertIsNone(ret)
class TestRetrieveNetsFromDataDict(TestCase):
def setUp(self):
self.nets_by_sta = {}
self.data_dict = {
'STA1': {'CHA1': {'nets': {'NET1', 'NET2'}},
'CHA2': {'nets': {'NET2', 'NET3'}}
},
'STA2': {'CHA1': {'nets': {'NET1'}},
'CHA2': {'nets': {'NET1'}}
}
}
def test_retrieve_nets(self):
retrieve_nets_from_data_dict(self.data_dict, self.nets_by_sta)
self.assertEqual(list(self.nets_by_sta.keys()), ['STA1', 'STA2'])
self.assertEqual(sorted(list(self.nets_by_sta['STA1'])),
['NET1', 'NET2', 'NET3'])
self.assertEqual(sorted(list(self.nets_by_sta['STA2'])), ['NET1'])
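# The union rule the assertions above encode (a sketch): a station's entry
# in nets_by_sta is the union of all of its channels' 'nets' sets.
def _nets_for_station_sketch(channels: dict) -> set:
    return set().union(*(ch['nets'] for ch in channels.values()))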
from unittest import TestCase
from pathlib import Path
from sohstationviewer.model.mseed_data.mseed_reader import MSeedReader
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
'test_data')
ascii_file = TEST_DATA_DIR.joinpath(
"Q330-sample/day_vols_AX08/AX08.XA..LOG.2021.186")
blockettes_files = TEST_DATA_DIR.joinpath(
"Q330_unimplemented_ascii_block/XX-3203_4-20221222190255")
multiplex_file = TEST_DATA_DIR.joinpath(
"Q330_multiplex/XX-3203_4-20221222183011")
soh_file = TEST_DATA_DIR.joinpath(
"Q330-sample/day_vols_AX08/AX08.XA..VKI.2021.186")
waveform_file = TEST_DATA_DIR.joinpath(
"Q330-sample/day_vols_AX08/AX08.XA..LHE.2021.186")
mass_pos_file = TEST_DATA_DIR.joinpath(
"Q330-sample/day_vols_AX08/AX08.XA..VM1.2021.186")
gap_file = TEST_DATA_DIR.joinpath(
"Centaur-sample/SOH/"
"XX.3734.SOH.centaur-3_3734..20180817_000000.miniseed.miniseed")
class TestMSeedReader(TestCase):
def setUp(self) -> None:
self.soh_data = {}
self.mass_pos_data = {}
self.waveform_data = {}
self.log_data = {}
def test_read_ascii(self):
args = {
'file_path': ascii_file,
'is_multiplex': False,
'req_soh_chans': ['LOG'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.log_data.keys()), ['AX08'])
self.assertEqual(list(self.log_data['AX08'].keys()), ['LOG'])
self.assertEqual(len(self.log_data['AX08']['LOG']), 16)
self.assertEqual(
self.log_data['AX08']['LOG'][0][:100],
'\n\nSTATE OF HEALTH: From:1625456260.12 To:1625456260.12\n\r'
'\nQuanterra Packet Baler Model 14 Restart. V'
)
self.assertEqual(
self.log_data['AX08']['LOG'][1][:100],
'\n\nSTATE OF HEALTH: From:1625456366.62 To:1625456366.62'
'\nReducing Status Polling Interval\r\n[2021-07-0'
)
def test_read_blockettes_info(self):
args = {
'file_path': blockettes_files,
'is_multiplex': True,
'req_soh_chans': ['ACE'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.log_data.keys()), ['3203'])
self.assertEqual(list(self.log_data['3203'].keys()), ['ACE'])
self.assertEqual(len(self.log_data['3203']['ACE']), 1)
self.assertEqual(
self.log_data['3203']['ACE'][0][:100],
'\n\nSTATE OF HEALTH: From:1671729287.00014 To:1671729287.0'
'\n===========\nVCO correction: 53.7109375\nTim'
)
def test_not_is_multiplex_read_channel(self):
# is_multiplex = False => reading stops at the first channel that does not
# match the request, so the channel 'EL1' is read but not to completion
args = {
'file_path': multiplex_file,
'is_multiplex': False,
'req_wf_chans': ['EL1'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.waveform_data.keys()), ['3203'])
self.assertEqual(list(self.waveform_data['3203'].keys()), ['EL1'])
self.assertEqual(self.waveform_data['3203']['EL1']['samplerate'], 200)
self.assertEqual(self.waveform_data['3203']['EL1']['startTmEpoch'],
1671730004.145029)
self.assertEqual(self.waveform_data['3203']['EL1']['endTmEpoch'],
1671730013.805)
self.assertEqual(self.waveform_data['3203']['EL1']['size'], 1932)
self.assertEqual(self.waveform_data['3203']['EL1']['gaps'], [])
self.assertEqual(len(self.waveform_data['3203']['EL1']['tracesInfo']),
1)
def test_is_multiplex_read_channel(self):
# is_multiplex = True => read every record
args = {
'file_path': multiplex_file,
'is_multiplex': True,
'req_wf_chans': ['EL1'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.waveform_data.keys()), ['3203'])
self.assertEqual(list(self.waveform_data['3203'].keys()), ['EL1'])
self.assertEqual(self.waveform_data['3203']['EL1']['samplerate'], 200)
self.assertEqual(self.waveform_data['3203']['EL1']['startTmEpoch'],
1671730004.145029)
self.assertEqual(self.waveform_data['3203']['EL1']['endTmEpoch'],
1671730720.4348998)
self.assertEqual(self.waveform_data['3203']['EL1']['size'], 143258)
self.assertEqual(self.waveform_data['3203']['EL1']['gaps'], [])
self.assertEqual(len(self.waveform_data['3203']['EL1']['tracesInfo']),
1)
def test_not_is_multiplex_selected_channel_in_middle(self):
# the selected channel is never reached because an earlier record fails the
# requirement when is_multiplex = False
args = {
'file_path': multiplex_file,
'is_multiplex': False,
'req_wf_chans': ['EL2'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.waveform_data.keys()), [])
def test_is_multiplex_selected_channel_in_middle(self):
# is_multiplex = True => the selected channel will be read
args = {
'file_path': multiplex_file,
'is_multiplex': True,
'req_wf_chans': ['EL2'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.waveform_data.keys()), ['3203'])
self.assertEqual(list(self.waveform_data['3203'].keys()), ['EL2'])
self.assertEqual(self.waveform_data['3203']['EL2']['samplerate'], 200)
self.assertEqual(self.waveform_data['3203']['EL2']['startTmEpoch'],
1671730004.3100293)
self.assertEqual(self.waveform_data['3203']['EL2']['endTmEpoch'],
1671730720.5549)
self.assertEqual(self.waveform_data['3203']['EL2']['size'], 143249)
self.assertEqual(self.waveform_data['3203']['EL2']['gaps'], [])
self.assertEqual(len(self.waveform_data['3203']['EL2']['tracesInfo']),
1)
def test_existing_time_range(self):
# check that data_time comes from the given range; the end time may be
# slightly greater than read_end, depending on the last record's end time
args = {
'file_path': soh_file,
'is_multiplex': False,
'req_soh_chans': ['VKI'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data,
'read_start': 1625456018.0,
'read_end': 1625505627.9998999
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.soh_data['AX08'].keys()), ['VKI'])
self.assertEqual(self.soh_data['AX08']['VKI']['startTmEpoch'],
1625446018.0)
self.assertEqual(self.soh_data['AX08']['VKI']['endTmEpoch'],
1625510338.0)
def test_non_existing_time_range(self):
# if the given time range falls outside the data's time range, no station is created
args = {
'file_path': soh_file,
'is_multiplex': False,
'req_soh_chans': ['VKI'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data,
'read_start': 1625356018.0,
'read_end': 1625405627.9998999
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(self.soh_data, {})
self.assertEqual(self.mass_pos_data, {})
self.assertEqual(self.waveform_data, {})
def test_read_waveform(self):
args = {
'file_path': waveform_file,
'is_multiplex': False,
'req_wf_chans': ['LHE'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.waveform_data.keys()), ['AX08'])
self.assertEqual(list(self.waveform_data['AX08'].keys()), ['LHE'])
self.assertEqual(self.waveform_data['AX08']['LHE']['samplerate'], 1)
self.assertEqual(self.waveform_data['AX08']['LHE']['startTmEpoch'],
1625445156.000001)
self.assertEqual(self.waveform_data['AX08']['LHE']['endTmEpoch'],
1625532950.0)
self.assertEqual(self.waveform_data['AX08']['LHE']['size'], 87794)
self.assertEqual(self.waveform_data['AX08']['LHE']['gaps'], [])
self.assertEqual(len(self.waveform_data['AX08']['LHE']['tracesInfo']),
1)
def test_read_mass_pos_channel(self):
# mass-position channels are read when one or both of the include_mp...
# flags are True
args = {
'file_path': mass_pos_file,
'is_multiplex': False,
'include_mp123zne': True,
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.mass_pos_data.keys()), ['AX08'])
self.assertEqual(list(self.mass_pos_data['AX08'].keys()), ['VM1'])
self.assertEqual(self.mass_pos_data['AX08']['VM1']['samplerate'], 0.1)
self.assertEqual(self.mass_pos_data['AX08']['VM1']['startTmEpoch'],
1625444970.0)
self.assertEqual(self.mass_pos_data['AX08']['VM1']['endTmEpoch'],
1625574580.0)
self.assertEqual(self.mass_pos_data['AX08']['VM1']['size'], 12961)
self.assertEqual(self.mass_pos_data['AX08']['VM1']['gaps'], [])
self.assertEqual(len(self.mass_pos_data['AX08']['VM1']['tracesInfo']),
1)
def test_gap(self):
# gaps will be detected when gap_minimum is set
args = {
'file_path': gap_file,
'is_multiplex': True,
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data,
'gap_minimum': 60
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.soh_data.keys()), ['3734'])
self.assertEqual(sorted(list(self.soh_data['3734'].keys())),
['EX1', 'EX2', 'EX3', 'GAN', 'GEL', 'GLA', 'GLO',
'GNS', 'GPL', 'GST', 'LCE', 'LCQ', 'VCO', 'VDT',
'VEC', 'VEI', 'VPB'])
self.assertAlmostEqual(self.soh_data['3734']['EX1']['samplerate'],
0.0166, 3)
self.assertEqual(self.soh_data['3734']['EX1']['startTmEpoch'],
1534512840.0)
self.assertEqual(self.soh_data['3734']['EX1']['endTmEpoch'],
1534550400.0)
self.assertEqual(self.soh_data['3734']['EX1']['size'], 597)
self.assertEqual(self.soh_data['3734']['EX1']['gaps'],
[[1534522200.0, 1534523940.0]])
def test_not_detect_gap(self):
# if gap_minimum isn't set but gaps exist, the data is still separated, but
# no gap is added to the gap list
args = {
'file_path': gap_file,
'is_multiplex': True,
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data,
'gap_minimum': None
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.soh_data.keys()), ['3734'])
self.assertEqual(sorted(list(self.soh_data['3734'].keys())),
['EX1', 'EX2', 'EX3', 'GAN', 'GEL', 'GLA', 'GLO',
'GNS', 'GPL', 'GST', 'LCE', 'LCQ', 'VCO', 'VDT',
'VEC', 'VEI', 'VPB'])
self.assertAlmostEqual(self.soh_data['3734']['EX1']['samplerate'],
0.0166, 3)
self.assertEqual(self.soh_data['3734']['EX1']['startTmEpoch'],
1534512840.0)
self.assertEqual(self.soh_data['3734']['EX1']['endTmEpoch'],
1534550400.0)
self.assertEqual(self.soh_data['3734']['EX1']['size'], 597)
self.assertEqual(self.soh_data['3734']['EX1']['gaps'], []) # no gaps
@@ -2,8 +2,7 @@ import unittest
from sohstationviewer.database.extract_data import (
get_chan_plot_info,
get_wf_plot_info,
get_chan_label,
get_seismic_chan_label,
get_signature_channels,
get_color_def,
get_color_ranges,
@@ -11,7 +10,7 @@ from sohstationviewer.database.extract_data import (
class TestExtractData(unittest.TestCase):
def test_get_chan_plot_info_good_channel_and_data_type(self):
def test_get_chan_plot_info_good_soh_channel_and_data_type(self):
"""
Test basic functionality of get_chan_plot_info - channel and data type
combination exists in database table `Channels`
@@ -25,9 +24,62 @@ class TestExtractData(unittest.TestCase):
'label': 'SOH/Data Def',
'fixPoint': 0,
'valueColors': '0:W|1:C'}
self.assertDictEqual(
get_chan_plot_info('SOH/Data Def', {'samplerate': 10}, 'RT130'),
expected_result)
self.assertDictEqual(get_chan_plot_info('SOH/Data Def', 'RT130'),
expected_result)
def test_get_chan_plot_info_masspos_channel(self):
with self.subTest("Mass position 'VM'"):
expected_result = {'channel': 'VM1',
'plotType': 'linesMasspos',
'height': 4,
'unit': 'V',
'linkedChan': None,
'convertFactor': 0.1,
'label': 'VM1-MassPos',
'fixPoint': 1,
'valueColors': None}
self.assertDictEqual(get_chan_plot_info('VM1', 'Q330'),
expected_result)
with self.subTest("Mass position 'MassPos'"):
expected_result = {'channel': 'MassPos1',
'plotType': 'linesMasspos',
'height': 4,
'unit': 'V',
'linkedChan': None,
'convertFactor': 1,
'label': 'MassPos1',
'fixPoint': 1,
'valueColors': None}
self.assertDictEqual(get_chan_plot_info('MassPos1', 'RT130'),
expected_result)
def test_get_chan_plot_info_seismic_channel(self):
with self.subTest("RT130 Seismic"):
expected_result = {'channel': 'DS2',
'plotType': 'linesSRate',
'height': 8,
'unit': '',
'linkedChan': None,
'convertFactor': 1,
'label': 'DS2',
'fixPoint': 0,
'valueColors': None}
self.assertDictEqual(get_chan_plot_info('DS2', 'RT130'),
expected_result)
with self.subTest("MSeed Seismic"):
expected_result = {'channel': 'LHE',
'plotType': 'linesSRate',
'height': 8,
'unit': '',
'linkedChan': None,
'convertFactor': 1,
'label': 'LHE-EW',
'fixPoint': 0,
'valueColors': None}
self.assertDictEqual(get_chan_plot_info('LHE', 'Q330'),
expected_result)
def test_get_chan_plot_info_data_type_is_unknown(self):
"""
@@ -44,10 +96,8 @@ class TestExtractData(unittest.TestCase):
'label': 'DEFAULT-Bad Channel ID',
'fixPoint': 0,
'valueColors': None}
self.assertDictEqual(
get_chan_plot_info('Bad Channel ID',
{'samplerate': 10}, 'Unknown'),
expected_result)
self.assertDictEqual(get_chan_plot_info('Bad Channel ID', 'Unknown'),
expected_result)
# Channel exist in database
expected_result = {'channel': 'LCE',
@@ -59,12 +109,8 @@ class TestExtractData(unittest.TestCase):
'label': 'LCE-PhaseError',
'fixPoint': 0,
'valueColors': 'L:W|D:Y'}
self.assertDictEqual(
get_chan_plot_info('LCE', {'samplerate': 10}, 'Unknown'),
expected_result)
self.assertDictEqual(
get_chan_plot_info('LCE', {'samplerate': 10}, 'Unknown'),
expected_result)
self.assertDictEqual(get_chan_plot_info('LCE', 'Unknown'),
expected_result)
def test_get_chan_plot_info_bad_channel_or_data_type(self):
"""
@@ -86,69 +132,54 @@ class TestExtractData(unittest.TestCase):
# Data type has None value. None value comes from
# controller.processing.detect_data_type.
expected_result['label'] = 'DEFAULT-SOH/Data Def'
self.assertDictEqual(
get_chan_plot_info('SOH/Data Def', {'samplerate': 10}, None),
expected_result)
self.assertDictEqual(get_chan_plot_info('SOH/Data Def', None),
expected_result)
# Channel and data type are empty strings
expected_result['label'] = 'DEFAULT-'
self.assertDictEqual(
get_chan_plot_info('', {'samplerate': 10}, ''),
expected_result)
self.assertDictEqual(get_chan_plot_info('', ''),
expected_result)
# Channel exists in database but data type does not
expected_result['label'] = 'DEFAULT-SOH/Data Def'
self.assertDictEqual(
get_chan_plot_info('SOH/Data Def',
{'samplerate': 10}, 'Bad Data Type'),
get_chan_plot_info('SOH/Data Def', 'Bad Data Type'),
expected_result
)
# Data type exists in database but channel does not
expected_result['label'] = 'DEFAULT-Bad Channel ID'
self.assertDictEqual(
get_chan_plot_info('Bad Channel ID',
{'samplerate': 10}, 'RT130'),
expected_result)
self.assertDictEqual(get_chan_plot_info('Bad Channel ID', 'RT130'),
expected_result)
# Both channel and data type exists in database but not their
# combination
expected_result['label'] = 'DEFAULT-SOH/Data Def'
self.assertDictEqual(
get_chan_plot_info('SOH/Data Def', {'samplerate': 10}, 'Q330'),
expected_result)
def test_get_wf_plot_info(self):
"""
Test basic functionality of get_wf_plot_info - ensures the returned
dictionary contains all the needed keys. Bad channel ID cases are
handled in the tests for get_chan_label.
"""
result = get_wf_plot_info('CH1')
expected_keys = {'param', 'plotType', 'valueColors', 'height',
'label', 'unit', 'channel', 'convertFactor'}
self.assertSetEqual(set(result.keys()), expected_keys)
self.assertDictEqual(get_chan_plot_info('SOH/Data Def', 'Q330'),
expected_result)
def test_get_chan_label_good_channel_id(self):
def test_get_seismic_chan_label_good_channel_id(self):
"""
Test basic functionality of get_chan_label - channel ID ends in one
of the keys in conf.dbSettings.dbConf['seisLabel'] or starts with 'DS'
Test basic functionality of get_seismic_chan_label - channel ID ends
in one of the keys in conf.dbSettings.dbConf['seisLabel'] or
starts with 'DS'
"""
# Channel ID does not start with 'DS'
self.assertEqual(get_chan_label('CH1'), 'CH1-NS')
self.assertEqual(get_chan_label('CH2'), 'CH2-EW')
self.assertEqual(get_chan_label('CHG'), 'CHG')
self.assertEqual(get_seismic_chan_label('CH1'), 'CH1-NS')
self.assertEqual(get_seismic_chan_label('CH2'), 'CH2-EW')
self.assertEqual(get_seismic_chan_label('CHG'), 'CHG')
# Channel ID starts with 'DS'
self.assertEqual(get_chan_label('DS-TEST-CHANNEL'), 'DS-TEST-CHANNEL')
self.assertEqual(get_seismic_chan_label('DS-TEST-CHANNEL'),
'DS-TEST-CHANNEL')
def test_get_chan_label_bad_channel_id(self):
"""
Test basic functionality of get_chan_label - channel ID does not end in
one of the keys in conf.dbSettings.dbConf['seisLabel'] or is the empty
string.
Test basic functionality of get_seismic_chan_label - channel ID does
not end in one of the keys in conf.dbSettings.dbConf['seisLabel']
or is the empty string.
"""
self.assertRaises(IndexError, get_chan_label, '')
self.assertRaises(IndexError, get_seismic_chan_label, '')
def test_get_signature_channels(self):
"""Test basic functionality of get_signature_channels"""
......
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional, Dict, Union, List
from typing import Dict, Union, List
from unittest import TestCase
from unittest.mock import patch
@@ -8,17 +8,12 @@ from unittest.mock import patch
from obspy.core import UTCDateTime
import numpy as np
import sohstationviewer.view.plotting.time_power_squared_processor
from sohstationviewer.conf import constants as const
from sohstationviewer.model.handling_data import (
trim_downsample_chan_with_spr_less_or_equal_1,
trim_downsample_wf_chan,
trim_waveform_data,
downsample_waveform_data,
get_start_5mins_of_diff_days,
)
from sohstationviewer.view.plotting.time_power_squared_processor import (
TimePowerSquaredProcessor,
)
from sohstationviewer.model.downsampler import downsample, chunk_minmax
@@ -610,337 +605,3 @@ class TestTrimDownsampleWfChan(TestCase):
self.end_time, False)
self.assertTrue(mock_trim.called)
self.assertTrue(mock_downsample.called)
class TestGetTrimTpsData(TestCase):
def no_file_memmap(self, file_path: Path, *args, **kwargs):
"""
A mock of numpy.memmap. Reduce test run time significantly by making
sure that data access happens in memory and not on disk.
This method does not actually load the data stored on disk. Instead, it
constructs the array of data using the name of the given file. To do
so, this method requires the file name to be in the format
<prefix>_<index>. This method then constructs an array of
self.trace_size consecutive integers starting at
<index> * self.trace_size.
:param file_path: the path to a file used to construct the data array.
:param args: dummy arguments to make the API similar to numpy.memmap.
:param kwargs: dummy arguments to make the API similar to numpy.memmap.
:return: a numpy array constructed using file_path's name.
"""
file_idx = int(file_path.name.split('_')[-1])
start = file_idx * self.trace_size
end = start + self.trace_size
return np.arange(start, end)
def add_trace(self, start_time: float, idx: Optional[int] = None):
"""
Add a trace to the stored list of traces.
:param start_time: the start time of the trace to be added.
:param idx: the index to insert the trace into. If None, the new trace
will be appended to the list of traces
"""
trace = {}
trace['startTmEpoch'] = start_time
trace['endTmEpoch'] = start_time + self.trace_size - 1
trace['size'] = self.trace_size
file_idx = start_time // self.trace_size
times_file_name = Path(self.data_folder.name) / f'times_{file_idx}'
trace['times_f'] = times_file_name
data_file_name = Path(self.data_folder.name) / f'data_{file_idx}'
trace['data_f'] = data_file_name
if idx is not None:
self.traces_info.insert(idx, trace)
else:
self.traces_info.append(trace)
def setUp(self) -> None:
"""Set up text fixtures."""
memmap_patcher = patch.object(np, 'memmap',
side_effect=self.no_file_memmap)
self.addCleanup(memmap_patcher.stop)
memmap_patcher.start()
# Channel ID is only used when communicating with the main window.
# Seeing as we are testing the processing step here, we don't really
# need it.
channel_id = ''
self.channel_data: ChannelData = {'samplerate': 1}
self.traces_info = []
self.channel_data['tracesInfo'] = self.traces_info
self.data_folder = TemporaryDirectory()
self.trace_size = 1000
for i in range(100):
start_time = i * self.trace_size
self.add_trace(start_time)
self.start_time = 25000
self.end_time = 75000
self.start_5mins_of_diff_days = get_start_5mins_of_diff_days(
self.start_time, self.end_time)
self.tps_processor = TimePowerSquaredProcessor(
channel_id, self.channel_data, self.start_time, self.end_time,
self.start_5mins_of_diff_days
)
local_TimePowerSquaredProcessor = (sohstationviewer.view.plotting.
time_power_squared_processor.
TimePowerSquaredProcessor)
# If an object obj is an instance of class A, the method call obj.method1()
# translates to A.method1(obj) in Python. So, in order to mock method1 for
# obj, we mock it on the class A.
@patch.object(local_TimePowerSquaredProcessor, 'trim_waveform_data')
def test_data_is_trimmed(self, mock_trim_waveform_data):
"""Test that the data is trimmed."""
self.tps_processor.run()
self.assertTrue(mock_trim_waveform_data.called)
def test_appropriate_amount_of_5_mins_skipped(self):
"""Test that the trimmed part of the data is skipped over."""
self.tps_processor.run()
with self.subTest('test_skip_before_start_time'):
first_unskipped_idx = 83
skipped_tps_arr = (
self.channel_data['tps_data'][0][:first_unskipped_idx]
)
self.assertTrue((skipped_tps_arr == 0).all())
with self.subTest('test_skip_after_end_time'):
last_unskipped_idx = 252
skipped_tps_arr = (
self.channel_data['tps_data'][0][last_unskipped_idx + 1:]
)
self.assertTrue((skipped_tps_arr == 0).all())
def test_result_is_stored(self):
"""Test that the result of the TPS calculation is stored."""
self.tps_processor.run()
self.assertTrue('tps_data' in self.channel_data)
def test_formula_is_correct(self):
"""Test that the TPS calculation uses the correct formula."""
self.tps_processor.start_time = 50000
self.tps_processor.end_time = 52000
self.tps_processor.run()
first_unskipped_idx = 166
last_unskipped_idx = 175
tps_data = self.channel_data['tps_data'][0]
unskipped_tps_arr = (
tps_data[first_unskipped_idx:last_unskipped_idx + 1]
)
expected = np.array([
2.51497985e+09, 2.54515955e+09, 2.57551925e+09, 0.00000000e+00,
1.96222188e+09, 2.64705855e+09, 2.67801825e+09, 2.03969638e+09,
2.75095755e+09, 2.78251725e+09
])
self.assertTrue(np.allclose(unskipped_tps_arr, expected))
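# The expected values are consistent with each TPS entry being the mean of
# the squared samples in one 5-minute bin (300 samples at 1 sample/s):
# mean(np.arange(50000, 50300) ** 2) is about 2.51498e9, matching the first
# unskipped entry above. A sketch of that per-bin reduction:
def _tps_bin_value_sketch(samples: np.ndarray) -> float:
    return float(np.mean(samples.astype(np.float64) ** 2))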
def test_one_tps_array_for_each_day_one_day_of_data(self):
"""
Test that there is one TPS array for each day of data.
Test the case where there is only one day of data.
"""
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 1)
def test_one_tps_array_for_each_day_multiple_days_of_data(self):
"""
Test that there is one TPS array for each day of data.
Test the case where there are more than one day of data.
"""
# Currently, the data time goes from 0 to 100000, which is enough to
# cover two days (the start of the second positive day in epoch time is
# 86400). Thus, we only have to set the end time to the data end time
# to have two days of data.
self.tps_processor.end_time = 100000
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 2)
def test_data_has_gap_to_the_right_data_same_day_before_gap(self):
"""
Test that gaps in the data are skipped in TPS calculation by checking
that the elements in the TPS array corresponding to the gaps are
0.
Test the case where there are gaps to the right of the data and the
traces directly next to the gaps are in the same day.
"""
# Remove traces that go from 1000 to 24999 (traces 2 to 25) in order to
# create a gap on the right side of the data.
self.traces_info = [trace
for i, trace in enumerate(self.traces_info)
if not 0 < i < 25]
self.channel_data['tracesInfo'] = self.traces_info
with self.subTest('test_start_time_in_gap'):
self.tps_processor.start_time = 15000
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 1)
tps_gap = slice(0, 50)
tps_data_in_gap = self.channel_data['tps_data'][0][tps_gap]
tps_data_in_gap_contains_zero = np.allclose(
tps_data_in_gap, np.zeros(tps_data_in_gap.size)
)
self.assertTrue(tps_data_in_gap_contains_zero)
with self.subTest('test_start_time_cover_all_traces'):
self.tps_processor.start_time = 500
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 1)
tps_gap = slice(2, 83)
tps_data_in_gap = self.channel_data['tps_data'][0][tps_gap]
tps_data_in_gap_contains_zero = np.allclose(
tps_data_in_gap, np.zeros(tps_data_in_gap.size)
)
self.assertTrue(tps_data_in_gap_contains_zero)
def test_data_has_gap_to_the_left_data_same_day_after_gap(self):
"""
Test that gaps in the data are skipped in TPS calculation by checking
that the elements in the TPS array corresponding to the gaps are
0.
Test the case where there are gaps to the left of the data and the
traces directly next to the gaps are in the same day.
"""
# Data end time is 100000, so we want a trace that starts after 100001
trace_start_time = 125000
self.add_trace(trace_start_time)
with self.subTest('test_end_time_in_gap'):
# Subject to change after Issue #37 is fixed
self.tps_processor.end_time = 110000
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 2)
tps_gaps = (slice(45, 128), slice(131, None))
tps_data_in_gaps = np.concatenate(
[self.channel_data['tps_data'][1][gap] for gap in tps_gaps]
)
tps_data_in_gaps_contains_zero = np.allclose(
tps_data_in_gaps, np.zeros(tps_data_in_gaps.size)
)
self.assertTrue(tps_data_in_gaps_contains_zero)
with self.subTest('test_end_time_cover_all_traces'):
self.tps_processor.end_time = trace_start_time + 50
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 2)
tps_gaps = (slice(45, 128), slice(131, None))
tps_data_in_gaps = np.concatenate(
[self.channel_data['tps_data'][1][gap] for gap in tps_gaps]
)
tps_data_in_gaps_contains_zero = np.allclose(
tps_data_in_gaps, np.zeros(tps_data_in_gaps.size)
)
self.assertTrue(tps_data_in_gaps_contains_zero)
def test_data_has_gap_to_the_right_data_different_day_before_gap(self):
"""
Test that gaps in the data are skipped in TPS calculation by checking
that the elements in the TPS array corresponding to the gaps are
0.
Test the case where there are gaps to the right of the data and the
traces directly next to the gaps are in different days.
"""
trace_start_time = -50000
self.add_trace(trace_start_time, idx=0)
with self.subTest('test_start_time_in_gap'):
self.tps_processor.start_time = -25000
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 2)
tps_gap = slice(const.NO_5M_DAY)
tps_data_in_gap = self.channel_data['tps_data'][0][tps_gap]
tps_data_in_gap_contains_zero = np.allclose(
tps_data_in_gap, np.zeros(tps_data_in_gap.size)
)
self.assertTrue(tps_data_in_gap_contains_zero)
with self.subTest('test_start_time_cover_all_traces'):
self.tps_processor.start_time = -60000
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 2)
tps_gaps = (slice(0, 121), slice(124, None))
tps_data_in_gaps = np.concatenate(
[self.channel_data['tps_data'][0][gap] for gap in tps_gaps]
)
tps_data_in_gaps_contains_zero = np.allclose(
tps_data_in_gaps, np.zeros(tps_data_in_gaps.size)
)
self.assertTrue(tps_data_in_gaps_contains_zero)
def test_data_has_gap_to_the_left_data_different_day_after_gap(self):
"""
Test that gaps in the data are skipped in TPS calculation by checking
that the elements in the TPS array corresponding to the gaps are
0.
Test the case where there are gaps to the left of the data and the
traces directly next to the gaps are in different days.
"""
# The setup portion of this test suite only create traces in the first
# positive day in epoch time. So, in order to guarantee there is a gap
# in the TPS array, we skip the second positive day. The start of the
# third positive day in epoch time is 172800, so we want a trace that
# starts after 172801.
trace_start_time = 173100
self.add_trace(trace_start_time)
with self.subTest('test_end_time_same_day_as_second_to_last_trace'):
# Subject to change after Issue #37 is fixed
self.tps_processor.end_time = 125000
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
with self.assertRaises(IndexError):
self.tps_processor.run()
with self.subTest('test_end_time_cover_all_traces'):
self.tps_processor.end_time = trace_start_time + 50
self.tps_processor.start_5mins_of_diff_days = \
get_start_5mins_of_diff_days(self.tps_processor.start_time,
self.tps_processor.end_time)
self.tps_processor.run()
self.assertEqual(len(self.channel_data['tps_data']), 3)
tps_gap_day_2 = slice(45, None)
tps_gap_day_3 = slice(4, None)
tps_data_in_gaps = np.hstack(
(
self.channel_data['tps_data'][1][tps_gap_day_2],
self.channel_data['tps_data'][2][tps_gap_day_3]
)
)
tps_data_in_gaps_contains_zero = np.allclose(
tps_data_in_gaps, np.zeros(tps_data_in_gaps.size)
)
self.assertTrue(tps_data_in_gaps_contains_zero)
@@ -223,7 +223,8 @@ class MockMSeed(MSeed):
class TestGetGPSChannelPrefix(TestCase):
def setUp(self) -> None:
self.mseed_obj = MockMSeed()
self.mseed_obj.channels = set()
self.mseed_obj.selected_key = 'STA'
self.mseed_obj.soh_data = {'STA': {}}
def test_pegasus_data_type(self):
data_type = 'Pegasus'
@@ -239,14 +240,16 @@ class TestGetGPSChannelPrefix(TestCase):
def test_unknown_data_type_pegasus_gps_channels(self):
data_type = 'Unknown'
self.mseed_obj.channels = {'VNS', 'VLA', 'VLO', 'VEL'}
self.mseed_obj.soh_data = {
'STA': {'VNS': {}, 'VLA': {}, 'VEL': {}, 'VLO': {}}}
expected = 'V'
result = get_gps_channel_prefix(self.mseed_obj, data_type)
self.assertEqual(expected, result)
def test_unknown_data_type_centaur_gps_channels(self):
data_type = 'Unknown'
self.mseed_obj.channels = {'GNS', 'GLA', 'GLO', 'GEL'}
self.mseed_obj.soh_data = {
'STA': {'GNS': {}, 'GLA': {}, 'GEL': {}, 'GLO': {}}}
expected = 'G'
result = get_gps_channel_prefix(self.mseed_obj, data_type)
self.assertEqual(expected, result)
......
@@ -97,7 +97,6 @@ class TestParseGpsPoint(unittest.TestCase):
gps_point = parse_gps_point_rt130(self.good_gps_line,
self.gps_year)
result = gps_point.longitude
print(result)
expected = -106.92038611111111
self.assertTrue(math.isclose(result, expected))
......
from unittest import TestCase
from unittest.mock import patch
from obspy.core import UTCDateTime
import numpy as np
from sohstationviewer.view.plotting.plotting_widget.plotting_processor_helper \
import downsample, chunk_minmax
ZERO_EPOCH_TIME = UTCDateTime(1970, 1, 1, 0, 0, 0).timestamp
class TestDownsample(TestCase):
# FROM test_handling_data_trim_downsample.TestDownsample
def setUp(self) -> None:
patcher = patch('sohstationviewer.view.plotting.plotting_widget.'
'plotting_processor_helper.chunk_minmax')
self.addCleanup(patcher.stop)
self.mock_chunk_minmax = patcher.start()
self.times = np.arange(1000)
self.data = np.arange(1000)
self.log_idx = np.arange(1000)
def test_first_downsample_step_remove_enough_points(self):
req_points = 999
downsample(self.times, self.data, rq_points=req_points)
self.assertFalse(self.mock_chunk_minmax.called)
def test_first_downsample_step_remove_enough_points_with_logidx(self):
req_points = 999
downsample(self.times, self.data, self.log_idx, rq_points=req_points)
self.assertFalse(self.mock_chunk_minmax.called)
def test_second_downsample_step_required(self):
req_points = 1
downsample(self.times, self.data, rq_points=req_points)
self.assertTrue(self.mock_chunk_minmax.called)
times, data, _, rq_points = self.mock_chunk_minmax.call_args[0]
self.assertIsNot(times, self.times)
self.assertIsNot(data, self.data)
self.assertEqual(rq_points, req_points)
def test_second_downsample_step_required_with_logidx(self):
req_points = 1
downsample(self.times, self.data, self.log_idx, rq_points=req_points)
self.assertTrue(self.mock_chunk_minmax.called)
times, data, log_idx, rq_points = self.mock_chunk_minmax.call_args[0]
self.assertIsNot(times, self.times)
self.assertIsNot(data, self.data)
self.assertIsNot(log_idx, self.log_idx)
self.assertEqual(rq_points, req_points)
def test_requested_points_greater_than_data_size(self):
req_points = 10000
times, data, _ = downsample(
self.times, self.data, rq_points=req_points)
self.assertFalse(self.mock_chunk_minmax.called)
# Check that we did not do any processing on the times and data arrays.
# This ensures that we don't do two unneeded copy operations.
self.assertIs(times, self.times)
self.assertIs(data, self.data)
def test_requested_points_greater_than_data_size_with_logidx(self):
req_points = 10000
times, data, log_idx = downsample(
self.times, self.data, self.log_idx, rq_points=req_points)
self.assertFalse(self.mock_chunk_minmax.called)
# Check that we did not do any processing on the times and data arrays.
# This ensures that we don't do two unneeded copy operations.
self.assertIs(times, self.times)
self.assertIs(data, self.data)
self.assertIs(log_idx, self.log_idx)
def test_requested_points_is_zero(self):
req_points = 0
downsample(self.times, self.data, rq_points=req_points)
self.assertTrue(self.mock_chunk_minmax.called)
times, data, _, rq_points = self.mock_chunk_minmax.call_args[0]
self.assertIsNot(times, self.times)
self.assertIsNot(data, self.data)
self.assertEqual(rq_points, req_points)
def test_requested_points_is_zero_with_logidx(self):
req_points = 0
downsample(self.times, self.data, self.log_idx, rq_points=req_points)
self.assertTrue(self.mock_chunk_minmax.called)
times, data, log_idx, rq_points = self.mock_chunk_minmax.call_args[0]
self.assertIsNot(times, self.times)
self.assertIsNot(data, self.data)
self.assertIsNot(log_idx, self.log_idx)
self.assertEqual(rq_points, req_points)
def test_empty_times_and_data(self):
req_points = 1000
self.times = np.empty((0, 0))
self.data = np.empty((0, 0))
times, data, _ = downsample(
self.times, self.data, rq_points=req_points)
self.assertFalse(self.mock_chunk_minmax.called)
# Check that we did not do any processing on the times and data arrays.
# This ensures that we don't do two unneeded copy operations.
self.assertIs(times, self.times)
self.assertIs(data, self.data)
def test_empty_times_and_data_with_logidx(self):
req_points = 1000
self.times = np.empty((0, 0))
self.data = np.empty((0, 0))
self.log_idx = np.empty((0, 0))
times, data, log_idx = downsample(
self.times, self.data, self.log_idx, rq_points=req_points)
self.assertFalse(self.mock_chunk_minmax.called)
# Check that we did not do any processing on the times and data arrays.
# This ensures that we don't do two unneeded copy operations.
self.assertIs(times, self.times)
self.assertIs(data, self.data)
self.assertIs(log_idx, self.log_idx)
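# A hedged sketch of chunk_minmax's role in the second step: keep each
# chunk's minimum and maximum so extremes survive decimation. This assumes
# the data size divides evenly into rq_points // 2 chunks, as in the first
# TestChunkMinmax case below; the real helper also handles the remainder
# path exercised there.
def _chunk_minmax_sketch(data: np.ndarray, rq_points: int) -> np.ndarray:
    chunks = data.reshape(rq_points // 2, -1)
    return np.column_stack((chunks.min(axis=1), chunks.max(axis=1))).ravel()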
class TestChunkMinmax(TestCase):
# FROM test_handling_data_trim_downsample.TestChunkMinmax
def setUp(self):
self.times = np.arange(1000)
self.data = np.arange(1000)
self.log_idx = np.arange(1000)
def test_data_size_is_multiple_of_requested_points(self):
req_points = 100
times, data, log_idx = chunk_minmax(
self.times, self.data, self.log_idx, req_points)
self.assertEqual(times.size, req_points)
self.assertEqual(data.size, req_points)
self.assertEqual(log_idx.size, req_points)
@patch('sohstationviewer.model.downsampler.downsample', wraps=downsample)
def test_data_size_is_not_multiple_of_requested_points(
self, mock_downsample):
req_points = 102
chunk_minmax(self.times, self.data, self.log_idx, req_points)
self.assertTrue(mock_downsample.called)
def test_requested_points_too_small(self):
small_req_points_list = [0, 1]
for req_points in small_req_points_list:
with self.subTest(f'test_requested_points_is_{req_points}'):
times, data, log_idx = chunk_minmax(
self.times, self.data, self.log_idx, rq_points=req_points)
self.assertEqual(times.size, 0)
self.assertEqual(data.size, 0)
self.assertEqual(log_idx.size, 0)
import math
from unittest import TestCase
import numpy as np
from obspy import UTCDateTime
from sohstationviewer.model.handling_data import (
get_start_5mins_of_diff_days, find_tps_tm_idx
from sohstationviewer.view.plotting.time_power_squared_helper import (
get_start_5mins_of_diff_days, find_tps_tm_idx,
get_tps_for_discontinuous_data
)
from sohstationviewer.conf import constants as const
class TestGetEachDay5MinList(TestCase):
# FROM handling_data_calc_time
def test_start_in_middle_end_exact(self):
"""
Start in the middle of a day and end at the exact end of a day
@@ -55,6 +60,7 @@ class TestGetEachDay5MinList(TestCase):
class TestFindTPSTmIdx(TestCase):
# FROM handling_data_calc_time
@classmethod
def setUpClass(cls) -> None:
start = UTCDateTime("2012-09-07T12:15:00").timestamp
@@ -83,3 +89,53 @@ class TestFindTPSTmIdx(TestCase):
tm = UTCDateTime("2012-09-09T00:00:00").timestamp
start_tps_tm_idx = find_tps_tm_idx(tm, self.start_5mins_of_diff_days)
self.assertEqual(start_tps_tm_idx, (287, -1))
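# A sketch of the bin arithmetic the assertions below rely on, assuming
# const.SEC_5M is the 5-minute bin length in seconds and const.NO_5M_DAY the
# number of such bins per day (288, consistent with the (287, -1) index
# asserted above): a timestamp t on a day starting at day_begin lands in bin
# ceil((t - day_begin) / SEC_5M) - 1.
def _tps_bin_index_sketch(t: float, day_begin: float, sec_5m: int = 300) -> int:
    return math.ceil((t - day_begin) / sec_5m) - 1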
class TestGetTPSForDiscontinuousData(TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.day_begin = UTCDateTime("2021-07-05T00:00:00").timestamp
cls.start = UTCDateTime("2021-07-05T22:59:28.340").timestamp
cls.end = UTCDateTime("2021-07-06T3:59:51.870").timestamp
cls.start_5mins_of_diff_days = get_start_5mins_of_diff_days(
cls.start, cls.end
)
def test_more_than_10_minute_apart(self):
# check for empty blocks in between the tps data
times = np.arange(self.start, self.end, 60*60) # 60m apart
data = np.random.uniform(-1000, 1000, times.size)
channel_data = {'tracesInfo': [{'times': times, 'data': data}]}
tps = get_tps_for_discontinuous_data(
channel_data, self.start_5mins_of_diff_days)
self.assertEqual(len(tps), 2)
expected_first_index = \
math.ceil((self.start - self.day_begin)/const.SEC_5M) - 1
day0_indexes = np.where(tps[0] != 0)[0]
day1_indexes = np.where(tps[1] != 0)[0]
self.assertEqual(day0_indexes[0], expected_first_index)
# indexes are (60/5) = 12 blocks apart from each other
self.assertTrue(np.all(np.diff(day0_indexes) == 60/5))
self.assertTrue(np.all(np.diff(day1_indexes) == 60/5))
def test_less_than_10_minute_apart(self):
# though the data times are spaced apart from each other, at less than
# 10m apart the function fills up the empty space
times = np.arange(self.start, self.end, 9*60) # 9m apart
data = np.random.uniform(-1000, 1000, times.size)
channel_data = {'tracesInfo': [{'times': times, 'data': data}]}
tps = get_tps_for_discontinuous_data(
channel_data, self.start_5mins_of_diff_days)
self.assertEqual(len(tps), 2)
expected_first_index = \
math.ceil((self.start - self.day_begin)/const.SEC_5M) - 1
day0_indexes = np.where(tps[0] != 0)[0]
day1_indexes = np.where(tps[1] != 0)[0]
self.assertEqual(day0_indexes[0], expected_first_index)
# blocks are consecutive, none apart from each other
self.assertTrue(np.all(np.diff(day0_indexes) == 1))
self.assertTrue(np.all(np.diff(day1_indexes) == 1))
# the last block of day0 has a value
self.assertIn(const.NO_5M_DAY - 1, day0_indexes)