From 4320c56a2252f3847911b23cdaaa8348d686dd95 Mon Sep 17 00:00:00 2001 From: kienle <kienle@passcal.nmt.edu> Date: Wed, 19 Feb 2025 16:55:11 -0700 Subject: [PATCH] Add code to split data for Q8's LFT channel --- sohstationviewer/model/mseed_data/mseed.py | 17 ++- .../model/mseed_data/mseed_helper.py | 44 +++++++- tests/model/mseed_data/test_mseed_helper.py | 105 ++++++++++++++---- 3 files changed, 139 insertions(+), 27 deletions(-) diff --git a/sohstationviewer/model/mseed_data/mseed.py b/sohstationviewer/model/mseed_data/mseed.py index ecb65f28..44aceb60 100644 --- a/sohstationviewer/model/mseed_data/mseed.py +++ b/sohstationviewer/model/mseed_data/mseed.py @@ -12,7 +12,10 @@ from sohstationviewer.model.general_data.general_data import GeneralData from sohstationviewer.model.general_data.general_data_helper import read_text from sohstationviewer.model.mseed_data.mseed_helper import \ - retrieve_nets_from_data_dict, split_vst_channel + ( + retrieve_nets_from_data_dict, split_pegasus_vst_channel, + split_q8_lft_channel, +) from sohstationviewer.model.mseed_data.record_reader_helper import \ MSeedReadError from sohstationviewer.model.mseed_data.mseed_reader import MSeedReader @@ -42,7 +45,9 @@ class MSeed(GeneralData): self.retrieve_nets_from_data_dicts() super().finalize_data() if self.data_type == 'Pegasus': - self.split_vst_channel() + self.split_pegasus_vst_channel() + if self.data_type == 'Q8': + self.split_q8_lft_channel() def read_folders(self) -> None: """ @@ -156,6 +161,10 @@ class MSeed(GeneralData): self.log_data[sta]['TXT'] = [] self.log_data[sta]['TXT'].append(self.log_texts[path2file]) - def split_vst_channel(self): + def split_pegasus_vst_channel(self): for data_set_id in self.data_set_ids: - split_vst_channel(data_set_id, self.soh_data) + split_pegasus_vst_channel(data_set_id, self.soh_data) + + def split_q8_lft_channel(self): + for data_set_id in self.data_set_ids: + split_q8_lft_channel(data_set_id, self.soh_data) diff --git a/sohstationviewer/model/mseed_data/mseed_helper.py b/sohstationviewer/model/mseed_data/mseed_helper.py index 014d68e9..f4d2dea1 100644 --- a/sohstationviewer/model/mseed_data/mseed_helper.py +++ b/sohstationviewer/model/mseed_data/mseed_helper.py @@ -19,7 +19,7 @@ def retrieve_nets_from_data_dict(data_dict: Dict, data_dict[sta_id][c]['nets']) -def split_vst_channel(selected_data_set_id: str, data_dict: Dict): +def split_pegasus_vst_channel(selected_data_set_id: str, data_dict: Dict): """ Each value consists of 4 bit. This function will split each bit into one array that will be the data @@ -49,3 +49,45 @@ def split_vst_channel(selected_data_set_id: str, data_dict: Dict): selected_data_dict[name]['tracesInfo'][0]['data'] = np.array( vst_channels_data[i]) del selected_data_dict['VST'] + + +def split_q8_lft_channel(selected_data_set_id: str, data_dict: Dict): + """ + Each data point in LFT encodes two values, the on-off state and the lock + type of the GPS module. + This function will split the LFT channel into two sub-channels, LFT0 and + LFT1. LFT0 stores the GPS on-off state, while LFT1 stores the GPS lock + type. The original data of the LFT channel is removed from the data. + + :param selected_data_set_id: the id of the selected data set + :param data_dict: data of the selected key + """ + try: + selected_data_dict = data_dict[selected_data_set_id] + except KeyError: + return + if 'LFT' not in selected_data_dict: + return + + # We create a copy of the data first so that we can work directly on it + # instead of having to create a (possibly very big) intermediate array. + selected_data_dict['LFT0'] = deepcopy(selected_data_dict['LFT']) + selected_data_dict['LFT1'] = deepcopy(selected_data_dict['LFT']) + + # The transformation below is based on page 218 of the Q8 Reference Guide, + # Revision 5 (Q8RGâ€R5â€20201207). + # The on and off states are cleanly demarcated at the value 4. + lft_0_data = selected_data_dict['LFT0']['tracesInfo'][0]['data'] + lft_0_data[np.where(lft_0_data <= 4)] = 0 + lft_0_data[np.where(lft_0_data > 4)] = 1 + + # We have three fix types: no fix, unknown fix, and known fix (1D, 2D, 3D + # fix). + lft_1_data = selected_data_dict['LFT1']['tracesInfo'][0]['data'] + # We offset the final data by 20 to solve the problem where the final data + # has overlap with the input data. + lft_1_data[np.isin(lft_1_data, [1, 5])] = 20 + lft_1_data[np.isin(lft_1_data, [0, 6])] = 21 + lft_1_data[np.isin(lft_1_data, [2, 3, 4, 7, 8, 9])] = 22 + lft_1_data -= 20 + del selected_data_dict['LFT'] diff --git a/tests/model/mseed_data/test_mseed_helper.py b/tests/model/mseed_data/test_mseed_helper.py index 0ffca2df..100e8500 100644 --- a/tests/model/mseed_data/test_mseed_helper.py +++ b/tests/model/mseed_data/test_mseed_helper.py @@ -1,7 +1,10 @@ +from unittest import TestCase + import numpy as np from sohstationviewer.model.mseed_data.mseed_helper import ( - retrieve_nets_from_data_dict, split_vst_channel + retrieve_nets_from_data_dict, split_pegasus_vst_channel, + split_q8_lft_channel, ) from tests.base_test_case import BaseTestCase @@ -26,7 +29,7 @@ class TestRetrieveNetsFromDataDict(BaseTestCase): self.assertEqual(sorted(list(self.nets_by_sta['STA2'])), ['NET1']) -class TestSpitVSTChannel(BaseTestCase): +class TestSplitPegasusVSTChannel(BaseTestCase): def test_has_vst(self): data_dict = { 'STA': { @@ -35,7 +38,7 @@ class TestSpitVSTChannel(BaseTestCase): 'time': np.array([1, 2, 3, 4, 5]), 'data': np.array([0, 1, 5, 10, 11])}] }}} - split_vst_channel('STA', data_dict) + split_pegasus_vst_channel('STA', data_dict) self.assertNotIn('VST', data_dict['STA'].keys()) self.assertIn('VST0', data_dict['STA'].keys()) self.assertIn('VST1', data_dict['STA'].keys()) @@ -61,33 +64,24 @@ class TestSpitVSTChannel(BaseTestCase): def test_has_vst_empty(self): data_dict = { 'STA': { - 'VST': { + 'LFT': { 'tracesInfo': [{ 'time': np.array([]), 'data': np.array([])}] }}} - split_vst_channel('STA', data_dict) - self.assertNotIn('VST', data_dict['STA'].keys()) - self.assertIn('VST0', data_dict['STA'].keys()) - self.assertIn('VST1', data_dict['STA'].keys()) - self.assertIn('VST2', data_dict['STA'].keys()) - self.assertIn('VST3', data_dict['STA'].keys()) + split_q8_lft_channel('STA', data_dict) + self.assertNotIn('LFT', data_dict['STA'].keys()) + self.assertIn('LFT0', data_dict['STA'].keys()) + self.assertIn('LFT1', data_dict['STA'].keys()) self.assertListEqual( - data_dict['STA']['VST0']['tracesInfo'][0]['time'].tolist(), - []) - self.assertListEqual( - data_dict['STA']['VST0']['tracesInfo'][0]['data'].tolist(), - []) + data_dict['STA']['LFT0']['tracesInfo'][0]['time'].tolist(), []) self.assertListEqual( - data_dict['STA']['VST1']['tracesInfo'][0]['data'].tolist(), - []) + data_dict['STA']['LFT0']['tracesInfo'][0]['data'].tolist(), []) self.assertListEqual( - data_dict['STA']['VST2']['tracesInfo'][0]['data'].tolist(), - []) + data_dict['STA']['LFT1']['tracesInfo'][0]['time'].tolist(), []) self.assertListEqual( - data_dict['STA']['VST3']['tracesInfo'][0]['data'].tolist(), - []) + data_dict['STA']['LFT1']['tracesInfo'][0]['data'].tolist(), []) def test_has_no_vst(self): data_dict = { @@ -97,5 +91,72 @@ class TestSpitVSTChannel(BaseTestCase): 'time': np.array([1, 2]), 'data': np.array([1, 2])}] }}} - split_vst_channel('STA', data_dict) + split_pegasus_vst_channel('STA', data_dict) self.assertNotIn('VST0', data_dict['STA'].keys()) + + +class TestSplitQ8LFTChannel(BaseTestCase): + def test_has_lft(self): + data_dict = { + 'STA': { + 'LFT': { + 'tracesInfo': [{ + 'time': np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), + 'data': np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])}] + }}} + split_q8_lft_channel('STA', data_dict) + self.assertNotIn('LFT', data_dict['STA'].keys()) + self.assertIn('LFT0', data_dict['STA'].keys()) + self.assertIn('LFT1', data_dict['STA'].keys()) + + self.assertListEqual( + data_dict['STA']['LFT0']['tracesInfo'][0]['time'].tolist(), + [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + ) + self.assertListEqual( + data_dict['STA']['LFT0']['tracesInfo'][0]['data'].tolist(), + [0, 0, 0, 0, 0, 1, 1, 1, 1, 1] + ) + self.assertListEqual( + data_dict['STA']['LFT1']['tracesInfo'][0]['time'].tolist(), + [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + ) + self.assertListEqual( + data_dict['STA']['LFT1']['tracesInfo'][0]['data'].tolist(), + [1, 0, 2, 2, 2, 0, 1, 2, 2, 2] + ) + + def test_has_lft_empty(self): + data_dict = { + 'STA': { + 'LFT': { + 'tracesInfo': [{ + 'time': np.array([]), + 'data': np.array([])}] + }}} + split_q8_lft_channel('STA', data_dict) + self.assertNotIn('LFT', data_dict['STA'].keys()) + self.assertIn('LFT0', data_dict['STA'].keys()) + self.assertIn('LFT1', data_dict['STA'].keys()) + + self.assertListEqual( + data_dict['STA']['LFT0']['tracesInfo'][0]['time'].tolist(), []) + self.assertListEqual( + data_dict['STA']['LFT0']['tracesInfo'][0]['data'].tolist(), []) + self.assertListEqual( + data_dict['STA']['LFT1']['tracesInfo'][0]['time'].tolist(), []) + self.assertListEqual( + data_dict['STA']['LFT1']['tracesInfo'][0]['data'].tolist(), []) + + def test_has_no_lft(self): + data_dict = { + 'STA': { + 'VDP': { + 'tracesInfo': [{ + 'time': np.array([1, 2]), + 'data': np.array([1, 2])}] + }}} + split_pegasus_vst_channel('STA', data_dict) + self.assertNotIn('LFT0', data_dict['STA'].keys()) + self.assertNotIn('LFT1', data_dict['STA'].keys()) + -- GitLab