Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
Commits on Source (4)
......@@ -14,16 +14,16 @@ from PySide2.QtGui import QCursor
from PySide2.QtWidgets import QTextBrowser, QApplication
from obspy.io import reftek
from sohstationviewer.model.mseed_data.mseed_reader import \
move_to_next_record
from sohstationviewer.model.mseed_data.record_reader import RecordReader \
as MSeedRecordReader
from sohstationviewer.model.mseed_data.record_reader_helper import \
MSeedReadError
from sohstationviewer.model.mseed_data.mseed_reader import \
move_to_next_record
from sohstationviewer.database.extract_data import get_signature_channels
from sohstationviewer.model.handling_data import (
read_mseed_chanids_from_headers)
check_chan)
from sohstationviewer.controller.util import (
validate_file, display_tracking_info
......@@ -32,7 +32,61 @@ from sohstationviewer.controller.util import (
from sohstationviewer.view.util.enums import LogType
def _read_mseed_chanids(path2file: Path, is_multiplex) \
-> Tuple[Set[str], Set[str], Set[str], Set[str]]:
"""
from the given file get set of channel ids for soh, mass position,
waveform, soh with sample rate greater than 1 which are
calibration signals .
:param path2file: path to file
:param is_multiplex: flag that tell if data has more than one channel in
a file
:return soh_chan_ids: list of chan_ids for SOH
:return mass_pos_chan_ids: list of chan_ids for mass position
:return wf_chan_ids: list of chan_ids for waveform
:return spr_gr_1_chan_ids: list of chan_ids for SOH with sample rate > 1
"""
soh_chan_ids = set()
mass_pos_chan_ids = set()
wf_chan_ids = set()
spr_gr_1_chan_ids = set()
file = open(path2file, 'rb')
while 1:
is_eof = (file.read(1) == b'')
if is_eof:
break
file.seek(-1, 1)
current_record_start = file.tell()
try:
record = MSeedRecordReader(file)
except MSeedReadError:
file.close()
raise Exception(f"File{path2file}: Data type isn't MSeed.")
chan_id = record.record_metadata.channel
chan_type = check_chan(chan_id, [], ['*'], True, True)
if chan_type is False:
if not is_multiplex:
break
continue
if chan_type == 'MP':
mass_pos_chan_ids.add(chan_id)
elif chan_type == 'SOH':
sample_rate = record.record_metadata.sample_rate
if sample_rate <= 1:
soh_chan_ids.add(chan_id)
else:
spr_gr_1_chan_ids.add(chan_id)
else:
wf_chan_ids.add(chan_id)
if not is_multiplex:
break
move_to_next_record(file, current_record_start, record)
file.close()
return soh_chan_ids, mass_pos_chan_ids, wf_chan_ids, spr_gr_1_chan_ids
def read_mseed_channels(tracking_box: QTextBrowser, list_of_dir: List[str],
is_multiplex: bool,
on_unittest: bool = False
) -> Set[str]:
"""
......@@ -43,6 +97,7 @@ def read_mseed_channels(tracking_box: QTextBrowser, list_of_dir: List[str],
include_mp123zne and include_mp456uvw for MSeed are False
:param tracking_box: widget to display tracking info
:param list_of_dir: list of directories selected by users
:param is_multiplex: flag that tell if data is multiplex
:param on_unittest: flag to avoid cursor code to display tracking_info
:return data_object.channels: set of channels present in listofDir
"""
......@@ -70,14 +125,30 @@ def read_mseed_channels(tracking_box: QTextBrowser, list_of_dir: List[str],
count += 1
if count % 10 == 0:
display_tracking_info(
tracking_box, f'Read {count} file headers/ SOH files',
tracking_box, f'Read {count} file headers',
LogType.INFO)
ret = read_mseed_chanids_from_headers(path2file, file_name)
soh_chan_ids.update(ret[0])
mass_pos_chan_ids.update(ret[1])
wf_chan_ids.update(ret[2])
spr_gr_1_chan_ids.update(ret[3])
if reftek.core._is_reftek130(path2file):
# if data_type is RT130, this function shouldn't be called
display_tracking_info(
tracking_box, "There is something wrong with the "
"work-flow. read_mseed_channels shouldn't be fed an "
f"RT130 file: {path2file}",
LogType.ERROR
)
return [], [], [], []
try:
# skip when data type isn't mseed
ret = _read_mseed_chanids(path2file, is_multiplex)
soh_chan_ids.update(ret[0])
mass_pos_chan_ids.update(ret[1])
wf_chan_ids.update(ret[2])
spr_gr_1_chan_ids.update(ret[3])
except Exception:
display_tracking_info(
tracking_box, f'Skip non-mseed file: {file_name}',
LogType.WARNING
)
pass
if not on_unittest:
QApplication.restoreOverrideCursor()
return sorted(list(soh_chan_ids)), sorted(list(mass_pos_chan_ids)), \
......
......@@ -318,20 +318,6 @@ class GeneralData():
execute_db(f'UPDATE PersistentData SET FieldValue="{self.tmp_dir}" '
f'WHERE FieldName="tempDataDirectory"')
def check_not_found_soh_channels(self):
# FROM data_type_model.Data_Type_Model.check_not_found_soh_channels
all_chans_meet_req = (
list(self.soh_data[self.selected_key].keys()) +
list(self.mass_pos_data[self.selected_key].keys()) +
list(self.log_data[self.selected_key].keys()))
not_found_chans = [c for c in self.req_soh_chans
if c not in all_chans_meet_req]
if not_found_chans != []:
msg = (f"No data found for the following channels: "
f"{', '.join( not_found_chans)}")
self.processing_log.append((msg, LogType.WARNING))
def sort_all_data(self):
"""
FROM data_type_model.Data_Type_Model.sort_all_data
......@@ -341,51 +327,51 @@ class GeneralData():
because it is created from log data which is sorted in
prepare_soh_data_from_log_data()
"""
sort_data(self.waveform_data[self.selected_key])
sort_data(self.mass_pos_data[self.selected_key])
try:
sort_data(self.soh_data[self.selected_key])
except KeyError:
# Reftek's SOH trace doesn't have startTmEpoch and
# actually soh_data consists of only one trace
pass
for key in self.keys:
sort_data(self.waveform_data[key])
sort_data(self.mass_pos_data[key])
try:
sort_data(self.soh_data[key])
except KeyError:
# Reftek's SOH trace doesn't have startTmEpoch and
# actually soh_data consists of only one trace
pass
def combine_all_data(self):
combine_data(self.selected_key, self.waveform_data, self.gap_minimum)
combine_data(self.selected_key, self.mass_pos_data, self.gap_minimum)
try:
combine_data(self.selected_key, self.soh_data, self.gap_minimum)
except KeyError:
# Reftek's SOH trace doesn't have startTmEpoch and
# actually soh_data consists of only one trace
pass
for key in self.keys:
combine_data(key, self.waveform_data, self.gap_minimum)
combine_data(key, self.mass_pos_data, self.gap_minimum)
try:
combine_data(key, self.soh_data, self.gap_minimum)
except KeyError:
# Reftek's SOH trace doesn't have startTmEpoch and
# actually soh_data consists of only one trace
pass
def retrieve_gaps_from_data_dicts(self):
"""
Getting gaps from each data_dicts then squash all related gaps
"""
self.gaps[self.selected_key] = []
retrieve_gaps_from_data_dict(
self.selected_key, self.soh_data, self.gaps)
retrieve_gaps_from_data_dict(
self.selected_key, self.mass_pos_data, self.gaps)
retrieve_gaps_from_data_dict(
self.selected_key, self.waveform_data, self.gaps)
for key in self.keys:
self.gaps[key] = []
retrieve_gaps_from_data_dict(key, self.soh_data, self.gaps)
retrieve_gaps_from_data_dict(key, self.mass_pos_data, self.gaps)
retrieve_gaps_from_data_dict(key, self.waveform_data, self.gaps)
self.gaps[self.selected_key] = squash_gaps(
self.gaps[self.selected_key])
self.gaps[key] = squash_gaps(self.gaps[key])
def retrieve_data_time_from_data_dicts(self):
"""
Going through each data_dict to update the data_time to be
[min of startTimeEpoch, max of endTimeEpoch] for each station.
"""
retrieve_data_time_from_data_dict(
self.selected_key, self.soh_data, self.data_time)
retrieve_data_time_from_data_dict(
self.selected_key, self.mass_pos_data, self.data_time)
retrieve_data_time_from_data_dict(
self.selected_key, self.waveform_data, self.data_time)
for key in self.keys:
retrieve_data_time_from_data_dict(
key, self.soh_data, self.data_time)
retrieve_data_time_from_data_dict(
key, self.mass_pos_data, self.data_time)
retrieve_data_time_from_data_dict(
key, self.waveform_data, self.data_time)
def fill_empty_data(self):
"""
......@@ -406,12 +392,13 @@ class GeneralData():
Applying convert_factor to avoid using flags to prevent double
applying convert factor when plotting
"""
apply_convert_factor_to_data_dict(
self.selected_key, self.soh_data, self.data_type)
apply_convert_factor_to_data_dict(
self.selected_key, self.mass_pos_data, self.data_type)
apply_convert_factor_to_data_dict(
self.selected_key, self.waveform_data, self.data_type)
for key in self.keys:
apply_convert_factor_to_data_dict(
key, self.soh_data, self.data_type)
apply_convert_factor_to_data_dict(
key, self.mass_pos_data, self.data_type)
apply_convert_factor_to_data_dict(
key, self.waveform_data, self.data_type)
def reset_all_selected_data(self):
"""
......
......@@ -406,7 +406,7 @@ class ChannelPreferDialog(OneWindowAtATimeDialog):
data_type = 'RT130'
else:
try:
data_type = detect_data_type(self.dir_names)
data_type, is_multiplex = detect_data_type(self.dir_names)
except Exception as e:
QtWidgets.QMessageBox.warning(self, "Scan Channels", str(e))
return
......@@ -428,7 +428,7 @@ class ChannelPreferDialog(OneWindowAtATimeDialog):
else:
self.scan_chan_btn.setEnabled(True)
ret = read_mseed_channels(self.tracking_info_text_browser,
self.dir_names)
self.dir_names, is_multiplex)
if ret == ([], [], [], []):
msg = "No data can be read from " + ', '.join(self.dir_names)
return QtWidgets.QMessageBox.warning(self, "No data", msg)
......
......@@ -21,8 +21,8 @@ pegasus_dir = TEST_DATA_DIR.joinpath('Pegasus-sample/Pegasus_SVC4/soh')
multiplex_dir = TEST_DATA_DIR.joinpath('Q330_multiplex')
class TestReadChannels(TestCase):
"""Test suite for load_data and read_mseed_channels."""
class TestReadMSeedChannel(TestCase):
"""Test suite for read_mseed_channels."""
def setUp(self) -> None:
"""Set up test fixtures."""
......@@ -40,61 +40,95 @@ class TestReadChannels(TestCase):
Test basic functionality of load_data - the given directory contains
MSeed data.
"""
q330_soh_channels = sorted(['LOG', 'VKI'])
q330_mass_pos_channels = ['VM1']
q330_wf_channels = ['HHE', 'LHE']
q330_spr_gt_1 = []
ret = read_mseed_channels(self.widget_stub, [q330_dir], True)
self.assertListEqual(ret[0], q330_soh_channels)
self.assertListEqual(ret[1], q330_mass_pos_channels)
self.assertListEqual(ret[2], q330_wf_channels)
self.assertListEqual(ret[3], q330_spr_gt_1)
centaur_soh_channels = sorted(
['VDT', 'EX3', 'GEL', 'VEC', 'EX2', 'LCE', 'EX1', 'GLA', 'LCQ',
'GPL', 'GNS', 'GST', 'VCO', 'GAN', 'GLO', 'VPB', 'VEI'])
centaur_mass_pos_channels = sorted(['VM1', 'VM2', 'VM3'])
centaur_wf_channels = []
centaur_spr_gt_1 = []
ret = read_mseed_channels(self.widget_stub, [centaur_dir], True)
self.assertListEqual(ret[0], centaur_soh_channels)
self.assertListEqual(ret[1], centaur_mass_pos_channels)
self.assertListEqual(ret[2], centaur_wf_channels)
self.assertListEqual(ret[3], centaur_spr_gt_1)
pegasus_soh_channels = sorted(['VDT', 'VE1'])
pegasus_mass_pos_channels = sorted(['VM1'])
pegasus_wf_channels = []
pegasus_spr_gt_1 = []
ret = read_mseed_channels(self.widget_stub, [pegasus_dir], True)
self.assertListEqual(ret[0], pegasus_soh_channels)
self.assertListEqual(ret[1], pegasus_mass_pos_channels)
self.assertListEqual(ret[2], pegasus_wf_channels)
self.assertListEqual(ret[3], pegasus_spr_gt_1)
multiplex_soh_channels = ['LOG']
multiplex_mass_pos_channels = []
multiplex_wf_channels = sorted(
['BH1', 'BH2', 'BH3', 'BH4', 'BH5', 'BH6',
'EL1', 'EL2', 'EL4', 'EL5', 'EL6', 'ELZ'])
multiplex_spr_gt_1 = sorted(
['BS1', 'BS2', 'BS3', 'BS4', 'BS5', 'BS6',
'ES1', 'ES2', 'ES3', 'ES4', 'ES5', 'ES6',
'LS1', 'LS2', 'LS3', 'LS4', 'LS5', 'LS6',
'SS1', 'SS2', 'SS3', 'SS4', 'SS5', 'SS6'])
ret = read_mseed_channels(self.widget_stub, [multiplex_dir], True)
self.assertListEqual(ret[0], multiplex_soh_channels)
self.assertListEqual(ret[1], multiplex_mass_pos_channels)
self.assertListEqual(ret[2], multiplex_wf_channels)
self.assertListEqual(ret[3], multiplex_spr_gt_1)
with self.subTest("q330 - non multiplex"):
q330_soh_channels = sorted(['LOG', 'VKI'])
q330_mass_pos_channels = ['VM1']
q330_wf_channels = ['HHE', 'LHE']
q330_spr_gt_1 = []
ret = read_mseed_channels(self.widget_stub, [q330_dir],
False, True)
self.assertListEqual(ret[0], q330_soh_channels)
self.assertListEqual(ret[1], q330_mass_pos_channels)
self.assertListEqual(ret[2], q330_wf_channels)
self.assertListEqual(ret[3], q330_spr_gt_1)
with self.subTest(("centaur - multiplex - is_multiplex=True")):
centaur_soh_channels = sorted(
['VDT', 'EX3', 'GEL', 'VEC', 'EX2', 'LCE', 'EX1', 'GLA', 'LCQ',
'GPL', 'GNS', 'GST', 'VCO', 'GAN', 'GLO', 'VPB', 'VEI'])
centaur_mass_pos_channels = sorted(['VM1', 'VM2', 'VM3'])
centaur_wf_channels = []
centaur_spr_gt_1 = []
ret = read_mseed_channels(self.widget_stub, [centaur_dir],
True, True)
self.assertListEqual(ret[0], centaur_soh_channels)
self.assertListEqual(ret[1], centaur_mass_pos_channels)
self.assertListEqual(ret[2], centaur_wf_channels)
self.assertListEqual(ret[3], centaur_spr_gt_1)
with self.subTest("centaur - multiplex - is_multiplex=False"):
# not all channels detected if is_multiplex isn't set
centaur_soh_channels = sorted(['GEL'])
centaur_mass_pos_channels = sorted([])
centaur_wf_channels = []
centaur_spr_gt_1 = []
ret = read_mseed_channels(self.widget_stub, [centaur_dir],
False, True)
self.assertListEqual(ret[0], centaur_soh_channels)
self.assertListEqual(ret[1], centaur_mass_pos_channels)
self.assertListEqual(ret[2], centaur_wf_channels)
self.assertListEqual(ret[3], centaur_spr_gt_1)
with self.subTest("pegasus - non multiplex"):
pegasus_soh_channels = sorted(['VDT', 'VE1'])
pegasus_mass_pos_channels = sorted(['VM1'])
pegasus_wf_channels = []
pegasus_spr_gt_1 = []
ret = read_mseed_channels(self.widget_stub, [pegasus_dir],
False, True)
self.assertListEqual(ret[0], pegasus_soh_channels)
self.assertListEqual(ret[1], pegasus_mass_pos_channels)
self.assertListEqual(ret[2], pegasus_wf_channels)
self.assertListEqual(ret[3], pegasus_spr_gt_1)
with self.subTest("q330 - multiplex - is_multiplex=True"):
multiplex_soh_channels = ['LOG']
multiplex_mass_pos_channels = []
multiplex_wf_channels = sorted(
['BH1', 'BH2', 'BH3', 'BH4', 'BH5', 'BH6',
'EL1', 'EL2', 'EL4', 'EL5', 'EL6', 'ELZ'])
multiplex_spr_gt_1 = sorted(
['BS1', 'BS2', 'BS3', 'BS4', 'BS5', 'BS6',
'ES1', 'ES2', 'ES3', 'ES4', 'ES5', 'ES6',
'LS1', 'LS2', 'LS3', 'LS4', 'LS5', 'LS6',
'SS1', 'SS2', 'SS3', 'SS4', 'SS5', 'SS6'])
ret = read_mseed_channels(self.widget_stub, [multiplex_dir],
True, True)
self.assertListEqual(ret[0], multiplex_soh_channels)
self.assertListEqual(ret[1], multiplex_mass_pos_channels)
self.assertListEqual(ret[2], multiplex_wf_channels)
self.assertListEqual(ret[3], multiplex_spr_gt_1)
with self.subTest("q330 - multiplex - is_multiplex=False"):
# not all channels detected if is_multiplex isn't set
multiplex_soh_channels = []
multiplex_mass_pos_channels = []
multiplex_wf_channels = sorted(['EL1'])
multiplex_spr_gt_1 = sorted([])
ret = read_mseed_channels(self.widget_stub, [multiplex_dir],
False, True)
self.assertListEqual(ret[0], multiplex_soh_channels)
self.assertListEqual(ret[1], multiplex_mass_pos_channels)
self.assertListEqual(ret[2], multiplex_wf_channels)
self.assertListEqual(ret[3], multiplex_spr_gt_1)
def test_read_channels_rt130_dir(self):
"""
Test basic functionality of load_data - the given directory contains
RT130 data.
"""
with self.assertRaises(Exception):
read_mseed_channels(self.widget_stub, [rt130_dir], True)
ret = read_mseed_channels(self.widget_stub, [rt130_dir], True, True)
self.assertEqual(ret, ([], [], [], []))
def test_read_mseed_channels_no_dir(self):
"""
......@@ -102,7 +136,7 @@ class TestReadChannels(TestCase):
given.
"""
no_dir = []
ret = read_mseed_channels(self.widget_stub, no_dir, True)
ret = read_mseed_channels(self.widget_stub, no_dir, True, True)
self.assertEqual(ret, ([], [], [], []))
def test_read_mseed_channels_dir_does_not_exist(self):
......@@ -111,11 +145,12 @@ class TestReadChannels(TestCase):
does not exist.
"""
empty_name_dir = ['']
ret = read_mseed_channels(self.widget_stub, empty_name_dir, True)
ret = read_mseed_channels(self.widget_stub, empty_name_dir, True, True)
self.assertEqual(ret, ([], [], [], []))
non_existent_dir = ['non_existent_dir']
ret = read_mseed_channels(self.widget_stub, non_existent_dir, True)
ret = read_mseed_channels(self.widget_stub, non_existent_dir,
True, True)
self.assertEqual(ret, ([], [], [], []))
def test_read_mseed_channels_empty_dir(self):
......@@ -124,7 +159,8 @@ class TestReadChannels(TestCase):
is empty.
"""
with TemporaryDirectory() as empty_dir:
ret = read_mseed_channels(self.widget_stub, [empty_dir], True)
ret = read_mseed_channels(self.widget_stub, [empty_dir],
True, True)
self.assertEqual(ret, ([], [], [], []))
def test_read_mseed_channels_empty_data_dir(self):
......@@ -134,7 +170,8 @@ class TestReadChannels(TestCase):
"""
with TemporaryDirectory() as outer_dir:
with TemporaryDirectory(dir=outer_dir):
ret = read_mseed_channels(self.widget_stub, [outer_dir], True)
ret = read_mseed_channels(self.widget_stub, [outer_dir],
True, True)
self.assertEqual(ret, ([], [], [], []))
......