Compare revisions: software_public/passoft/sohstationviewer
Changes are shown as if the source revision was being merged into the target revision.
import numpy as np
from unittest import TestCase
from unittest.mock import patch
from sohstationviewer.model.general_data.general_data_helper import (
_check_related_gaps, squash_gaps, sort_data,
retrieve_data_time_from_data_dict, retrieve_gaps_from_data_dict,
combine_data, apply_convert_factor_to_data_dict
)
class TestCheckRelatedGaps(TestCase):
# FROM test_handling_data_rearrange_data.TestCheckRelatedGaps
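# Judging from the calls below, _check_related_gaps appears to take
# (min1, max1, min2, max2, index, checked_indexes): it returns True when the two
# [min, max] ranges touch, overlap, or contain one another, and records the
# gap's index in checked_indexes so a related gap is not processed twice.
# (Inferred from these tests; see the implementation for the exact signature.)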
@classmethod
def setUpClass(cls) -> None:
cls.checked_indexes = []
def test_minmax1_inside_minmax2(self):
self.assertTrue(
_check_related_gaps(3, 4, 1, 5, 1, self.checked_indexes))
self.assertIn(1, self.checked_indexes)
def test_minmax2_inside_minmax1(self):
self.assertTrue(
_check_related_gaps(1, 5, 3, 4, 2, self.checked_indexes))
self.assertIn(2, self.checked_indexes)
def test_end_minmax1_overlap_start_minmax2(self):
self.assertTrue(
_check_related_gaps(1, 4, 3, 5, 3, self.checked_indexes))
self.assertIn(3, self.checked_indexes)
def test_end_minmax2_overlap_start_minmax1(self):
self.assertTrue(
_check_related_gaps(3, 5, 1, 4, 4, self.checked_indexes))
self.assertIn(4, self.checked_indexes)
def test_minmax1_less_than_minmax2(self):
self.assertFalse(
_check_related_gaps(1, 3, 4, 6, 5, self.checked_indexes))
self.assertNotIn(5, self.checked_indexes)
def test_minmax1_greater_than_minmax2(self):
self.assertFalse(
_check_related_gaps(6, 6, 1, 3, 5, self.checked_indexes))
self.assertNotIn(5, self.checked_indexes)
class TestSquashGaps(TestCase):
# FROM test_handling_data_rearrange_data.TestSquashGaps
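# The fixtures below use two orientations: "normal" gaps run forward
# (start < end) while "overlap" gaps run backward (start > end). squash_gaps is
# expected to merge each group of related gaps into a single enclosing span per
# orientation, e.g. [[4, 7], [4, 6], [5, 6], [3, 7], [5, 8]] -> [[3, 8]].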
def setUp(self) -> None:
self.normal_gaps = [[4, 7], [4, 6], [5, 6], [3, 7], [5, 8]]
self.overlap_gaps = [[17, 14], [16, 14], [16, 15], [17, 13], [18, 15]]
self.mixed_gaps = []
for i in range(len(self.normal_gaps)):
self.mixed_gaps.append(self.normal_gaps[i])
self.mixed_gaps.append(self.overlap_gaps[i])
def test_normal_gaps(self):
gaps = squash_gaps(self.normal_gaps)
self.assertEqual(gaps, [[3, 8]])
def test_overlap_gaps(self):
gaps = squash_gaps(self.overlap_gaps)
self.assertEqual(gaps, [[18, 13]])
def test_mixed_gaps(self):
gaps = squash_gaps(self.mixed_gaps)
self.assertEqual(gaps, [[3, 8], [18, 13]])
class TestSortData(TestCase):
# FROM test_handling_data_rearrange_data.TestSortData
def setUp(self) -> None:
self.station_data_dict = {
'CH1': {'tracesInfo': [{'startTmEpoch': 7},
{'startTmEpoch': 1},
{'startTmEpoch': 5},
{'startTmEpoch': 3}]},
'CH2': {'tracesInfo': [{'startTmEpoch': 2},
{'startTmEpoch': 8},
{'startTmEpoch': 6},
{'startTmEpoch': 4}]}
}
def test_sort_data(self):
sort_data(self.station_data_dict)
self.assertEqual(
self.station_data_dict,
{'CH1': {'tracesInfo': [{'startTmEpoch': 1}, {'startTmEpoch': 3},
{'startTmEpoch': 5}, {'startTmEpoch': 7}]},
'CH2': {'tracesInfo': [{'startTmEpoch': 2}, {'startTmEpoch': 4},
{'startTmEpoch': 6}, {'startTmEpoch': 8}]}}
)
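# retrieve_data_time_from_data_dict is expected to record, for each station,
# the earliest startTmEpoch and the latest endTmEpoch found across its
# channels, e.g. STA1 spans [4, 6] and [5, 9] and therefore yields [4, 9].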
class TestRetrieveDataTimeFromDataDict(TestCase):
def setUp(self) -> None:
self.data_dict = {
'STA1': {'CH1': {'startTmEpoch': 4, 'endTmEpoch': 6},
'CH2': {'startTmEpoch': 5, 'endTmEpoch': 9}
},
'STA2': {'CH1': {'startTmEpoch': 2, 'endTmEpoch': 4},
'CH2': {'startTmEpoch': 6, 'endTmEpoch': 8}
}
}
self.data_time = {}
self.expected_data_time = {'STA1': [4, 9], 'STA2': [2, 8]}
def test_retrieve_data_time(self):
retrieve_data_time_from_data_dict(self.data_dict, self.data_time)
self.assertEqual(self.data_time,
self.expected_data_time)
class TestRetrieveGapsFromDataDict(TestCase):
def setUp(self) -> None:
self.data_dict = {
'STA1': {'CH1': {'gaps': [[1, 2], [4, 3]]},
'CH2': {'gaps': []}
},
'STA2': {'CH1': {'gaps': [[1, 2], [4, 3], [2, 3]]},
'CH2': {'gaps': [[1, 3], [3, 2]]}
},
}
self.gaps = {}
self.expected_gaps = {'STA1': [[1, 2], [4, 3]],
'STA2': [[1, 2], [4, 3], [2, 3], [1, 3], [3, 2]]}
def test_retrieve_gaps(self):
retrieve_gaps_from_data_dict(self.data_dict, self.gaps)
self.assertEqual(self.gaps,
self.expected_gaps)
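# The tests below assume combine_data merges a channel's traces and uses
# gap_minimum only to decide what is recorded: when the distance (or overlap)
# between one trace's end and the next trace's start is smaller than
# gap_minimum, nothing is recorded; when it is >= gap_minimum, the [end, start]
# pair is appended to the channel's gap list.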
class TestCombineData(TestCase):
def test_overlap_lt_gap_minimum(self):
# combine; not add to gap list
station_data_dict = {
'CH1': {
'gaps': [],
'tracesInfo': [
{'startTmEpoch': 5,
'endTmEpoch': 15,
'data': [1, 2, 2, -1],
'times': [5, 8, 11, 15]},
{'startTmEpoch': 13, # delta = 2 < 10
'endTmEpoch': 20,
'data': [1, -2, 1, 1],
'times': [13, 16, 18, 20]}
]}
}
gap_minimum = 10
combine_data(station_data_dict, gap_minimum)
self.assertEqual(station_data_dict['CH1']['gaps'], [])
self.assertEqual(
len(station_data_dict['CH1']['tracesInfo']),
1)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['startTmEpoch'],
5)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['endTmEpoch'],
20)
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['data'].tolist(),
[1, 2, 2, -1, 1, -2, 1, 1])
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['times'].tolist(),
[5, 8, 11, 15, 13, 16, 18, 20])
def test_overlap_gt_or_equal_gap_minimum(self):
# combine; add to gap list
station_data_dict = {
'CH1': {
'gaps': [],
'tracesInfo': [
{'startTmEpoch': 5,
'endTmEpoch': 15,
'data': [1, 2, 2, -1],
'times': [5, 8, 11, 15]},
{'startTmEpoch': 5, # delta = 10 >= 10
'endTmEpoch': 20,
'data': [1, -2, 1, 1],
'times': [5, 11, 15, 20]}
]}
}
gap_minimum = 10
combine_data(station_data_dict, gap_minimum)
self.assertEqual(station_data_dict['CH1']['gaps'], [[15, 5]])
self.assertEqual(
len(station_data_dict['CH1']['tracesInfo']),
1)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['startTmEpoch'],
5)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['endTmEpoch'],
20)
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['data'].tolist(),
[1, 2, 2, -1, 1, -2, 1, 1])
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['times'].tolist(),
[5, 8, 11, 15, 5, 11, 15, 20])
def test_lt_gap_minimum(self):
# not combine; not add to gap list
station_data_dict = {
'CH1': {
'gaps': [],
'tracesInfo': [
{'startTmEpoch': 5,
'endTmEpoch': 15,
'data': [1, 2, 2, -1],
'times': [5, 8, 11, 15]},
{'startTmEpoch': 22, # delta = 7 > 6, < 10
'endTmEpoch': 34,
'data': [1, -2, 1, 1],
'times': [22, 26, 30, 34]}
]}
}
gap_minimum = 10
combine_data(station_data_dict, gap_minimum)
self.assertEqual(station_data_dict['CH1']['gaps'], [])
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['startTmEpoch'],
5)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['endTmEpoch'],
34)
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['data'].tolist(),
[1, 2, 2, -1, 1, -2, 1, 1])
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['times'].tolist(),
[5, 8, 11, 15, 22, 26, 30, 34])
def test_gap_gt_or_equal_gap_minimum(self):
# not combine; add to gap list
station_data_dict = {
'CH1': {
'gaps': [],
'tracesInfo': [
{'startTmEpoch': 5,
'endTmEpoch': 15,
'data': [1, 2, 2, -1],
'times': [5, 8, 11, 15]},
{'startTmEpoch': 25, # delta = 10 >= 10
'endTmEpoch': 40,
'data': [1, -2, 1, 1],
'times': [25, 29, 33, 36, 40]}
]}
}
gap_minimum = 10
combine_data(station_data_dict, gap_minimum)
self.assertEqual(station_data_dict['CH1']['gaps'], [[15, 25]])
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['startTmEpoch'],
5)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['endTmEpoch'],
40)
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['data'].tolist(),
[1, 2, 2, -1, 1, -2, 1, 1])
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['times'].tolist(),
[5, 8, 11, 15, 25, 29, 33, 36, 40])
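# apply_convert_factor_to_data_dict is expected to scale every trace's data by
# the channel's conversion factor; get_convert_factor is patched to return 0.1
# below so the test does not depend on the database contents.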
class TestApplyConvertFactorToDataDict(TestCase):
def setUp(self) -> None:
self.data_dict = {
'STA1': {
'CH1': {'tracesInfo': [{'data': np.array([1, 2, 2, -1])}]}
}
}
self.expected_data = [0.1, 0.2, 0.2, -0.1]
@patch('sohstationviewer.model.general_data.general_data_helper.'
'get_convert_factor')
def test_convert_factor(self, mock_get_convert_factor):
mock_get_convert_factor.return_value = 0.1
apply_convert_factor_to_data_dict(self.data_dict, 'Q330')
self.assertEqual(
self.data_dict['STA1']['CH1']['tracesInfo'][0]['data'].tolist(),
self.expected_data)
from unittest import TestCase
from pathlib import Path
from sohstationviewer.model.mseed_data.mseed import MSeed
from sohstationviewer.model.general_data.general_data import \
ProcessingDataError
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
'test_data')
pegasus_data = TEST_DATA_DIR.joinpath("Pegasus-sample")
q330_data = TEST_DATA_DIR.joinpath("Q330-sample")
blockettes_data = TEST_DATA_DIR.joinpath("Q330_unimplemented_ascii_block")
multiplex_data = TEST_DATA_DIR.joinpath("Q330_multiplex")
centaur_data = TEST_DATA_DIR.joinpath("Centaur-sample")
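# The tests below exercise MSeed with different keyword arguments: data_type
# and folder select the dataset, is_multiplex controls whether every record is
# scanned, req_soh_chans/req_wf_chans filter channels, read_start/read_end
# limit the time range, gap_minimum enables gap detection, and include_mp123zne
# pulls in mass-position channels.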
class TestMSeed(TestCase):
def test_path_not_exist(self):
# Raise an exception when the given path does not exist
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': '_',
'on_unittest': True
}
with self.assertRaises(ProcessingDataError) as context:
MSeed(**args)
self.assertEqual(
str(context.exception),
"Path '_' not exist"
)
def test_read_text_only(self):
# No station is recognized, so the text is added under the 'TEXT' key in log_data
args = {
'data_type': 'Pegasus',
'is_multiplex': False,
'folder': pegasus_data,
'req_soh_chans': ['_'],
'on_unittest': True
}
obj = MSeed(**args)
self.assertEqual(list(obj.log_data.keys()), ['TEXT'])
self.assertEqual(len(obj.log_data['TEXT']), 2)
self.assertEqual(
obj.log_data['TEXT'][0][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.130'
'\n2020-05-09 00:00:09.839 UTC: I(TimingThread): timing unce')
self.assertEqual(
obj.log_data['TEXT'][1][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.129'
'\n2020-05-08 22:55:45.390 UTC: I(Initializations): Firmware')
def test_read_text_with_soh(self):
# The text takes its station from the SOH data and is added to log_data under
# the 'TXT' channel
args = {
'data_type': 'Pegasus',
'is_multiplex': False,
'folder': pegasus_data,
'req_soh_chans': ['VE1'],
'on_unittest': True
}
obj = MSeed(**args)
self.assertEqual(list(obj.log_data.keys()), ['TEXT', 'KC01'])
self.assertEqual(len(obj.log_data['TEXT']), 0)
self.assertEqual(list(obj.log_data['KC01'].keys()), ['TXT'])
self.assertEqual(len(obj.log_data['KC01']['TXT']), 2)
self.assertEqual(
obj.log_data['KC01']['TXT'][0][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.130'
'\n2020-05-09 00:00:09.839 UTC: I(TimingThread): timing unce')
self.assertEqual(
obj.log_data['KC01']['TXT'][1][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.129'
'\n2020-05-08 22:55:45.390 UTC: I(Initializations): Firmware')
def test_read_text_with_waveform(self):
# The text takes its station from the waveform data and is added to log_data
# under the 'TXT' channel
args = {
'data_type': 'Pegasus',
'is_multiplex': False,
'folder': pegasus_data,
'req_wf_chans': ['HH1'],
'req_soh_chans': ['_'],
'on_unittest': True
}
obj = MSeed(**args)
self.assertEqual(list(obj.log_data.keys()), ['TEXT', 'KC01'])
self.assertEqual(len(obj.log_data['TEXT']), 0)
self.assertEqual(list(obj.log_data['KC01'].keys()), ['TXT'])
self.assertEqual(len(obj.log_data['KC01']['TXT']), 2)
self.assertEqual(
obj.log_data['KC01']['TXT'][0][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.130'
'\n2020-05-09 00:00:09.839 UTC: I(TimingThread): timing unce')
self.assertEqual(
obj.log_data['KC01']['TXT'][1][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.129'
'\n2020-05-08 22:55:45.390 UTC: I(Initializations): Firmware')
def test_read_ascii(self):
# The info is text wrapped in miniSEED format
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': q330_data,
'req_soh_chans': ['LOG'],
}
obj = MSeed(**args)
self.assertEqual(list(obj.log_data.keys()), ['TEXT', 'AX08'])
self.assertEqual(list(obj.log_data['AX08'].keys()), ['LOG'])
self.assertEqual(obj.log_data['TEXT'], [])
self.assertEqual(len(obj.log_data['AX08']['LOG']), 16)
self.assertEqual(
obj.log_data['AX08']['LOG'][0][:100],
'\n\nSTATE OF HEALTH: From:1625456260.12 To:1625456260.12\n\r'
'\nQuanterra Packet Baler Model 14 Restart. V'
)
self.assertEqual(
obj.log_data['AX08']['LOG'][1][:100],
'\n\nSTATE OF HEALTH: From:1625456366.62 To:1625456366.62'
'\nReducing Status Polling Interval\r\n[2021-07-0'
)
def test_read_blockettes_info(self):
# The info is stored in blockette 500
args = {
'data_type': 'Q330',
'is_multiplex': True,
'folder': blockettes_data,
'req_soh_chans': ['ACE'],
}
obj = MSeed(**args)
self.assertEqual(list(obj.log_data.keys()), ['TEXT', '3203'])
self.assertEqual(list(obj.log_data['3203'].keys()), ['ACE'])
self.assertEqual(obj.log_data['TEXT'], [])
self.assertEqual(len(obj.log_data['3203']['ACE']), 1)
self.assertEqual(
obj.log_data['3203']['ACE'][0][:100],
'\n\nSTATE OF HEALTH: From:1671729287.00014 To:1671729287.0'
'\n===========\nVCO correction: 53.7109375\nTim'
)
def test_not_is_multiplex_read_channel(self):
# is_multiplex = False => reading stops at the first channel that does not
# match the requested channels, so channel 'EL1' is read but not to the end
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': multiplex_data,
'req_soh_chans': [],
'req_wf_chans': ['EL1']
}
obj = MSeed(**args)
self.assertEqual(list(obj.waveform_data.keys()), ['3203'])
self.assertEqual(list(obj.waveform_data['3203'].keys()), ['EL1'])
self.assertEqual(obj.waveform_data['3203']['EL1']['samplerate'], 200)
self.assertEqual(obj.waveform_data['3203']['EL1']['startTmEpoch'],
1671730004.145029)
self.assertEqual(obj.waveform_data['3203']['EL1']['endTmEpoch'],
1671730013.805)
self.assertEqual(obj.waveform_data['3203']['EL1']['size'], 1932)
self.assertEqual(obj.waveform_data['3203']['EL1']['gaps'], [])
self.assertEqual(len(obj.waveform_data['3203']['EL1']['tracesInfo']),
1)
def test_is_multiplex_read_channel(self):
# is_multiplex = True => read every record
args = {
'data_type': 'Q330',
'is_multiplex': True,
'folder': multiplex_data,
'req_soh_chans': [],
'req_wf_chans': ['EL1']
}
obj = MSeed(**args)
self.assertEqual(list(obj.waveform_data.keys()), ['3203'])
self.assertEqual(list(obj.waveform_data['3203'].keys()), ['EL1'])
self.assertEqual(obj.waveform_data['3203']['EL1']['samplerate'], 200)
self.assertEqual(obj.waveform_data['3203']['EL1']['startTmEpoch'],
1671730004.145029)
self.assertEqual(obj.waveform_data['3203']['EL1']['endTmEpoch'],
1671730720.4348998)
self.assertEqual(obj.waveform_data['3203']['EL1']['size'], 143258)
self.assertEqual(obj.waveform_data['3203']['EL1']['gaps'], [])
self.assertEqual(len(obj.waveform_data['3203']['EL1']['tracesInfo']),
1)
def test_not_is_multiplex_selected_channel_in_middle(self):
# The selected channel is never reached because an earlier record does not meet
# the requirements when is_multiplex = False
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': multiplex_data,
'req_soh_chans': [],
'req_wf_chans': ['EL2']
}
obj = MSeed(**args)
self.assertEqual(list(obj.waveform_data.keys()), [])
def test_is_multiplex_selected_channel_in_middle(self):
# is_multiplex = True => the selected channel will be read
args = {
'data_type': 'Q330',
'is_multiplex': True,
'folder': multiplex_data,
'req_soh_chans': [],
'req_wf_chans': ['EL2']
}
obj = MSeed(**args)
self.assertEqual(list(obj.waveform_data.keys()), ['3203'])
self.assertEqual(list(obj.waveform_data['3203'].keys()), ['EL2'])
self.assertEqual(obj.waveform_data['3203']['EL2']['samplerate'], 200)
self.assertEqual(obj.waveform_data['3203']['EL2']['startTmEpoch'],
1671730004.3100293)
self.assertEqual(obj.waveform_data['3203']['EL2']['endTmEpoch'],
1671730720.5549)
self.assertEqual(obj.waveform_data['3203']['EL2']['size'], 143249)
self.assertEqual(obj.waveform_data['3203']['EL2']['gaps'], [])
self.assertEqual(len(obj.waveform_data['3203']['EL2']['tracesInfo']),
1)
def test_existing_time_range(self):
# Check that data_time comes from the given range; the end time may be slightly
# greater than read_end, depending on the last record's end time
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': q330_data,
'req_soh_chans': [],
'read_start': 1625456018.0,
'read_end': 1625505627.9998999
}
obj = MSeed(**args)
self.assertEqual(obj.keys, ['AX08'])
self.assertEqual(list(obj.soh_data['AX08'].keys()), ['VKI'])
self.assertEqual(list(obj.mass_pos_data['AX08'].keys()), [])
self.assertEqual(list(obj.waveform_data['AX08'].keys()), [])
self.assertEqual(obj.data_time['AX08'], [1625446018.0, 1625510338.0])
def test_non_existing_time_range(self):
# If the given time range falls outside the data's time range, no station is created
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': q330_data,
'req_soh_chans': [],
'read_start': 1625356018.0,
'read_end': 1625405627.9998999
}
obj = MSeed(**args)
self.assertEqual(obj.keys, [])
self.assertEqual(obj.soh_data, {})
self.assertEqual(obj.mass_pos_data, {})
self.assertEqual(obj.waveform_data, {})
self.assertEqual(obj.data_time, {})
def test_read_waveform(self):
# Data read for TPS is similar to waveform data but is not separated at gaps
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': q330_data,
'req_soh_chans': [],
'req_wf_chans': ['LHE']
}
obj = MSeed(**args)
self.assertEqual(list(obj.waveform_data.keys()), ['AX08'])
self.assertEqual(list(obj.waveform_data['AX08'].keys()), ['LHE'])
self.assertEqual(obj.waveform_data['AX08']['LHE']['samplerate'], 1)
self.assertEqual(obj.waveform_data['AX08']['LHE']['startTmEpoch'],
1625445156.000001)
self.assertEqual(obj.waveform_data['AX08']['LHE']['endTmEpoch'],
1625532950.0)
self.assertEqual(obj.waveform_data['AX08']['LHE']['size'], 87794)
self.assertEqual(obj.waveform_data['AX08']['LHE']['gaps'], [])
self.assertEqual(len(obj.waveform_data['AX08']['LHE']['tracesInfo']),
1)
def test_read_mass_pos_channel(self):
# Mass-position channels are read when one or both of the include_mpxxxxxx
# flags are True
args = {
'data_type': 'Q330',
'is_multiplex': True,
'folder': q330_data,
'req_soh_chans': [],
'req_wf_chans': [],
'include_mp123zne': True
}
obj = MSeed(**args)
self.assertEqual(list(obj.mass_pos_data.keys()), ['AX08'])
self.assertEqual(list(obj.mass_pos_data['AX08'].keys()), ['VM1'])
self.assertEqual(obj.mass_pos_data['AX08']['VM1']['samplerate'], 0.1)
self.assertEqual(obj.mass_pos_data['AX08']['VM1']['startTmEpoch'],
1625444970.0)
self.assertEqual(obj.mass_pos_data['AX08']['VM1']['endTmEpoch'],
1625574580.0)
self.assertEqual(obj.mass_pos_data['AX08']['VM1']['size'], 12961)
self.assertEqual(obj.mass_pos_data['AX08']['VM1']['gaps'], [])
self.assertEqual(len(obj.mass_pos_data['AX08']['VM1']['tracesInfo']),
1)
def test_gap(self):
# gaps will be detected when gap_minimum is set
args = {
'data_type': 'Centaur',
'is_multiplex': True,
'folder': centaur_data,
'req_soh_chans': [],
'gap_minimum': 60
}
obj = MSeed(**args)
self.assertEqual(list(obj.soh_data.keys()), ['3734'])
self.assertEqual(sorted(list(obj.soh_data['3734'].keys())),
['EX1', 'EX2', 'EX3', 'GAN', 'GEL', 'GLA', 'GLO',
'GNS', 'GPL', 'GST', 'LCE', 'LCQ', 'VCO', 'VDT',
'VEC', 'VEI', 'VPB'])
self.assertAlmostEqual(obj.soh_data['3734']['EX1']['samplerate'],
0.0166, 3)
self.assertEqual(obj.soh_data['3734']['EX1']['startTmEpoch'],
1534512840.0)
self.assertEqual(obj.soh_data['3734']['EX1']['endTmEpoch'],
1534550400.0)
self.assertEqual(obj.soh_data['3734']['EX1']['size'], 597)
self.assertEqual(obj.gaps['3734'], [[1534521420.0, 1534524000.0]])
def test_not_detect_gap(self):
# If gap_minimum isn't set but gaps exist, the data is still separated, but no
# gap is added to the gap list
args = {
'data_type': 'Centaur',
'is_multiplex': True,
'folder': centaur_data,
'req_soh_chans': [],
'gap_minimum': None
}
obj = MSeed(**args)
self.assertEqual(list(obj.soh_data.keys()), ['3734'])
self.assertEqual(sorted(list(obj.soh_data['3734'].keys())),
['EX1', 'EX2', 'EX3', 'GAN', 'GEL', 'GLA', 'GLO',
'GNS', 'GPL', 'GST', 'LCE', 'LCQ', 'VCO', 'VDT',
'VEC', 'VEI', 'VPB'])
self.assertAlmostEqual(obj.soh_data['3734']['EX1']['samplerate'],
0.0166, 3)
self.assertEqual(obj.soh_data['3734']['EX1']['startTmEpoch'],
1534512840.0)
self.assertEqual(obj.soh_data['3734']['EX1']['endTmEpoch'],
1534550400.0)
self.assertEqual(obj.soh_data['3734']['EX1']['size'], 597)
self.assertEqual(obj.gaps['3734'], []) # no gaps
from unittest import TestCase
from pathlib import Path
from sohstationviewer.model.mseed_data.mseed_helper import (
retrieve_nets_from_data_dict, read_text
)
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
'test_data')
text_file = TEST_DATA_DIR.joinpath(
"Pegasus-sample/Pegasus_SVC4/logs/2020/XX/KC01/XX.KC01...D.2020.129")
binary_file = TEST_DATA_DIR.joinpath(
"Pegasus-sample/Pegasus_SVC4/soh/2020/XX/KC01/VDT.D/"
"XX.KC01..VDT.D.2020.129")
class TestReadText(TestCase):
def test_text_file(self):
ret = read_text(text_file)
expected_ret = (
"\n\n** STATE OF HEALTH: XX.KC01...D.2020.129"
"\n2020-05-08 22:55:45.390 UTC: I(Initializations): Firmware")
self.assertEqual(ret[:100], expected_ret
)
def test_binary_file(self):
ret = read_text(binary_file)
self.assertIsNone(ret)
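# retrieve_nets_from_data_dict is expected to union the 'nets' sets of all
# channels belonging to a station into nets_by_sta[station].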
class TestRetrieveNetsFromDataDict(TestCase):
def setUp(self):
self.nets_by_sta = {}
self.data_dict = {
'STA1': {'CHA1': {'nets': {'NET1', 'NET2'}},
'CHA2': {'nets': {'NET2', 'NET3'}}
},
'STA2': {'CHA1': {'nets': {'NET1'}},
'CHA2': {'nets': {'NET1'}}
}
}
def test_retrieve_nets(self):
retrieve_nets_from_data_dict(self.data_dict, self.nets_by_sta)
self.assertEqual(list(self.nets_by_sta.keys()), ['STA1', 'STA2'])
self.assertEqual(sorted(list(self.nets_by_sta['STA1'])),
['NET1', 'NET2', 'NET3'])
self.assertEqual(sorted(list(self.nets_by_sta['STA2'])), ['NET1'])
from unittest import TestCase
from pathlib import Path
from sohstationviewer.model.mseed_data.mseed_reader import MSeedReader
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
'test_data')
ascii_file = TEST_DATA_DIR.joinpath(
"Q330-sample/day_vols_AX08/AX08.XA..LOG.2021.186")
blockettes_files = TEST_DATA_DIR.joinpath(
"Q330_unimplemented_ascii_block/XX-3203_4-20221222190255")
multiplex_file = TEST_DATA_DIR.joinpath(
"Q330_multiplex/XX-3203_4-20221222183011")
soh_file = TEST_DATA_DIR.joinpath(
"Q330-sample/day_vols_AX08/AX08.XA..VKI.2021.186")
waveform_file = TEST_DATA_DIR.joinpath(
"Q330-sample/day_vols_AX08/AX08.XA..LHE.2021.186")
mass_pos_file = TEST_DATA_DIR.joinpath(
"Q330-sample/day_vols_AX08/AX08.XA..VM1.2021.186")
gap_file = TEST_DATA_DIR.joinpath(
"Centaur-sample/SOH/"
"XX.3734.SOH.centaur-3_3734..20180817_000000.miniseed.miniseed")
class TestMSeedReader(TestCase):
def setUp(self) -> None:
self.soh_data = {}
self.mass_pos_data = {}
self.waveform_data = {}
self.log_data = {}
def test_read_ascii(self):
args = {
'file_path': ascii_file,
'is_multiplex': False,
'req_soh_chans': ['LOG'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.log_data.keys()), ['AX08'])
self.assertEqual(list(self.log_data['AX08'].keys()), ['LOG'])
self.assertEqual(len(self.log_data['AX08']['LOG']), 16)
self.assertEqual(
self.log_data['AX08']['LOG'][0][:100],
'\n\nSTATE OF HEALTH: From:1625456260.12 To:1625456260.12\n\r'
'\nQuanterra Packet Baler Model 14 Restart. V'
)
self.assertEqual(
self.log_data['AX08']['LOG'][1][:100],
'\n\nSTATE OF HEALTH: From:1625456366.62 To:1625456366.62'
'\nReducing Status Polling Interval\r\n[2021-07-0'
)
def test_read_blockettes_info(self):
args = {
'file_path': blockettes_files,
'is_multiplex': True,
'req_soh_chans': ['ACE'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.log_data.keys()), ['3203'])
self.assertEqual(list(self.log_data['3203'].keys()), ['ACE'])
self.assertEqual(len(self.log_data['3203']['ACE']), 1)
self.assertEqual(
self.log_data['3203']['ACE'][0][:100],
'\n\nSTATE OF HEALTH: From:1671729287.00014 To:1671729287.0'
'\n===========\nVCO correction: 53.7109375\nTim'
)
def test_not_is_multiplex_read_channel(self):
# is_multiplex = False => reading stops at the first channel that does not
# match the requested channels, so channel 'EL1' is read but not to the end
args = {
'file_path': multiplex_file,
'is_multiplex': False,
'req_wf_chans': ['EL1'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.waveform_data.keys()), ['3203'])
self.assertEqual(list(self.waveform_data['3203'].keys()), ['EL1'])
self.assertEqual(self.waveform_data['3203']['EL1']['samplerate'], 200)
self.assertEqual(self.waveform_data['3203']['EL1']['startTmEpoch'],
1671730004.145029)
self.assertEqual(self.waveform_data['3203']['EL1']['endTmEpoch'],
1671730013.805)
self.assertEqual(self.waveform_data['3203']['EL1']['size'], 1932)
self.assertEqual(self.waveform_data['3203']['EL1']['gaps'], [])
self.assertEqual(len(self.waveform_data['3203']['EL1']['tracesInfo']),
1)
def test_is_multiplex_read_channel(self):
# is_multiplex = True => read every record
args = {
'file_path': multiplex_file,
'is_multiplex': True,
'req_wf_chans': ['EL1'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.waveform_data.keys()), ['3203'])
self.assertEqual(list(self.waveform_data['3203'].keys()), ['EL1'])
self.assertEqual(self.waveform_data['3203']['EL1']['samplerate'], 200)
self.assertEqual(self.waveform_data['3203']['EL1']['startTmEpoch'],
1671730004.145029)
self.assertEqual(self.waveform_data['3203']['EL1']['endTmEpoch'],
1671730720.4348998)
self.assertEqual(self.waveform_data['3203']['EL1']['size'], 143258)
self.assertEqual(self.waveform_data['3203']['EL1']['gaps'], [])
self.assertEqual(len(self.waveform_data['3203']['EL1']['tracesInfo']),
1)
def test_not_is_multiplex_selected_channel_in_middle(self):
# The selected channel is never reached because an earlier record does not meet
# the requirements when is_multiplex = False
args = {
'file_path': multiplex_file,
'is_multiplex': False,
'req_wf_chans': ['EL2'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.waveform_data.keys()), [])
def test_is_multiplex_selected_channel_in_middle(self):
# is_multiplex = True => the selected channel will be read
args = {
'file_path': multiplex_file,
'is_multiplex': True,
'req_wf_chans': ['EL2'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.waveform_data.keys()), ['3203'])
self.assertEqual(list(self.waveform_data['3203'].keys()), ['EL2'])
self.assertEqual(self.waveform_data['3203']['EL2']['samplerate'], 200)
self.assertEqual(self.waveform_data['3203']['EL2']['startTmEpoch'],
1671730004.3100293)
self.assertEqual(self.waveform_data['3203']['EL2']['endTmEpoch'],
1671730720.5549)
self.assertEqual(self.waveform_data['3203']['EL2']['size'], 143249)
self.assertEqual(self.waveform_data['3203']['EL2']['gaps'], [])
self.assertEqual(len(self.waveform_data['3203']['EL2']['tracesInfo']),
1)
def test_existing_time_range(self):
# Check that data_time comes from the given range; the end time may be slightly
# greater than read_end, depending on the last record's end time
args = {
'file_path': soh_file,
'is_multiplex': False,
'req_soh_chans': ['VKI'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data,
'read_start': 1625456018.0,
'read_end': 1625505627.9998999
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.soh_data['AX08'].keys()), ['VKI'])
self.assertEqual(self.soh_data['AX08']['VKI']['startTmEpoch'],
1625446018.0)
self.assertEqual(self.soh_data['AX08']['VKI']['endTmEpoch'],
1625510338.0)
def test_non_existing_time_range(self):
# If the given time range falls outside the data's time range, no station is created
args = {
'file_path': soh_file,
'is_multiplex': False,
'req_soh_chans': ['VKI'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data,
'read_start': 1625356018.0,
'read_end': 1625405627.9998999
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(self.soh_data, {})
self.assertEqual(self.mass_pos_data, {})
self.assertEqual(self.waveform_data, {})
def test_read_waveform(self):
args = {
'file_path': waveform_file,
'is_multiplex': False,
'req_wf_chans': ['LHE'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.waveform_data.keys()), ['AX08'])
self.assertEqual(list(self.waveform_data['AX08'].keys()), ['LHE'])
self.assertEqual(self.waveform_data['AX08']['LHE']['samplerate'], 1)
self.assertEqual(self.waveform_data['AX08']['LHE']['startTmEpoch'],
1625445156.000001)
self.assertEqual(self.waveform_data['AX08']['LHE']['endTmEpoch'],
1625532950.0)
self.assertEqual(self.waveform_data['AX08']['LHE']['size'], 87794)
self.assertEqual(self.waveform_data['AX08']['LHE']['gaps'], [])
self.assertEqual(len(self.waveform_data['AX08']['LHE']['tracesInfo']),
1)
def test_read_mass_pos_channel(self):
# Mass-position channels are read when one or both of the include_mpxxxxxx
# flags are True
args = {
'file_path': mass_pos_file,
'is_multiplex': False,
'include_mp123zne': True,
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.mass_pos_data.keys()), ['AX08'])
self.assertEqual(list(self.mass_pos_data['AX08'].keys()), ['VM1'])
self.assertEqual(self.mass_pos_data['AX08']['VM1']['samplerate'], 0.1)
self.assertEqual(self.mass_pos_data['AX08']['VM1']['startTmEpoch'],
1625444970.0)
self.assertEqual(self.mass_pos_data['AX08']['VM1']['endTmEpoch'],
1625574580.0)
self.assertEqual(self.mass_pos_data['AX08']['VM1']['size'], 12961)
self.assertEqual(self.mass_pos_data['AX08']['VM1']['gaps'], [])
self.assertEqual(len(self.mass_pos_data['AX08']['VM1']['tracesInfo']),
1)
def test_gap(self):
# gaps will be detected when gap_minimum is set
args = {
'file_path': gap_file,
'is_multiplex': True,
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data,
'gap_minimum': 60
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.soh_data.keys()), ['3734'])
self.assertEqual(sorted(list(self.soh_data['3734'].keys())),
['EX1', 'EX2', 'EX3', 'GAN', 'GEL', 'GLA', 'GLO',
'GNS', 'GPL', 'GST', 'LCE', 'LCQ', 'VCO', 'VDT',
'VEC', 'VEI', 'VPB'])
self.assertAlmostEqual(self.soh_data['3734']['EX1']['samplerate'],
0.0166, 3)
self.assertEqual(self.soh_data['3734']['EX1']['startTmEpoch'],
1534512840.0)
self.assertEqual(self.soh_data['3734']['EX1']['endTmEpoch'],
1534550400.0)
self.assertEqual(self.soh_data['3734']['EX1']['size'], 597)
self.assertEqual(self.soh_data['3734']['EX1']['gaps'],
[[1534522200.0, 1534523940.0]])
def test_not_detect_gap(self):
# If gap_minimum isn't set but gaps exist, the data is still separated, but no
# gap is added to the gap list
args = {
'file_path': gap_file,
'is_multiplex': True,
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data,
'gap_minimum': None
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.soh_data.keys()), ['3734'])
self.assertEqual(sorted(list(self.soh_data['3734'].keys())),
['EX1', 'EX2', 'EX3', 'GAN', 'GEL', 'GLA', 'GLO',
'GNS', 'GPL', 'GST', 'LCE', 'LCQ', 'VCO', 'VDT',
'VEC', 'VEI', 'VPB'])
self.assertAlmostEqual(self.soh_data['3734']['EX1']['samplerate'],
0.0166, 3)
self.assertEqual(self.soh_data['3734']['EX1']['startTmEpoch'],
1534512840.0)
self.assertEqual(self.soh_data['3734']['EX1']['endTmEpoch'],
1534550400.0)
self.assertEqual(self.soh_data['3734']['EX1']['size'], 597)
self.assertEqual(self.soh_data['3734']['EX1']['gaps'], []) # no gaps
@@ -22,7 +22,7 @@ rt130_dir = TEST_DATA_DIR.joinpath('RT130-sample/2017149.92EB/2017150')
q330_dir = TEST_DATA_DIR.joinpath('Q330-sample/day_vols_AX08')
centaur_dir = TEST_DATA_DIR.joinpath('Centaur-sample/SOH')
pegasus_dir = TEST_DATA_DIR.joinpath('Pegasus-sample/Pegasus_SVC4/soh')
-mix_traces_dir = TEST_DATA_DIR.joinpath('Q330_mixed_traces')
+multiplex_dir = TEST_DATA_DIR.joinpath('Q330_multiplex')
class TestLoadDataAndReadChannels(TestCase):
@@ -212,21 +212,21 @@ class TestLoadDataAndReadChannels(TestCase):
self.assertListEqual(ret[2], pegasus_wf_channels)
self.assertListEqual(ret[3], pegasus_spr_gt_1)
-mix_traces_soh_channels = ['LOG']
-mix_traces_mass_pos_channels = []
-mix_traces_wf_channels = sorted(
+multiplex_soh_channels = ['LOG']
+multiplex_mass_pos_channels = []
+multiplex_wf_channels = sorted(
['BH1', 'BH2', 'BH3', 'BH4', 'BH5', 'BH6',
'EL1', 'EL2', 'EL4', 'EL5', 'EL6', 'ELZ'])
-mix_traces_spr_gt_1 = sorted(
+multiplex_spr_gt_1 = sorted(
['BS1', 'BS2', 'BS3', 'BS4', 'BS5', 'BS6',
'ES1', 'ES2', 'ES3', 'ES4', 'ES5', 'ES6',
'LS1', 'LS2', 'LS3', 'LS4', 'LS5', 'LS6',
'SS1', 'SS2', 'SS3', 'SS4', 'SS5', 'SS6'])
-ret = read_mseed_channels(self.widget_stub, [mix_traces_dir], True)
-self.assertListEqual(ret[0], mix_traces_soh_channels)
-self.assertListEqual(ret[1], mix_traces_mass_pos_channels)
-self.assertListEqual(ret[2], mix_traces_wf_channels)
-self.assertListEqual(ret[3], mix_traces_spr_gt_1)
+ret = read_mseed_channels(self.widget_stub, [multiplex_dir], True)
+self.assertListEqual(ret[0], multiplex_soh_channels)
+self.assertListEqual(ret[1], multiplex_mass_pos_channels)
+self.assertListEqual(ret[2], multiplex_wf_channels)
+self.assertListEqual(ret[3], multiplex_spr_gt_1)
def test_read_channels_rt130_dir(self):
"""
@@ -2,8 +2,7 @@ import unittest
from sohstationviewer.database.extract_data import (
get_chan_plot_info,
-get_wf_plot_info,
-get_chan_label,
+get_seismic_chan_label,
get_signature_channels,
get_color_def,
get_color_ranges,
@@ -11,7 +10,7 @@ from sohstationviewer.database.extract_data import (
class TestExtractData(unittest.TestCase):
-def test_get_chan_plot_info_good_channel_and_data_type(self):
+def test_get_chan_plot_info_good_soh_channel_and_data_type(self):
"""
Test basic functionality of get_chan_plot_info - channel and data type
combination exists in database table `Channels`
@@ -25,9 +24,62 @@ class TestExtractData(unittest.TestCase):
'label': 'SOH/Data Def',
'fixPoint': 0,
'valueColors': '0:W|1:C'}
-self.assertDictEqual(
-get_chan_plot_info('SOH/Data Def', {'samplerate': 10}, 'RT130'),
-expected_result)
+self.assertDictEqual(get_chan_plot_info('SOH/Data Def', 'RT130'),
+expected_result)
+def test_get_chan_plot_info_masspos_channel(self):
+with self.subTest("Mass position 'VM'"):
+expected_result = {'channel': 'VM1',
+'plotType': 'linesMasspos',
+'height': 4,
+'unit': 'V',
+'linkedChan': None,
+'convertFactor': 0.1,
+'label': 'VM1-MassPos',
+'fixPoint': 1,
+'valueColors': None}
+self.assertDictEqual(get_chan_plot_info('VM1', 'Q330'),
+expected_result)
+with self.subTest("Mass position 'MassPos'"):
+expected_result = {'channel': 'MassPos1',
+'plotType': 'linesMasspos',
+'height': 4,
+'unit': 'V',
+'linkedChan': None,
+'convertFactor': 1,
+'label': 'MassPos1',
+'fixPoint': 1,
+'valueColors': None}
+self.assertDictEqual(get_chan_plot_info('MassPos1', 'RT130'),
+expected_result)
+def test_get_chan_plot_info_seismic_channel(self):
+with self.subTest("RT130 Seismic"):
+expected_result = {'channel': 'DS2',
+'plotType': 'linesSRate',
+'height': 4,
+'unit': '',
+'linkedChan': None,
+'convertFactor': 1,
+'label': 'DS2',
+'fixPoint': 0,
+'valueColors': None}
+self.assertDictEqual(get_chan_plot_info('DS2', 'RT130'),
+expected_result)
+with self.subTest("MSeed Seismic"):
+expected_result = {'channel': 'LHE',
+'plotType': 'linesSRate',
+'height': 4,
+'unit': '',
+'linkedChan': None,
+'convertFactor': 1,
+'label': 'LHE-EW',
+'fixPoint': 0,
+'valueColors': None}
+self.assertDictEqual(get_chan_plot_info('LHE', 'Q330'),
+expected_result)
def test_get_chan_plot_info_data_type_is_unknown(self):
"""
@@ -44,10 +96,8 @@ class TestExtractData(unittest.TestCase):
'label': 'DEFAULT-Bad Channel ID',
'fixPoint': 0,
'valueColors': None}
-self.assertDictEqual(
-get_chan_plot_info('Bad Channel ID',
-{'samplerate': 10}, 'Unknown'),
-expected_result)
+self.assertDictEqual(get_chan_plot_info('Bad Channel ID', 'Unknown'),
+expected_result)
# Channel exist in database
expected_result = {'channel': 'LCE',
@@ -59,12 +109,8 @@
'label': 'LCE-PhaseError',
'fixPoint': 0,
'valueColors': 'L:W|D:Y'}
-self.assertDictEqual(
-get_chan_plot_info('LCE', {'samplerate': 10}, 'Unknown'),
-expected_result)
-self.assertDictEqual(
-get_chan_plot_info('LCE', {'samplerate': 10}, 'Unknown'),
-expected_result)
+self.assertDictEqual(get_chan_plot_info('LCE', 'Unknown'),
+expected_result)
def test_get_chan_plot_info_bad_channel_or_data_type(self):
"""
@@ -86,69 +132,54 @@ class TestExtractData(unittest.TestCase):
# Data type has None value. None value comes from
# controller.processing.detect_data_type.
expected_result['label'] = 'DEFAULT-SOH/Data Def'
-self.assertDictEqual(
-get_chan_plot_info('SOH/Data Def', {'samplerate': 10}, None),
-expected_result)
+self.assertDictEqual(get_chan_plot_info('SOH/Data Def', None),
+expected_result)
# Channel and data type are empty strings
expected_result['label'] = 'DEFAULT-'
-self.assertDictEqual(
-get_chan_plot_info('', {'samplerate': 10}, ''),
-expected_result)
+self.assertDictEqual(get_chan_plot_info('', ''),
+expected_result)
# Channel exists in database but data type does not
expected_result['label'] = 'DEFAULT-SOH/Data Def'
self.assertDictEqual(
-get_chan_plot_info('SOH/Data Def',
-{'samplerate': 10}, 'Bad Data Type'),
+get_chan_plot_info('SOH/Data Def', 'Bad Data Type'),
expected_result
)
# Data type exists in database but channel does not
expected_result['label'] = 'DEFAULT-Bad Channel ID'
-self.assertDictEqual(
-get_chan_plot_info('Bad Channel ID',
-{'samplerate': 10}, 'RT130'),
-expected_result)
+self.assertDictEqual(get_chan_plot_info('Bad Channel ID', 'RT130'),
+expected_result)
# Both channel and data type exists in database but not their
# combination
expected_result['label'] = 'DEFAULT-SOH/Data Def'
-self.assertDictEqual(
-get_chan_plot_info('SOH/Data Def', {'samplerate': 10}, 'Q330'),
-expected_result)
-def test_get_wf_plot_info(self):
-"""
-Test basic functionality of get_wf_plot_info - ensures returned
-dictionary contains all the needed key. Bad channel IDs cases are
-handled in tests for get_chan_label.
-"""
-result = get_wf_plot_info('CH1')
-expected_keys = {'param', 'plotType', 'valueColors', 'height',
-'label', 'unit', 'channel', 'convertFactor'}
-self.assertSetEqual(set(result.keys()), expected_keys)
+self.assertDictEqual(get_chan_plot_info('SOH/Data Def', 'Q330'),
+expected_result)
-def test_get_chan_label_good_channel_id(self):
+def test_get_seismic_chan_label_good_channel_id(self):
"""
-Test basic functionality of get_chan_label - channel ID ends in one
-of the keys in conf.dbSettings.dbConf['seisLabel'] or starts with 'DS'
+Test basic functionality of get_seismic_chan_label - channel ID ends
+in one of the keys in conf.dbSettings.dbConf['seisLabel'] or
+starts with 'DS'
"""
# Channel ID does not start with 'DS'
-self.assertEqual(get_chan_label('CH1'), 'CH1-NS')
-self.assertEqual(get_chan_label('CH2'), 'CH2-EW')
-self.assertEqual(get_chan_label('CHG'), 'CHG')
+self.assertEqual(get_seismic_chan_label('CH1'), 'CH1-NS')
+self.assertEqual(get_seismic_chan_label('CH2'), 'CH2-EW')
+self.assertEqual(get_seismic_chan_label('CHG'), 'CHG')
# Channel ID starts with 'DS'
-self.assertEqual(get_chan_label('DS-TEST-CHANNEL'), 'DS-TEST-CHANNEL')
+self.assertEqual(get_seismic_chan_label('DS-TEST-CHANNEL'),
+'DS-TEST-CHANNEL')
def test_get_chan_label_bad_channel_id(self):
"""
-Test basic functionality of get_chan_label - channel ID does not end in
-one of the keys in conf.dbSettings.dbConf['seisLabel'] or is the empty
-string.
+Test basic functionality of get_seismic_chan_label - channel ID does
+not end in one of the keys in conf.dbSettings.dbConf['seisLabel']
+or is the empty string.
"""
-self.assertRaises(IndexError, get_chan_label, '')
+self.assertRaises(IndexError, get_seismic_chan_label, '')
def test_get_signature_channels(self):
"""Test basic functionality of get_signature_channels"""