Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
......@@ -793,6 +793,8 @@ class UIMainWindow(object):
self.stop_button.clicked.connect(main_window.stop)
self.save_plot_button.clicked.connect(main_window.save_plot)
def read_config(self):
self.config = configparser.ConfigParser()
config_path = Path('sohstationviewer/conf/read_settings.ini')
......
......@@ -96,6 +96,9 @@ def create_table_of_content_file(base_path: Path) -> None:
"this software.\n\n"
"On the left-hand side you will find a list of currently available"
" help topics.\n\n"
"If the links of the Table of Contents are broken, click on Recreate "
"Table of Content <img src='recreate_table_contents.png' height=30 /> "
"to rebuild it.\n\n"
"The home button can be used to return to this page at any time.\n\n"
"# Table of Contents\n\n")
links = ""
......
import numpy as np
from unittest import TestCase
from unittest.mock import patch
from sohstationviewer.model.general_data.general_data_helper import (
_check_related_gaps, squash_gaps, sort_data,
retrieve_data_time_from_data_dict, retrieve_gaps_from_data_dict,
combine_data, apply_convert_factor_to_data_dict
)
class TestCheckRelatedGaps(TestCase):
# FROM test_handling_data_rearrange_data.TestCheckRelatedGaps
@classmethod
def setUpClass(cls) -> None:
cls.checked_indexes = []
def test_minmax1_inside_minmax2(self):
self.assertTrue(
_check_related_gaps(3, 4, 1, 5, 1, self.checked_indexes))
self.assertIn(1, self.checked_indexes)
def test_minmax2_inside_minmax1(self):
self.assertTrue(
_check_related_gaps(1, 5, 3, 4, 2, self.checked_indexes))
self.assertIn(2, self.checked_indexes)
def end_minmax1_overlap_start_minmax(self):
self.assertTrue(
_check_related_gaps(1, 4, 3, 5, 3, self.checked_indexes))
self.assertIn(3, self.checked_indexes)
def end_minmax2_overlap_start_minmax1(self):
self.assertTrue(
_check_related_gaps(3, 5, 1, 4, 4, self.checked_indexes))
self.assertIn(4, self.checked_indexes)
def minmax1_less_than_minmax2(self):
self.assertFalse(
_check_related_gaps(1, 3, 4, 6, 5, self.checked_indexes))
self.assertNotIn(5, self.checked_indexes, )
def minmax1_greater_than_minmax2(self):
self.assertFalse(
_check_related_gaps(6, 6, 1, 3, 5, self.checked_indexes))
self.assertEqual(5, self.checked_indexes)
class TestSquashGaps(TestCase):
# FROM test_handling_data_rearrange_data.TestSquashGaps
def setUp(self) -> None:
self.normal_gaps = [[4, 7], [4, 6], [5, 6], [3, 7], [5, 8]]
self.overlap_gaps = [[17, 14], [16, 14], [16, 15], [17, 13], [18, 15]]
self.mixed_gaps = []
for i in range(len(self.normal_gaps)):
self.mixed_gaps.append(self.normal_gaps[i])
self.mixed_gaps.append(self.overlap_gaps[i])
def test_normal_gaps(self):
gaps = squash_gaps(self.normal_gaps)
self.assertEqual(gaps, [[3, 8]])
def test_overlap_gaps(self):
gaps = squash_gaps(self.overlap_gaps)
self.assertEqual(gaps, [[18, 13]])
def test_mixed_gaps(self):
gaps = squash_gaps((self.mixed_gaps))
self.assertEqual(gaps, [[3, 8], [18, 13]])
class TestSortData(TestCase):
# FROM test_handling_data_rearrange_data.TestSortData
def setUp(self) -> None:
self.station_data_dict = {
'CH1': {'tracesInfo': [{'startTmEpoch': 7},
{'startTmEpoch': 1},
{'startTmEpoch': 5},
{'startTmEpoch': 3}]},
'CH2': {'tracesInfo': [{'startTmEpoch': 2},
{'startTmEpoch': 8},
{'startTmEpoch': 6},
{'startTmEpoch': 4}]}
}
def test_sort_data(self):
sort_data(self.station_data_dict)
self.assertEqual(
self.station_data_dict,
{'CH1': {'tracesInfo': [{'startTmEpoch': 1}, {'startTmEpoch': 3},
{'startTmEpoch': 5}, {'startTmEpoch': 7}]},
'CH2': {'tracesInfo': [{'startTmEpoch': 2}, {'startTmEpoch': 4},
{'startTmEpoch': 6}, {'startTmEpoch': 8}]}}
)
class TestRetrieveDataTimeFromDataDict(TestCase):
def setUp(self) -> None:
self.data_dict = {
'STA1': {'CH1': {'startTmEpoch': 4, 'endTmEpoch': 6},
'CH2': {'startTmEpoch': 5, 'endTmEpoch': 9}
},
'STA2': {'CH1': {'startTmEpoch': 2, 'endTmEpoch': 4},
'CH2': {'startTmEpoch': 6, 'endTmEpoch': 8}
}
}
self.data_time = {}
self.expected_data_time = {'STA1': [4, 9], 'STA2': [2, 8]}
def test_retrieve_data_time(self):
retrieve_data_time_from_data_dict(self.data_dict, self.data_time)
self.assertEqual(self.data_time,
self.expected_data_time)
class TestRetrieveGapsFromDataDict(TestCase):
def setUp(self) -> None:
self.data_dict = {
'STA1': {'CH1': {'gaps': [[1, 2], [4, 3]]},
'CH2': {'gaps': []}
},
'STA2': {'CH1': {'gaps': [[1, 2], [4, 3], [2, 3]]},
'CH2': {'gaps': [[1, 3], [3, 2]]}
},
}
self.gaps = {}
self.expected_gaps = {'STA1': [[1, 2], [4, 3]],
'STA2': [[1, 2], [4, 3], [2, 3], [1, 3], [3, 2]]}
def test_retrieve_gaps(self):
retrieve_gaps_from_data_dict(self.data_dict, self.gaps)
self.assertEqual(self.gaps,
self.expected_gaps)
class TestCombineData(TestCase):
def test_overlap_lt_gap_minimum(self):
# combine; not add to gap list
station_data_dict = {
'CH1': {
'gaps': [],
'tracesInfo': [
{'startTmEpoch': 5,
'endTmEpoch': 15,
'data': [1, 2, 2, -1],
'times': [5, 8, 11, 15]},
{'startTmEpoch': 13, # delta = 2 < 10
'endTmEpoch': 20,
'data': [1, -2, 1, 1],
'times': [13, 16, 18, 20]}
]}
}
gap_minimum = 10
combine_data(station_data_dict, gap_minimum)
self.assertEqual(station_data_dict['CH1']['gaps'], [])
self.assertEqual(
len(station_data_dict['CH1']['tracesInfo']),
1)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['startTmEpoch'],
5)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['endTmEpoch'],
20)
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['data'].tolist(),
[1, 2, 2, -1, 1, -2, 1, 1])
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['times'].tolist(),
[5, 8, 11, 15, 13, 16, 18, 20])
def test_overlap_gt_or_equal_gap_minimum(self):
# combine; add to gap list
station_data_dict = {
'CH1': {
'gaps': [],
'tracesInfo': [
{'startTmEpoch': 5,
'endTmEpoch': 15,
'data': [1, 2, 2, -1],
'times': [5, 8, 11, 15]},
{'startTmEpoch': 5, # delta = 10 >= 10
'endTmEpoch': 20,
'data': [1, -2, 1, 1],
'times': [5, 11, 15, 20]}
]}
}
gap_minimum = 10
combine_data(station_data_dict, gap_minimum)
self.assertEqual(station_data_dict['CH1']['gaps'], [[15, 5]])
self.assertEqual(
len(station_data_dict['CH1']['tracesInfo']),
1)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['startTmEpoch'],
5)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['endTmEpoch'],
20)
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['data'].tolist(),
[1, 2, 2, -1, 1, -2, 1, 1])
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['times'].tolist(),
[5, 8, 11, 15, 5, 11, 15, 20])
def test_lt_gap_minimum(self):
# not combine; not add to gap list
station_data_dict = {
'CH1': {
'gaps': [],
'tracesInfo': [
{'startTmEpoch': 5,
'endTmEpoch': 15,
'data': [1, 2, 2, -1],
'times': [5, 8, 11, 15]},
{'startTmEpoch': 22, # delta = 7 > 6, < 10
'endTmEpoch': 34,
'data': [1, -2, 1, 1],
'times': [22, 26, 30, 34]}
]}
}
gap_minimum = 10
combine_data(station_data_dict, gap_minimum)
self.assertEqual(station_data_dict['CH1']['gaps'], [])
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['startTmEpoch'],
5)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['endTmEpoch'],
34)
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['data'].tolist(),
[1, 2, 2, -1, 1, -2, 1, 1])
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['times'].tolist(),
[5, 8, 11, 15, 22, 26, 30, 34])
def test_gap_gt_or_equal_gap_minimum(self):
# not combine; add to gap list
station_data_dict = {
'CH1': {
'gaps': [],
'tracesInfo': [
{'startTmEpoch': 5,
'endTmEpoch': 15,
'data': [1, 2, 2, -1],
'times': [5, 8, 11, 15]},
{'startTmEpoch': 25, # delta = 10 >= 10
'endTmEpoch': 40,
'data': [1, -2, 1, 1],
'times': [25, 29, 33, 36, 40]}
]}
}
gap_minimum = 10
combine_data(station_data_dict, gap_minimum)
self.assertEqual(station_data_dict['CH1']['gaps'], [[15, 25]])
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['startTmEpoch'],
5)
self.assertEqual(
station_data_dict['CH1']['tracesInfo'][0]['endTmEpoch'],
40)
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['data'].tolist(),
[1, 2, 2, -1, 1, -2, 1, 1])
self.assertListEqual(
station_data_dict['CH1']['tracesInfo'][0]['times'].tolist(),
[5, 8, 11, 15, 25, 29, 33, 36, 40])
class TestApplyConvertFactorToDataDict(TestCase):
def setUp(self) -> None:
self.data_dict = {
'STA1': {
'CH1': {'tracesInfo': [{'data': np.array([1, 2, 2, -1])}]}
}
}
self.expected_data = [0.1, 0.2, 0.2, -0.1]
@patch('sohstationviewer.model.general_data.general_data_helper.'
'get_convert_factor')
def test_convert_factor(self, mock_get_convert_factor):
mock_get_convert_factor.return_value = 0.1
apply_convert_factor_to_data_dict(self.data_dict, 'Q330')
self.assertEqual(
self.data_dict['STA1']['CH1']['tracesInfo'][0]['data'].tolist(),
self.expected_data)
from unittest import TestCase
from pathlib import Path
from sohstationviewer.model.mseed_data.mseed import MSeed
from sohstationviewer.model.general_data.general_data import \
ProcessingDataError
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
'test_data')
pegasus_data = TEST_DATA_DIR.joinpath("Pegasus-sample")
q330_data = TEST_DATA_DIR.joinpath("Q330-sample")
blockettes_data = TEST_DATA_DIR.joinpath("Q330_unimplemented_ascii_block")
multiplex_data = TEST_DATA_DIR.joinpath("Q330_multiplex")
centaur_data = TEST_DATA_DIR.joinpath("Centaur-sample")
class TestMSeed(TestCase):
def test_path_not_exist(self):
# raise exception when path not exist
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': '_',
'on_unittest': True
}
with self.assertRaises(ProcessingDataError) as context:
MSeed(**args)
self.assertEqual(
str(context.exception),
"Path '_' not exist"
)
def test_read_text_only(self):
# There is no station recognized, add text to key 'TEXT' in log_data
args = {
'data_type': 'Pegasus',
'is_multiplex': False,
'folder': pegasus_data,
'req_soh_chans': ['_'],
'on_unittest': True
}
obj = MSeed(**args)
self.assertEqual(list(obj.log_data.keys()), ['TEXT'])
self.assertEqual(len(obj.log_data['TEXT']), 2)
self.assertEqual(
obj.log_data['TEXT'][0][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.130'
'\n2020-05-09 00:00:09.839 UTC: I(TimingThread): timing unce')
self.assertEqual(
obj.log_data['TEXT'][1][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.129'
'\n2020-05-08 22:55:45.390 UTC: I(Initializations): Firmware')
def test_read_text_with_soh(self):
# text get station from soh data with TXT as channel to add to log_data
args = {
'data_type': 'Pegasus',
'is_multiplex': False,
'folder': pegasus_data,
'req_soh_chans': ['VE1'],
'on_unittest': True
}
obj = MSeed(**args)
self.assertEqual(list(obj.log_data.keys()), ['TEXT', 'KC01'])
self.assertEqual(len(obj.log_data['TEXT']), 0)
self.assertEqual(list(obj.log_data['KC01'].keys()), ['TXT'])
self.assertEqual(len(obj.log_data['KC01']['TXT']), 2)
self.assertEqual(
obj.log_data['KC01']['TXT'][0][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.130'
'\n2020-05-09 00:00:09.839 UTC: I(TimingThread): timing unce')
self.assertEqual(
obj.log_data['KC01']['TXT'][1][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.129'
'\n2020-05-08 22:55:45.390 UTC: I(Initializations): Firmware')
def test_read_text_with_waveform(self):
# text get station from waveform data with TXT as channel to add to
# log_data
args = {
'data_type': 'Pegasus',
'is_multiplex': False,
'folder': pegasus_data,
'req_wf_chans': ['HH1'],
'req_soh_chans': ['_'],
'on_unittest': True
}
obj = MSeed(**args)
self.assertEqual(list(obj.log_data.keys()), ['TEXT', 'KC01'])
self.assertEqual(len(obj.log_data['TEXT']), 0)
self.assertEqual(list(obj.log_data['KC01'].keys()), ['TXT'])
self.assertEqual(len(obj.log_data['KC01']['TXT']), 2)
self.assertEqual(
obj.log_data['KC01']['TXT'][0][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.130'
'\n2020-05-09 00:00:09.839 UTC: I(TimingThread): timing unce')
self.assertEqual(
obj.log_data['KC01']['TXT'][1][:100],
'\n\n** STATE OF HEALTH: XX.KC01...D.2020.129'
'\n2020-05-08 22:55:45.390 UTC: I(Initializations): Firmware')
def test_read_ascii(self):
# info is text wrapped in mseed format
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': q330_data,
'req_soh_chans': ['LOG'],
}
obj = MSeed(**args)
self.assertEqual(list(obj.log_data.keys()), ['TEXT', 'AX08'])
self.assertEqual(list(obj.log_data['AX08'].keys()), ['LOG'])
self.assertEqual(obj.log_data['TEXT'], [])
self.assertEqual(len(obj.log_data['AX08']['LOG']), 16)
self.assertEqual(
obj.log_data['AX08']['LOG'][0][:100],
'\n\nSTATE OF HEALTH: From:1625456260.12 To:1625456260.12\n\r'
'\nQuanterra Packet Baler Model 14 Restart. V'
)
self.assertEqual(
obj.log_data['AX08']['LOG'][1][:100],
'\n\nSTATE OF HEALTH: From:1625456366.62 To:1625456366.62'
'\nReducing Status Polling Interval\r\n[2021-07-0'
)
def test_read_blockettes_info(self):
# info in blockette 500
args = {
'data_type': 'Q330',
'is_multiplex': True,
'folder': blockettes_data,
'req_soh_chans': ['ACE'],
}
obj = MSeed(**args)
self.assertEqual(list(obj.log_data.keys()), ['TEXT', '3203'])
self.assertEqual(list(obj.log_data['3203'].keys()), ['ACE'])
self.assertEqual(obj.log_data['TEXT'], [])
self.assertEqual(len(obj.log_data['3203']['ACE']), 1)
self.assertEqual(
obj.log_data['3203']['ACE'][0][:100],
'\n\nSTATE OF HEALTH: From:1671729287.00014 To:1671729287.0'
'\n===========\nVCO correction: 53.7109375\nTim'
)
def test_not_is_multiplex_read_channel(self):
# is_multiplex = False => stop when reach to channel not match req
# so the channel 'EL1' is read but not finished
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': multiplex_data,
'req_soh_chans': [],
'req_wf_chans': ['EL1']
}
obj = MSeed(**args)
self.assertEqual(list(obj.waveform_data.keys()), ['3203'])
self.assertEqual(list(obj.waveform_data['3203'].keys()), ['EL1'])
self.assertEqual(obj.waveform_data['3203']['EL1']['samplerate'], 200)
self.assertEqual(obj.waveform_data['3203']['EL1']['startTmEpoch'],
1671730004.145029)
self.assertEqual(obj.waveform_data['3203']['EL1']['endTmEpoch'],
1671730013.805)
self.assertEqual(obj.waveform_data['3203']['EL1']['size'], 1932)
self.assertEqual(obj.waveform_data['3203']['EL1']['gaps'], [])
self.assertEqual(len(obj.waveform_data['3203']['EL1']['tracesInfo']),
1)
def test_is_multiplex_read_channel(self):
# is_multiplex = True => read every record
args = {
'data_type': 'Q330',
'is_multiplex': True,
'folder': multiplex_data,
'req_soh_chans': [],
'req_wf_chans': ['EL1']
}
obj = MSeed(**args)
self.assertEqual(list(obj.waveform_data.keys()), ['3203'])
self.assertEqual(list(obj.waveform_data['3203'].keys()), ['EL1'])
self.assertEqual(obj.waveform_data['3203']['EL1']['samplerate'], 200)
self.assertEqual(obj.waveform_data['3203']['EL1']['startTmEpoch'],
1671730004.145029)
self.assertEqual(obj.waveform_data['3203']['EL1']['endTmEpoch'],
1671730720.4348998)
self.assertEqual(obj.waveform_data['3203']['EL1']['size'], 143258)
self.assertEqual(obj.waveform_data['3203']['EL1']['gaps'], [])
self.assertEqual(len(obj.waveform_data['3203']['EL1']['tracesInfo']),
1)
def test_not_is_multiplex_selected_channel_in_middle(self):
# won't reached selected channel because previous record doesn't meet
# requirement when is_multiplex = False
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': multiplex_data,
'req_soh_chans': [],
'req_wf_chans': ['EL2']
}
obj = MSeed(**args)
self.assertEqual(list(obj.waveform_data.keys()), [])
def test_is_multiplex_selected_channel_in_middle(self):
# is_multiplex = True => the selected channel will be read
args = {
'data_type': 'Q330',
'is_multiplex': True,
'folder': multiplex_data,
'req_soh_chans': [],
'req_wf_chans': ['EL2']
}
obj = MSeed(**args)
self.assertEqual(list(obj.waveform_data.keys()), ['3203'])
self.assertEqual(list(obj.waveform_data['3203'].keys()), ['EL2'])
self.assertEqual(obj.waveform_data['3203']['EL2']['samplerate'], 200)
self.assertEqual(obj.waveform_data['3203']['EL2']['startTmEpoch'],
1671730004.3100293)
self.assertEqual(obj.waveform_data['3203']['EL2']['endTmEpoch'],
1671730720.5549)
self.assertEqual(obj.waveform_data['3203']['EL2']['size'], 143249)
self.assertEqual(obj.waveform_data['3203']['EL2']['gaps'], [])
self.assertEqual(len(obj.waveform_data['3203']['EL2']['tracesInfo']),
1)
def test_existing_time_range(self):
import os
print(os.getcwd())
# check if data_time is from the given range, end time may get
# a little greater than read_end according to record's end time
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': q330_data,
'req_soh_chans': [],
'read_start': 1625456018.0,
'read_end': 1625505627.9998999
}
obj = MSeed(**args)
self.assertEqual(obj.keys, ['AX08'])
self.assertEqual(list(obj.soh_data['AX08'].keys()), ['VKI'])
self.assertEqual(list(obj.mass_pos_data['AX08'].keys()), [])
self.assertEqual(list(obj.waveform_data['AX08'].keys()), [])
self.assertEqual(obj.data_time['AX08'], [1625446018.0, 1625510338.0])
def test_non_existing_time_range(self):
# if given time range out of the data time, no station will be created
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': q330_data,
'req_soh_chans': [],
'read_start': 1625356018.0,
'read_end': 1625405627.9998999
}
obj = MSeed(**args)
self.assertEqual(obj.keys, [])
self.assertEqual(obj.soh_data, {})
self.assertEqual(obj.mass_pos_data, {})
self.assertEqual(obj.waveform_data, {})
self.assertEqual(obj.data_time, {})
def test_read_waveform(self):
# data from tps similar to waveform but not separated at gaps
args = {
'data_type': 'Q330',
'is_multiplex': False,
'folder': q330_data,
'req_soh_chans': [],
'req_wf_chans': ['LHE']
}
obj = MSeed(**args)
self.assertEqual(list(obj.waveform_data.keys()), ['AX08'])
self.assertEqual(list(obj.waveform_data['AX08'].keys()), ['LHE'])
self.assertEqual(obj.waveform_data['AX08']['LHE']['samplerate'], 1)
self.assertEqual(obj.waveform_data['AX08']['LHE']['startTmEpoch'],
1625445156.000001)
self.assertEqual(obj.waveform_data['AX08']['LHE']['endTmEpoch'],
1625532950.0)
self.assertEqual(obj.waveform_data['AX08']['LHE']['size'], 87794)
self.assertEqual(obj.waveform_data['AX08']['LHE']['gaps'], [])
self.assertEqual(len(obj.waveform_data['AX08']['LHE']['tracesInfo']),
1)
def test_read_mass_pos_channel(self):
# mass position channels will be read if one or both include_mpxxxxxx
# are True
args = {
'data_type': 'Q330',
'is_multiplex': True,
'folder': q330_data,
'req_soh_chans': [],
'req_wf_chans': [],
'include_mp123zne': True
}
obj = MSeed(**args)
self.assertEqual(list(obj.mass_pos_data.keys()), ['AX08'])
self.assertEqual(list(obj.mass_pos_data['AX08'].keys()), ['VM1'])
self.assertEqual(obj.mass_pos_data['AX08']['VM1']['samplerate'], 0.1)
self.assertEqual(obj.mass_pos_data['AX08']['VM1']['startTmEpoch'],
1625444970.0)
self.assertEqual(obj.mass_pos_data['AX08']['VM1']['endTmEpoch'],
1625574580.0)
self.assertEqual(obj.mass_pos_data['AX08']['VM1']['size'], 12961)
self.assertEqual(obj.mass_pos_data['AX08']['VM1']['gaps'], [])
self.assertEqual(len(obj.mass_pos_data['AX08']['VM1']['tracesInfo']),
1)
def test_gap(self):
# gaps will be detected when gap_minimum is set
args = {
'data_type': 'Centaur',
'is_multiplex': True,
'folder': centaur_data,
'req_soh_chans': [],
'gap_minimum': 60
}
obj = MSeed(**args)
self.assertEqual(list(obj.soh_data.keys()), ['3734'])
self.assertEqual(sorted(list(obj.soh_data['3734'].keys())),
['EX1', 'EX2', 'EX3', 'GAN', 'GEL', 'GLA', 'GLO',
'GNS', 'GPL', 'GST', 'LCE', 'LCQ', 'VCO', 'VDT',
'VEC', 'VEI', 'VPB'])
self.assertAlmostEqual(obj.soh_data['3734']['EX1']['samplerate'],
0.0166, 3)
self.assertEqual(obj.soh_data['3734']['EX1']['startTmEpoch'],
1534512840.0)
self.assertEqual(obj.soh_data['3734']['EX1']['endTmEpoch'],
1534550400.0)
self.assertEqual(obj.soh_data['3734']['EX1']['size'], 597)
self.assertEqual(obj.gaps['3734'], [[1534521420.0, 1534524000.0]])
def test_not_detect_gap(self):
# if gap_minimum isn't set but gap exist, data still be separated, but
# gap won't be added to gap list
args = {
'data_type': 'Centaur',
'is_multiplex': True,
'folder': centaur_data,
'req_soh_chans': [],
'gap_minimum': None
}
obj = MSeed(**args)
self.assertEqual(list(obj.soh_data.keys()), ['3734'])
self.assertEqual(sorted(list(obj.soh_data['3734'].keys())),
['EX1', 'EX2', 'EX3', 'GAN', 'GEL', 'GLA', 'GLO',
'GNS', 'GPL', 'GST', 'LCE', 'LCQ', 'VCO', 'VDT',
'VEC', 'VEI', 'VPB'])
self.assertAlmostEqual(obj.soh_data['3734']['EX1']['samplerate'],
0.0166, 3)
self.assertEqual(obj.soh_data['3734']['EX1']['startTmEpoch'],
1534512840.0)
self.assertEqual(obj.soh_data['3734']['EX1']['endTmEpoch'],
1534550400.0)
self.assertEqual(obj.soh_data['3734']['EX1']['size'], 597)
self.assertEqual(obj.gaps['3734'], []) # no gaps
from unittest import TestCase
from pathlib import Path
from sohstationviewer.model.mseed_data.mseed_helper import (
retrieve_nets_from_data_dict, read_text
)
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
'test_data')
text_file = TEST_DATA_DIR.joinpath(
"Pegasus-sample/Pegasus_SVC4/logs/2020/XX/KC01/XX.KC01...D.2020.129")
binary_file = TEST_DATA_DIR.joinpath(
"Pegasus-sample/Pegasus_SVC4/soh/2020/XX/KC01/VDT.D/"
"XX.KC01..VDT.D.2020.129")
class TestReadText(TestCase):
def test_text_file(self):
ret = read_text(text_file)
expected_ret = (
"\n\n** STATE OF HEALTH: XX.KC01...D.2020.129"
"\n2020-05-08 22:55:45.390 UTC: I(Initializations): Firmware")
self.assertEqual(ret[:100], expected_ret
)
def test_binary_file(self):
ret = read_text(binary_file)
self.assertIsNone(ret)
class TestRetrieveNetsFromDataDict(TestCase):
def setUp(self):
self.nets_by_sta = {}
self.data_dict = {
'STA1': {'CHA1': {'nets': {'NET1', 'NET2'}},
'CHA2': {'nets': {'NET2', 'NET3'}}
},
'STA2': {'CHA1': {'nets': {'NET1'}},
'CHA2': {'nets': {'NET1'}}
}
}
def test_retrieve_nets(self):
retrieve_nets_from_data_dict(self.data_dict, self.nets_by_sta)
self.assertEqual(list(self.nets_by_sta.keys()), ['STA1', 'STA2'])
self.assertEqual(sorted(list(self.nets_by_sta['STA1'])),
['NET1', 'NET2', 'NET3'])
self.assertEqual(sorted(list(self.nets_by_sta['STA2'])), ['NET1'])
from unittest import TestCase
from pathlib import Path
from sohstationviewer.model.mseed_data.mseed_reader import MSeedReader
TEST_DATA_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
'test_data')
ascii_file = TEST_DATA_DIR.joinpath(
"Q330-sample/day_vols_AX08/AX08.XA..LOG.2021.186")
blockettes_files = TEST_DATA_DIR.joinpath(
"Q330_unimplemented_ascii_block/XX-3203_4-20221222190255")
multiplex_file = TEST_DATA_DIR.joinpath(
"Q330_multiplex/XX-3203_4-20221222183011")
soh_file = TEST_DATA_DIR.joinpath(
"Q330-sample/day_vols_AX08/AX08.XA..VKI.2021.186")
waveform_file = TEST_DATA_DIR.joinpath(
"Q330-sample/day_vols_AX08/AX08.XA..LHE.2021.186")
mass_pos_file = TEST_DATA_DIR.joinpath(
"Q330-sample/day_vols_AX08/AX08.XA..VM1.2021.186")
gap_file = TEST_DATA_DIR.joinpath(
"Centaur-sample/SOH/"
"XX.3734.SOH.centaur-3_3734..20180817_000000.miniseed.miniseed")
class TestMSeedReader(TestCase):
def setUp(self) -> None:
self.soh_data = {}
self.mass_pos_data = {}
self.waveform_data = {}
self.log_data = {}
def test_read_ascii(self):
args = {
'file_path': ascii_file,
'is_multiplex': False,
'req_soh_chans': ['LOG'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.log_data.keys()), ['AX08'])
self.assertEqual(list(self.log_data['AX08'].keys()), ['LOG'])
self.assertEqual(len(self.log_data['AX08']['LOG']), 16)
self.assertEqual(
self.log_data['AX08']['LOG'][0][:100],
'\n\nSTATE OF HEALTH: From:1625456260.12 To:1625456260.12\n\r'
'\nQuanterra Packet Baler Model 14 Restart. V'
)
self.assertEqual(
self.log_data['AX08']['LOG'][1][:100],
'\n\nSTATE OF HEALTH: From:1625456366.62 To:1625456366.62'
'\nReducing Status Polling Interval\r\n[2021-07-0'
)
def test_read_blockettes_info(self):
args = {
'file_path': blockettes_files,
'is_multiplex': True,
'req_soh_chans': ['ACE'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.log_data.keys()), ['3203'])
self.assertEqual(list(self.log_data['3203'].keys()), ['ACE'])
self.assertEqual(len(self.log_data['3203']['ACE']), 1)
self.assertEqual(
self.log_data['3203']['ACE'][0][:100],
'\n\nSTATE OF HEALTH: From:1671729287.00014 To:1671729287.0'
'\n===========\nVCO correction: 53.7109375\nTim'
)
def test_not_is_multiplex_read_channel(self):
# is_multiplex = False => stop when reach to channel not match req
# so the channel 'EL1' is read but not finished
args = {
'file_path': multiplex_file,
'is_multiplex': False,
'req_wf_chans': ['EL1'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.waveform_data.keys()), ['3203'])
self.assertEqual(list(self.waveform_data['3203'].keys()), ['EL1'])
self.assertEqual(self.waveform_data['3203']['EL1']['samplerate'], 200)
self.assertEqual(self.waveform_data['3203']['EL1']['startTmEpoch'],
1671730004.145029)
self.assertEqual(self.waveform_data['3203']['EL1']['endTmEpoch'],
1671730013.805)
self.assertEqual(self.waveform_data['3203']['EL1']['size'], 1932)
self.assertEqual(self.waveform_data['3203']['EL1']['gaps'], [])
self.assertEqual(len(self.waveform_data['3203']['EL1']['tracesInfo']),
1)
def test_is_multiplex_read_channel(self):
# is_multiplex = True => read every record
args = {
'file_path': multiplex_file,
'is_multiplex': True,
'req_wf_chans': ['EL1'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.waveform_data.keys()), ['3203'])
self.assertEqual(list(self.waveform_data['3203'].keys()), ['EL1'])
self.assertEqual(self.waveform_data['3203']['EL1']['samplerate'], 200)
self.assertEqual(self.waveform_data['3203']['EL1']['startTmEpoch'],
1671730004.145029)
self.assertEqual(self.waveform_data['3203']['EL1']['endTmEpoch'],
1671730720.4348998)
self.assertEqual(self.waveform_data['3203']['EL1']['size'], 143258)
self.assertEqual(self.waveform_data['3203']['EL1']['gaps'], [])
self.assertEqual(len(self.waveform_data['3203']['EL1']['tracesInfo']),
1)
def test_not_is_multiplex_selected_channel_in_middle(self):
# won't reached selected channel because previous record doesn't meet
# requirement when is_multiplex = False
args = {
'file_path': multiplex_file,
'is_multiplex': False,
'req_wf_chans': ['EL2'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.waveform_data.keys()), [])
def test_is_multiplex_selected_channel_in_middle(self):
# is_multiplex = True => the selected channel will be read
args = {
'file_path': multiplex_file,
'is_multiplex': True,
'req_wf_chans': ['EL2'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.waveform_data.keys()), ['3203'])
self.assertEqual(list(self.waveform_data['3203'].keys()), ['EL2'])
self.assertEqual(self.waveform_data['3203']['EL2']['samplerate'], 200)
self.assertEqual(self.waveform_data['3203']['EL2']['startTmEpoch'],
1671730004.3100293)
self.assertEqual(self.waveform_data['3203']['EL2']['endTmEpoch'],
1671730720.5549)
self.assertEqual(self.waveform_data['3203']['EL2']['size'], 143249)
self.assertEqual(self.waveform_data['3203']['EL2']['gaps'], [])
self.assertEqual(len(self.waveform_data['3203']['EL2']['tracesInfo']),
1)
def test_existing_time_range(self):
# check if data_time is from the given range, end time may get
# a little greater than read_end according to record's end time
args = {
'file_path': soh_file,
'is_multiplex': False,
'req_soh_chans': ['VKI'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data,
'read_start': 1625456018.0,
'read_end': 1625505627.9998999
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.soh_data['AX08'].keys()), ['VKI'])
self.assertEqual(self.soh_data['AX08']['VKI']['startTmEpoch'],
1625446018.0)
self.assertEqual(self.soh_data['AX08']['VKI']['endTmEpoch'],
1625510338.0)
def test_non_existing_time_range(self):
# if given time range out of the data time, no station will be created
args = {
'file_path': soh_file,
'is_multiplex': False,
'req_soh_chans': ['VKI'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data,
'read_start': 1625356018.0,
'read_end': 1625405627.9998999
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(self.soh_data, {})
self.assertEqual(self.mass_pos_data, {})
self.assertEqual(self.waveform_data, {})
def test_read_waveform(self):
args = {
'file_path': waveform_file,
'is_multiplex': False,
'req_wf_chans': ['LHE'],
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.waveform_data.keys()), ['AX08'])
self.assertEqual(list(self.waveform_data['AX08'].keys()), ['LHE'])
self.assertEqual(self.waveform_data['AX08']['LHE']['samplerate'], 1)
self.assertEqual(self.waveform_data['AX08']['LHE']['startTmEpoch'],
1625445156.000001)
self.assertEqual(self.waveform_data['AX08']['LHE']['endTmEpoch'],
1625532950.0)
self.assertEqual(self.waveform_data['AX08']['LHE']['size'], 87794)
self.assertEqual(self.waveform_data['AX08']['LHE']['gaps'], [])
self.assertEqual(len(self.waveform_data['AX08']['LHE']['tracesInfo']),
1)
def test_read_mass_pos_channel(self):
# mass position channels will be read if one or both include_mpxxxxxx
# are True
args = {
'file_path': mass_pos_file,
'is_multiplex': False,
'include_mp123zne': True,
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.mass_pos_data.keys()), ['AX08'])
self.assertEqual(list(self.mass_pos_data['AX08'].keys()), ['VM1'])
self.assertEqual(self.mass_pos_data['AX08']['VM1']['samplerate'], 0.1)
self.assertEqual(self.mass_pos_data['AX08']['VM1']['startTmEpoch'],
1625444970.0)
self.assertEqual(self.mass_pos_data['AX08']['VM1']['endTmEpoch'],
1625574580.0)
self.assertEqual(self.mass_pos_data['AX08']['VM1']['size'], 12961)
self.assertEqual(self.mass_pos_data['AX08']['VM1']['gaps'], [])
self.assertEqual(len(self.mass_pos_data['AX08']['VM1']['tracesInfo']),
1)
def test_gap(self):
# gaps will be detected when gap_minimum is set
args = {
'file_path': gap_file,
'is_multiplex': True,
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data,
'gap_minimum': 60
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.soh_data.keys()), ['3734'])
self.assertEqual(sorted(list(self.soh_data['3734'].keys())),
['EX1', 'EX2', 'EX3', 'GAN', 'GEL', 'GLA', 'GLO',
'GNS', 'GPL', 'GST', 'LCE', 'LCQ', 'VCO', 'VDT',
'VEC', 'VEI', 'VPB'])
self.assertAlmostEqual(self.soh_data['3734']['EX1']['samplerate'],
0.0166, 3)
self.assertEqual(self.soh_data['3734']['EX1']['startTmEpoch'],
1534512840.0)
self.assertEqual(self.soh_data['3734']['EX1']['endTmEpoch'],
1534550400.0)
self.assertEqual(self.soh_data['3734']['EX1']['size'], 597)
self.assertEqual(self.soh_data['3734']['EX1']['gaps'],
[[1534522200.0, 1534523940.0]])
def test_not_detect_gap(self):
# if gap_minimum isn't set but gap exist, data still be separated, but
# gap won't be added to gap list
args = {
'file_path': gap_file,
'is_multiplex': True,
'soh_data': self.soh_data,
'mass_pos_data': self.mass_pos_data,
'waveform_data': self.waveform_data,
'log_data': self.log_data,
'gap_minimum': None
}
reader = MSeedReader(**args)
reader.read()
self.assertEqual(list(self.soh_data.keys()), ['3734'])
self.assertEqual(sorted(list(self.soh_data['3734'].keys())),
['EX1', 'EX2', 'EX3', 'GAN', 'GEL', 'GLA', 'GLO',
'GNS', 'GPL', 'GST', 'LCE', 'LCQ', 'VCO', 'VDT',
'VEC', 'VEI', 'VPB'])
self.assertAlmostEqual(self.soh_data['3734']['EX1']['samplerate'],
0.0166, 3)
self.assertEqual(self.soh_data['3734']['EX1']['startTmEpoch'],
1534512840.0)
self.assertEqual(self.soh_data['3734']['EX1']['endTmEpoch'],
1534550400.0)
self.assertEqual(self.soh_data['3734']['EX1']['size'], 597)
self.assertEqual(self.soh_data['3734']['EX1']['gaps'], []) # no gaps
......@@ -22,7 +22,7 @@ rt130_dir = TEST_DATA_DIR.joinpath('RT130-sample/2017149.92EB/2017150')
q330_dir = TEST_DATA_DIR.joinpath('Q330-sample/day_vols_AX08')
centaur_dir = TEST_DATA_DIR.joinpath('Centaur-sample/SOH')
pegasus_dir = TEST_DATA_DIR.joinpath('Pegasus-sample/Pegasus_SVC4/soh')
mix_traces_dir = TEST_DATA_DIR.joinpath('Q330_mixed_traces')
multiplex_dir = TEST_DATA_DIR.joinpath('Q330_multiplex')
class TestLoadDataAndReadChannels(TestCase):
......@@ -212,21 +212,21 @@ class TestLoadDataAndReadChannels(TestCase):
self.assertListEqual(ret[2], pegasus_wf_channels)
self.assertListEqual(ret[3], pegasus_spr_gt_1)
mix_traces_soh_channels = ['LOG']
mix_traces_mass_pos_channels = []
mix_traces_wf_channels = sorted(
multiplex_soh_channels = ['LOG']
multiplex_mass_pos_channels = []
multiplex_wf_channels = sorted(
['BH1', 'BH2', 'BH3', 'BH4', 'BH5', 'BH6',
'EL1', 'EL2', 'EL4', 'EL5', 'EL6', 'ELZ'])
mix_traces_spr_gt_1 = sorted(
multiplex_spr_gt_1 = sorted(
['BS1', 'BS2', 'BS3', 'BS4', 'BS5', 'BS6',
'ES1', 'ES2', 'ES3', 'ES4', 'ES5', 'ES6',
'LS1', 'LS2', 'LS3', 'LS4', 'LS5', 'LS6',
'SS1', 'SS2', 'SS3', 'SS4', 'SS5', 'SS6'])
ret = read_mseed_channels(self.widget_stub, [mix_traces_dir], True)
self.assertListEqual(ret[0], mix_traces_soh_channels)
self.assertListEqual(ret[1], mix_traces_mass_pos_channels)
self.assertListEqual(ret[2], mix_traces_wf_channels)
self.assertListEqual(ret[3], mix_traces_spr_gt_1)
ret = read_mseed_channels(self.widget_stub, [multiplex_dir], True)
self.assertListEqual(ret[0], multiplex_soh_channels)
self.assertListEqual(ret[1], multiplex_mass_pos_channels)
self.assertListEqual(ret[2], multiplex_wf_channels)
self.assertListEqual(ret[3], multiplex_spr_gt_1)
def test_read_channels_rt130_dir(self):
"""
......