Skip to content
Snippets Groups Projects
Commit 30fd0ce1 authored by Lan Dam's avatar Lan Dam
Browse files

Change read_mseed_channels to use MSeed's RecordReader and apply is_multiplex

parent c2a79173
No related branches found
No related tags found
1 merge request!140Change read_mseed_channels to use MSeed's RecordReader and apply is_multiplex
...@@ -14,16 +14,16 @@ from PySide2.QtGui import QCursor ...@@ -14,16 +14,16 @@ from PySide2.QtGui import QCursor
from PySide2.QtWidgets import QTextBrowser, QApplication from PySide2.QtWidgets import QTextBrowser, QApplication
from obspy.io import reftek from obspy.io import reftek
from sohstationviewer.model.mseed_data.mseed_reader import \
move_to_next_record
from sohstationviewer.model.mseed_data.record_reader import RecordReader \ from sohstationviewer.model.mseed_data.record_reader import RecordReader \
as MSeedRecordReader as MSeedRecordReader
from sohstationviewer.model.mseed_data.record_reader_helper import \ from sohstationviewer.model.mseed_data.record_reader_helper import \
MSeedReadError MSeedReadError
from sohstationviewer.model.mseed_data.mseed_reader import \
move_to_next_record
from sohstationviewer.database.extract_data import get_signature_channels from sohstationviewer.database.extract_data import get_signature_channels
from sohstationviewer.model.handling_data import ( from sohstationviewer.model.handling_data import (
read_mseed_chanids_from_headers) check_chan)
from sohstationviewer.controller.util import ( from sohstationviewer.controller.util import (
validate_file, display_tracking_info validate_file, display_tracking_info
...@@ -32,7 +32,61 @@ from sohstationviewer.controller.util import ( ...@@ -32,7 +32,61 @@ from sohstationviewer.controller.util import (
from sohstationviewer.view.util.enums import LogType from sohstationviewer.view.util.enums import LogType
def _read_mseed_chanids(path2file: Path, is_multiplex) \
-> Tuple[Set[str], Set[str], Set[str], Set[str]]:
"""
from the given file get set of channel ids for soh, mass position,
waveform, soh with sample rate greater than 1 which are
calibration signals .
:param path2file: path to file
:param is_multiplex: flag that tell if data has more than one channel in
a file
:return soh_chan_ids: list of chan_ids for SOH
:return mass_pos_chan_ids: list of chan_ids for mass position
:return wf_chan_ids: list of chan_ids for waveform
:return spr_gr_1_chan_ids: list of chan_ids for SOH with sample rate > 1
"""
soh_chan_ids = set()
mass_pos_chan_ids = set()
wf_chan_ids = set()
spr_gr_1_chan_ids = set()
file = open(path2file, 'rb')
while 1:
is_eof = (file.read(1) == b'')
if is_eof:
break
file.seek(-1, 1)
current_record_start = file.tell()
try:
record = MSeedRecordReader(file)
except MSeedReadError:
file.close()
raise Exception(f"File{path2file}: Data type isn't MSeed.")
chan_id = record.record_metadata.channel
chan_type = check_chan(chan_id, [], ['*'], True, True)
if chan_type is False:
if not is_multiplex:
break
continue
if chan_type == 'MP':
mass_pos_chan_ids.add(chan_id)
elif chan_type == 'SOH':
sample_rate = record.record_metadata.sample_rate
if sample_rate <= 1:
soh_chan_ids.add(chan_id)
else:
spr_gr_1_chan_ids.add(chan_id)
else:
wf_chan_ids.add(chan_id)
if not is_multiplex:
break
move_to_next_record(file, current_record_start, record)
file.close()
return soh_chan_ids, mass_pos_chan_ids, wf_chan_ids, spr_gr_1_chan_ids
def read_mseed_channels(tracking_box: QTextBrowser, list_of_dir: List[str], def read_mseed_channels(tracking_box: QTextBrowser, list_of_dir: List[str],
is_multiplex: bool,
on_unittest: bool = False on_unittest: bool = False
) -> Set[str]: ) -> Set[str]:
""" """
...@@ -43,6 +97,7 @@ def read_mseed_channels(tracking_box: QTextBrowser, list_of_dir: List[str], ...@@ -43,6 +97,7 @@ def read_mseed_channels(tracking_box: QTextBrowser, list_of_dir: List[str],
include_mp123zne and include_mp456uvw for MSeed are False include_mp123zne and include_mp456uvw for MSeed are False
:param tracking_box: widget to display tracking info :param tracking_box: widget to display tracking info
:param list_of_dir: list of directories selected by users :param list_of_dir: list of directories selected by users
:param is_multiplex: flag that tell if data is multiplex
:param on_unittest: flag to avoid cursor code to display tracking_info :param on_unittest: flag to avoid cursor code to display tracking_info
:return data_object.channels: set of channels present in listofDir :return data_object.channels: set of channels present in listofDir
""" """
...@@ -70,14 +125,30 @@ def read_mseed_channels(tracking_box: QTextBrowser, list_of_dir: List[str], ...@@ -70,14 +125,30 @@ def read_mseed_channels(tracking_box: QTextBrowser, list_of_dir: List[str],
count += 1 count += 1
if count % 10 == 0: if count % 10 == 0:
display_tracking_info( display_tracking_info(
tracking_box, f'Read {count} file headers/ SOH files', tracking_box, f'Read {count} file headers',
LogType.INFO) LogType.INFO)
if reftek.core._is_reftek130(path2file):
ret = read_mseed_chanids_from_headers(path2file, file_name) # if data_type is RT130, this function shouldn't be called
soh_chan_ids.update(ret[0]) display_tracking_info(
mass_pos_chan_ids.update(ret[1]) tracking_box, "There is something wrong with the "
wf_chan_ids.update(ret[2]) "work-flow. read_mseed_channels shouldn't be fed an "
spr_gr_1_chan_ids.update(ret[3]) f"RT130 file: {path2file}",
LogType.ERROR
)
return [], [], [], []
try:
# skip when data type isn't mseed
ret = _read_mseed_chanids(path2file, is_multiplex)
soh_chan_ids.update(ret[0])
mass_pos_chan_ids.update(ret[1])
wf_chan_ids.update(ret[2])
spr_gr_1_chan_ids.update(ret[3])
except Exception:
display_tracking_info(
tracking_box, f'Skip non-mseed file: {file_name}',
LogType.WARNING
)
pass
if not on_unittest: if not on_unittest:
QApplication.restoreOverrideCursor() QApplication.restoreOverrideCursor()
return sorted(list(soh_chan_ids)), sorted(list(mass_pos_chan_ids)), \ return sorted(list(soh_chan_ids)), sorted(list(mass_pos_chan_ids)), \
......
...@@ -406,7 +406,7 @@ class ChannelPreferDialog(OneWindowAtATimeDialog): ...@@ -406,7 +406,7 @@ class ChannelPreferDialog(OneWindowAtATimeDialog):
data_type = 'RT130' data_type = 'RT130'
else: else:
try: try:
data_type = detect_data_type(self.dir_names) data_type, is_multiplex = detect_data_type(self.dir_names)
except Exception as e: except Exception as e:
QtWidgets.QMessageBox.warning(self, "Scan Channels", str(e)) QtWidgets.QMessageBox.warning(self, "Scan Channels", str(e))
return return
...@@ -428,7 +428,7 @@ class ChannelPreferDialog(OneWindowAtATimeDialog): ...@@ -428,7 +428,7 @@ class ChannelPreferDialog(OneWindowAtATimeDialog):
else: else:
self.scan_chan_btn.setEnabled(True) self.scan_chan_btn.setEnabled(True)
ret = read_mseed_channels(self.tracking_info_text_browser, ret = read_mseed_channels(self.tracking_info_text_browser,
self.dir_names) self.dir_names, is_multiplex)
if ret == ([], [], [], []): if ret == ([], [], [], []):
msg = "No data can be read from " + ', '.join(self.dir_names) msg = "No data can be read from " + ', '.join(self.dir_names)
return QtWidgets.QMessageBox.warning(self, "No data", msg) return QtWidgets.QMessageBox.warning(self, "No data", msg)
......
...@@ -21,8 +21,8 @@ pegasus_dir = TEST_DATA_DIR.joinpath('Pegasus-sample/Pegasus_SVC4/soh') ...@@ -21,8 +21,8 @@ pegasus_dir = TEST_DATA_DIR.joinpath('Pegasus-sample/Pegasus_SVC4/soh')
multiplex_dir = TEST_DATA_DIR.joinpath('Q330_multiplex') multiplex_dir = TEST_DATA_DIR.joinpath('Q330_multiplex')
class TestReadChannels(TestCase): class TestReadMSeedChannel(TestCase):
"""Test suite for load_data and read_mseed_channels.""" """Test suite for read_mseed_channels."""
def setUp(self) -> None: def setUp(self) -> None:
"""Set up test fixtures.""" """Set up test fixtures."""
...@@ -40,61 +40,95 @@ class TestReadChannels(TestCase): ...@@ -40,61 +40,95 @@ class TestReadChannels(TestCase):
Test basic functionality of load_data - the given directory contains Test basic functionality of load_data - the given directory contains
MSeed data. MSeed data.
""" """
q330_soh_channels = sorted(['LOG', 'VKI']) with self.subTest("q330 - non multiplex"):
q330_mass_pos_channels = ['VM1'] q330_soh_channels = sorted(['LOG', 'VKI'])
q330_wf_channels = ['HHE', 'LHE'] q330_mass_pos_channels = ['VM1']
q330_spr_gt_1 = [] q330_wf_channels = ['HHE', 'LHE']
ret = read_mseed_channels(self.widget_stub, [q330_dir], True) q330_spr_gt_1 = []
self.assertListEqual(ret[0], q330_soh_channels) ret = read_mseed_channels(self.widget_stub, [q330_dir],
self.assertListEqual(ret[1], q330_mass_pos_channels) False, True)
self.assertListEqual(ret[2], q330_wf_channels) self.assertListEqual(ret[0], q330_soh_channels)
self.assertListEqual(ret[3], q330_spr_gt_1) self.assertListEqual(ret[1], q330_mass_pos_channels)
self.assertListEqual(ret[2], q330_wf_channels)
centaur_soh_channels = sorted( self.assertListEqual(ret[3], q330_spr_gt_1)
['VDT', 'EX3', 'GEL', 'VEC', 'EX2', 'LCE', 'EX1', 'GLA', 'LCQ',
'GPL', 'GNS', 'GST', 'VCO', 'GAN', 'GLO', 'VPB', 'VEI']) with self.subTest(("centaur - multiplex - is_multiplex=True")):
centaur_mass_pos_channels = sorted(['VM1', 'VM2', 'VM3']) centaur_soh_channels = sorted(
centaur_wf_channels = [] ['VDT', 'EX3', 'GEL', 'VEC', 'EX2', 'LCE', 'EX1', 'GLA', 'LCQ',
centaur_spr_gt_1 = [] 'GPL', 'GNS', 'GST', 'VCO', 'GAN', 'GLO', 'VPB', 'VEI'])
ret = read_mseed_channels(self.widget_stub, [centaur_dir], True) centaur_mass_pos_channels = sorted(['VM1', 'VM2', 'VM3'])
self.assertListEqual(ret[0], centaur_soh_channels) centaur_wf_channels = []
self.assertListEqual(ret[1], centaur_mass_pos_channels) centaur_spr_gt_1 = []
self.assertListEqual(ret[2], centaur_wf_channels) ret = read_mseed_channels(self.widget_stub, [centaur_dir],
self.assertListEqual(ret[3], centaur_spr_gt_1) True, True)
self.assertListEqual(ret[0], centaur_soh_channels)
pegasus_soh_channels = sorted(['VDT', 'VE1']) self.assertListEqual(ret[1], centaur_mass_pos_channels)
pegasus_mass_pos_channels = sorted(['VM1']) self.assertListEqual(ret[2], centaur_wf_channels)
pegasus_wf_channels = [] self.assertListEqual(ret[3], centaur_spr_gt_1)
pegasus_spr_gt_1 = []
ret = read_mseed_channels(self.widget_stub, [pegasus_dir], True) with self.subTest("centaur - multiplex - is_multiplex=False"):
self.assertListEqual(ret[0], pegasus_soh_channels) # not all channels detected if is_multiplex isn't set
self.assertListEqual(ret[1], pegasus_mass_pos_channels) centaur_soh_channels = sorted(['GEL'])
self.assertListEqual(ret[2], pegasus_wf_channels) centaur_mass_pos_channels = sorted([])
self.assertListEqual(ret[3], pegasus_spr_gt_1) centaur_wf_channels = []
centaur_spr_gt_1 = []
multiplex_soh_channels = ['LOG'] ret = read_mseed_channels(self.widget_stub, [centaur_dir],
multiplex_mass_pos_channels = [] False, True)
multiplex_wf_channels = sorted( self.assertListEqual(ret[0], centaur_soh_channels)
['BH1', 'BH2', 'BH3', 'BH4', 'BH5', 'BH6', self.assertListEqual(ret[1], centaur_mass_pos_channels)
'EL1', 'EL2', 'EL4', 'EL5', 'EL6', 'ELZ']) self.assertListEqual(ret[2], centaur_wf_channels)
multiplex_spr_gt_1 = sorted( self.assertListEqual(ret[3], centaur_spr_gt_1)
['BS1', 'BS2', 'BS3', 'BS4', 'BS5', 'BS6',
'ES1', 'ES2', 'ES3', 'ES4', 'ES5', 'ES6', with self.subTest("pegasus - non multiplex"):
'LS1', 'LS2', 'LS3', 'LS4', 'LS5', 'LS6', pegasus_soh_channels = sorted(['VDT', 'VE1'])
'SS1', 'SS2', 'SS3', 'SS4', 'SS5', 'SS6']) pegasus_mass_pos_channels = sorted(['VM1'])
ret = read_mseed_channels(self.widget_stub, [multiplex_dir], True) pegasus_wf_channels = []
self.assertListEqual(ret[0], multiplex_soh_channels) pegasus_spr_gt_1 = []
self.assertListEqual(ret[1], multiplex_mass_pos_channels) ret = read_mseed_channels(self.widget_stub, [pegasus_dir],
self.assertListEqual(ret[2], multiplex_wf_channels) False, True)
self.assertListEqual(ret[3], multiplex_spr_gt_1) self.assertListEqual(ret[0], pegasus_soh_channels)
self.assertListEqual(ret[1], pegasus_mass_pos_channels)
self.assertListEqual(ret[2], pegasus_wf_channels)
self.assertListEqual(ret[3], pegasus_spr_gt_1)
with self.subTest("q330 - multiplex - is_multiplex=True"):
multiplex_soh_channels = ['LOG']
multiplex_mass_pos_channels = []
multiplex_wf_channels = sorted(
['BH1', 'BH2', 'BH3', 'BH4', 'BH5', 'BH6',
'EL1', 'EL2', 'EL4', 'EL5', 'EL6', 'ELZ'])
multiplex_spr_gt_1 = sorted(
['BS1', 'BS2', 'BS3', 'BS4', 'BS5', 'BS6',
'ES1', 'ES2', 'ES3', 'ES4', 'ES5', 'ES6',
'LS1', 'LS2', 'LS3', 'LS4', 'LS5', 'LS6',
'SS1', 'SS2', 'SS3', 'SS4', 'SS5', 'SS6'])
ret = read_mseed_channels(self.widget_stub, [multiplex_dir],
True, True)
self.assertListEqual(ret[0], multiplex_soh_channels)
self.assertListEqual(ret[1], multiplex_mass_pos_channels)
self.assertListEqual(ret[2], multiplex_wf_channels)
self.assertListEqual(ret[3], multiplex_spr_gt_1)
with self.subTest("q330 - multiplex - is_multiplex=False"):
# not all channels detected if is_multiplex isn't set
multiplex_soh_channels = []
multiplex_mass_pos_channels = []
multiplex_wf_channels = sorted(['EL1'])
multiplex_spr_gt_1 = sorted([])
ret = read_mseed_channels(self.widget_stub, [multiplex_dir],
False, True)
self.assertListEqual(ret[0], multiplex_soh_channels)
self.assertListEqual(ret[1], multiplex_mass_pos_channels)
self.assertListEqual(ret[2], multiplex_wf_channels)
self.assertListEqual(ret[3], multiplex_spr_gt_1)
def test_read_channels_rt130_dir(self): def test_read_channels_rt130_dir(self):
""" """
Test basic functionality of load_data - the given directory contains Test basic functionality of load_data - the given directory contains
RT130 data. RT130 data.
""" """
with self.assertRaises(Exception): ret = read_mseed_channels(self.widget_stub, [rt130_dir], True, True)
read_mseed_channels(self.widget_stub, [rt130_dir], True) self.assertEqual(ret, ([], [], [], []))
def test_read_mseed_channels_no_dir(self): def test_read_mseed_channels_no_dir(self):
""" """
...@@ -102,7 +136,7 @@ class TestReadChannels(TestCase): ...@@ -102,7 +136,7 @@ class TestReadChannels(TestCase):
given. given.
""" """
no_dir = [] no_dir = []
ret = read_mseed_channels(self.widget_stub, no_dir, True) ret = read_mseed_channels(self.widget_stub, no_dir, True, True)
self.assertEqual(ret, ([], [], [], [])) self.assertEqual(ret, ([], [], [], []))
def test_read_mseed_channels_dir_does_not_exist(self): def test_read_mseed_channels_dir_does_not_exist(self):
...@@ -111,11 +145,12 @@ class TestReadChannels(TestCase): ...@@ -111,11 +145,12 @@ class TestReadChannels(TestCase):
does not exist. does not exist.
""" """
empty_name_dir = [''] empty_name_dir = ['']
ret = read_mseed_channels(self.widget_stub, empty_name_dir, True) ret = read_mseed_channels(self.widget_stub, empty_name_dir, True, True)
self.assertEqual(ret, ([], [], [], [])) self.assertEqual(ret, ([], [], [], []))
non_existent_dir = ['non_existent_dir'] non_existent_dir = ['non_existent_dir']
ret = read_mseed_channels(self.widget_stub, non_existent_dir, True) ret = read_mseed_channels(self.widget_stub, non_existent_dir,
True, True)
self.assertEqual(ret, ([], [], [], [])) self.assertEqual(ret, ([], [], [], []))
def test_read_mseed_channels_empty_dir(self): def test_read_mseed_channels_empty_dir(self):
...@@ -124,7 +159,8 @@ class TestReadChannels(TestCase): ...@@ -124,7 +159,8 @@ class TestReadChannels(TestCase):
is empty. is empty.
""" """
with TemporaryDirectory() as empty_dir: with TemporaryDirectory() as empty_dir:
ret = read_mseed_channels(self.widget_stub, [empty_dir], True) ret = read_mseed_channels(self.widget_stub, [empty_dir],
True, True)
self.assertEqual(ret, ([], [], [], [])) self.assertEqual(ret, ([], [], [], []))
def test_read_mseed_channels_empty_data_dir(self): def test_read_mseed_channels_empty_data_dir(self):
...@@ -134,7 +170,8 @@ class TestReadChannels(TestCase): ...@@ -134,7 +170,8 @@ class TestReadChannels(TestCase):
""" """
with TemporaryDirectory() as outer_dir: with TemporaryDirectory() as outer_dir:
with TemporaryDirectory(dir=outer_dir): with TemporaryDirectory(dir=outer_dir):
ret = read_mseed_channels(self.widget_stub, [outer_dir], True) ret = read_mseed_channels(self.widget_stub, [outer_dir],
True, True)
self.assertEqual(ret, ([], [], [], [])) self.assertEqual(ret, ([], [], [], []))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment