Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
Commits on Source (2)
......@@ -43,8 +43,6 @@ def display_tracking_info(tracking_box: QTextBrowser, text: str,
:param type: (info/warning/error) type of info to be displayed in
different color
"""
print(text)
return
if tracking_box is None:
print(f"{type.name}: {text}")
return
......
......@@ -3,12 +3,13 @@ import os
from tempfile import mkdtemp
import shutil
from typing import List, Tuple, Dict, Optional, Union
from typing import Optional, Union, List, Tuple, Dict
from PySide2 import QtCore
from sohstationviewer.controller.util import display_tracking_info
from sohstationviewer.conf import constants
from sohstationviewer.model.gps_point import GPSPoint
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.database.process_db import execute_db
......@@ -234,6 +235,8 @@ class DataTypeModel():
self._pauser = QtCore.QSemaphore()
self.pause_response = None
self.gps_points: List[GPSPoint] = []
def __del__(self):
print("delete dataType Object")
try:
......
from typing import NamedTuple
class GPSPoint(NamedTuple):
"""
The metadata and location data of a GPS data point.
"""
last_timemark: str
fix_type: str
num_satellite_used: int
latitude: float
longitude: float
height: float
height_unit: str
"""
MSeed object to hold and process MSeed data
"""
import functools
import operator
import os
from pathlib import Path
from typing import Dict, Tuple, List, Set
......@@ -11,6 +12,7 @@ from obspy.core import Stream
from sohstationviewer.conf import constants
from sohstationviewer.controller.util import validate_file
from sohstationviewer.model.data_type_model import DataTypeModel, ThreadStopped
from sohstationviewer.model.gps_point import GPSPoint
from sohstationviewer.model.handling_data import (
read_waveform_mseed, squash_gaps, check_wf_chan, sort_data, read_soh_trace,
)
......@@ -56,6 +58,8 @@ class MSeed(DataTypeModel):
if len(self.req_wf_chans) != 0:
self.read_wf_files(self.selected_key)
self.get_gps_data_q330()
def read_soh_and_index_waveform(self, folder: str):
"""
+ read waveform data for filename associate with time range
......@@ -293,10 +297,11 @@ class MSeed(DataTypeModel):
# check time
has_data = False
# formatter:off
if ((self.read_start <= file_info['startEpoch'] <= self.read_end) or # noqa: E501
(self.read_start <= file_info['endEpoch'] <= self.read_end)): # noqa: E501
has_data = True
# formatter:on
if not has_data:
continue
read_waveform_mseed(file_info['path2file'],
......@@ -309,4 +314,147 @@ class MSeed(DataTypeModel):
self.track_info(
f'Read {count} waveform files', LogType.INFO)
sort_data(self.waveformData[sta_id]['readData'])
sort_data(self.waveform_data[sta_id]['readData'])
def get_gps_data_q330(self):
"""
Read LOG channel and extract GPS data stored in Q330 data set.
"""
for station in self.log_data:
if station == 'TEXT':
continue
# Q330 log data is composed of a list of string, so we combine them
# into one big string for ease of processing.
log_str = functools.reduce(
operator.iconcat, self.log_data[station]['LOG'], ''
)
log_lines = [line for line in log_str.splitlines() if line != '']
for idx, line in enumerate(log_lines):
if line == "GPS Status":
try:
# We are assuming that a GPS status report is 12 lines
# long, has a specific format, and is followed by a PLL
# status report. This method call checks if these
# preconditions hold and raise an error if not.
self.check_gps_status_format_q330(
log_lines[idx:idx + 13]
)
point = self.extract_gps_point_q330(
log_lines[idx:idx + 12]
)
self.gps_points.append(point)
except ValueError as e:
self.processing_log.append((e.args[0], LogType.ERROR))
@staticmethod
def extract_gps_point_q330(gps_status_lines: List[str]) -> GPSPoint:
"""
Extract the data from a set of log lines that encode a GPS data point.
:param gps_status_lines: a list of log lines that encodes a GPS data
point. Must have a length of 12.
:return a GPSPoint object that contains data encoded in
gps_status_lines.
"""
if len(gps_status_lines) != 12:
raise ValueError('The number of GPS log lines must be 12.')
# Last timemark and fix type are always available, so we have to get
# them before doing anything else.
last_timemark = gps_status_lines[11][19:]
fix_type = gps_status_lines[3][10:]
# If location data is missing, we set them to 0.
if gps_status_lines[5] == 'Latitude: ':
return GPSPoint(last_timemark, fix_type, 0, 0, 0, 0, '')
num_sats_used = int(gps_status_lines[8].split(': ')[1])
# Height is encoded as a float followed by the unit. We
# don't know how many characters the unit is composed of,
# so we have to loop through the height string backward
# until we can detect the end of the height value.
height_str: str = gps_status_lines[4].split(': ')[1]
# Start pass the end of the string and look backward one
# index every iteration so we don't have to add 1 to the
# final index.
current_idx = len(height_str)
current_char = height_str[current_idx - 1]
while current_char != '.' and not current_char.isnumeric():
current_idx -= 1
current_char = height_str[current_idx - 1]
height = float(height_str[:current_idx])
height_unit = height_str[current_idx:]
# Latitude and longitude are encoded in the format
# <degree><decimal minute><cardinal direction>. For
# latitude, <degree> has two characters, while for longitude, <degree>
# has three.
# To make the GPS points easier to plot, we convert the latitude and
# longitude to decimal degree at the cost of possible precision loss
# due to rounding error.
raw_latitude = gps_status_lines[5].split(': ')[1]
lat_degree = int(raw_latitude[:2])
lat_minute = float(raw_latitude[2:-1]) / 60
latitude = lat_degree + lat_minute
if raw_latitude[-1].lower() == 's':
latitude = -latitude
raw_longitude = gps_status_lines[6].split(': ')[1]
long_degree = int(raw_longitude[:3])
long_minute = float(raw_longitude[3:-1]) / 60
longitude = long_degree + long_minute
if raw_longitude[-1].lower() == 'w':
longitude = -longitude
gps_point = GPSPoint(last_timemark, fix_type, num_sats_used,
latitude, longitude, height, height_unit)
return gps_point
@staticmethod
def check_gps_status_format_q330(status_lines: List[str]):
"""
Check if the given set of log lines encode a GPS data point by
determining if they follow a specific format.
:param status_lines: a list of log lines. Must have a length of 13.
"""
if len(status_lines) != 13:
raise ValueError('The number of possible GPS log lines to check'
'must be 13.')
if status_lines[12].lower() != 'pll status':
raise ValueError(
'Q330 log data is malformed: '
'PLL status does not follow GPS status.'
)
if 'fix type' not in status_lines[3].lower():
raise ValueError(
'Q330 log data is malformed: '
'Fix type is not at expected position.'
)
if 'height' not in status_lines[4].lower():
raise ValueError(
'Q330 log data is malformed: '
'Height is not at expected position.'
)
if 'latitude' not in status_lines[5].lower():
raise ValueError(
'Q330 log data is malformed: '
'Latitude is not at expected position.'
)
if 'longitude' not in status_lines[6].lower():
raise ValueError(
'Q330 log data is malformed: '
'Longitude is not at expected position.'
)
if 'sat. used' not in status_lines[8].lower():
raise ValueError(
'Q330 log data is malformed: '
'Sat. Used is not at expected position.'
)
if 'last gps timemark' not in status_lines[11].lower():
raise ValueError(
'Q330 log data is malformed: '
'Last GPS timemark is not at expected position.'
)
import unittest
import math
from sohstationviewer.model.mseed.mseed import MSeed
class TestCheckGPSStatusFormatQ330(unittest.TestCase):
def setUp(self) -> None:
self.status_lines = [
'GPS Status',
'Time: 18:28:49',
'Date: 27/08/2018',
'Fix Type: 3-D',
'Height: 47.6M',
'Latitude: 5906.7572N',
'Longitude: 15651.4038W',
'On Time: 15min',
'Sat. Used: 6',
'In View: 11',
'Checksum Errors: 0',
'Last GPS timemark: 2018-08-27 18:28:48',
'PLL Status'
]
def test_not_enough_status_lines(self):
self.status_lines.pop()
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
def test_too_many_status_lines(self):
self.status_lines.append('')
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
def test_gps_status_not_followed_by_pll_status(self):
self.status_lines[12] = ''
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
def test_good_data(self):
try:
MSeed.check_gps_status_format_q330(self.status_lines)
except ValueError:
self.fail()
def test_fix_type_bad_data(self):
self.status_lines[3] = ''
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
def test_height_bad_data(self):
self.status_lines[4] = ''
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
def test_latitude_bad_data(self):
self.status_lines[5] = ''
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
def test_longitude_bad_data(self):
self.status_lines[6] = ''
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
def test_sat_used_bad_data(self):
self.status_lines[8] = ''
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
def test_gps_timemark_bad_data(self):
self.status_lines[11] = ''
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
class TestExtractGPSPointQ330(unittest.TestCase):
def setUp(self) -> None:
self.gps_lines = ['GPS Status',
'Time: 03:37:39',
'Date: 05/07/2021',
'Fix Type: 3-D',
'Height: 1000.6M',
'Latitude: 3853.9013N',
'Longitude: 04610.8865E',
'On Time: 65535min',
'Sat. Used: 7',
'In View: 11',
'Checksum Errors: 0',
'Last GPS timemark: 2021-07-05 03:37:38']
def test_not_enough_gps_lines(self):
self.gps_lines.pop()
with self.assertRaises(ValueError):
MSeed.extract_gps_point_q330(self.gps_lines)
def test_too_many_gps_lines(self):
self.gps_lines.append('')
with self.assertRaises(ValueError):
MSeed.extract_gps_point_q330(self.gps_lines)
def test_last_timemark_extracted_correctly(self):
with self.subTest('test_data_from_file'):
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = '2021-07-05 03:37:38'
self.assertEqual(result.last_timemark, expected)
with self.subTest('test_made_up_data'):
self.gps_lines[11] = 'Last GPS timemark: test time mark'
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 'test time mark'
self.assertEqual(result.last_timemark, expected)
def test_fix_type_extracted_correctly(self):
with self.subTest('test_data_from_file'):
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = '3-D'
self.assertEqual(result.fix_type, expected)
with self.subTest('test_made_up_data'):
self.gps_lines[3] = 'Fix Type: test fix type'
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 'test fix type'
self.assertEqual(result.fix_type, expected)
def test_height_extracted_correctly(self):
with self.subTest('test_data_from_file'):
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 1000.6
self.assertTrue(math.isclose(result.height, expected))
with self.subTest('test_made_up_data'):
self.gps_lines[4] = 'Height: 3522362.623623MMMSM'
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 3522362.623623
self.assertTrue(math.isclose(result.height, expected))
def test_height_unit_extracted_correctly(self):
with self.subTest('test_data_from_file'):
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 'M'
self.assertEqual(result.height_unit, expected)
with self.subTest('test_made_up_data'):
self.gps_lines[4] = 'Height: 3522362.623623MMMSM'
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 'MMMSM'
self.assertEqual(result.height_unit, expected)
def test_latitude_extracted_correctly(self):
with self.subTest('test_latitude_in_the_north'):
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 38.898355
self.assertTrue(math.isclose(result.latitude, expected))
with self.subTest('test_latitude_in_the_south'):
self.gps_lines[5] = 'Latitude: 3853.9013S'
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = -38.898355
self.assertTrue(math.isclose(result.latitude, expected))
def test_longitude_extracted_correctly(self):
with self.subTest('test_longitude_in_the_east'):
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 46.18144166666666667
self.assertTrue(math.isclose(result.longitude, expected))
with self.subTest('test_longitude_in_the_west'):
self.gps_lines[6] = 'Longitude: 04610.8865W'
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = -46.18144166666666667
self.assertTrue(math.isclose(result.longitude, expected))
def test_num_sat_used_extracted_correctly(self):
with self.subTest('test_longitude_in_the_east'):
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 7
self.assertEqual(result.num_satellite_used, expected)
with self.subTest('test_longitude_in_the_west'):
self.gps_lines[8] = 'Sat. Used: 53252352'
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 53252352
self.assertEqual(result.num_satellite_used, expected)
def test_no_location_data(self):
self.gps_lines[4] = 'Height: '
self.gps_lines[5] = 'Latitude: '
self.gps_lines[6] = 'Longitude: '
self.gps_lines[8] = 'Sat. Used: '
result = MSeed.extract_gps_point_q330(self.gps_lines)
self.assertEqual(result.last_timemark, '2021-07-05 03:37:38')
self.assertEqual(result.fix_type, '3-D')
self.assertEqual(result.height, 0)
self.assertEqual(result.latitude, 0)
self.assertEqual(result.longitude, 0)
self.assertEqual(result.num_satellite_used, 0)