Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
Commits on Source (10)
Showing
with 491 additions and 30 deletions
......@@ -43,8 +43,6 @@ def display_tracking_info(tracking_box: QTextBrowser, text: str,
:param type: (info/warning/error) type of info to be displayed in
different color
"""
print(text)
return
if tracking_box is None:
print(f"{type.name}: {text}")
return
......
......@@ -3,12 +3,13 @@ import os
from tempfile import mkdtemp
import shutil
from typing import List, Tuple, Dict, Optional, Union
from typing import Optional, Union, List, Tuple, Dict
from PySide2 import QtCore
from sohstationviewer.controller.util import display_tracking_info
from sohstationviewer.conf import constants
from sohstationviewer.model.gps_point import GPSPoint
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.database.process_db import execute_db
......@@ -109,7 +110,7 @@ class DataTypeModel():
'read': file has been read or not - bool
}]
}
'read_data': {
'readData': {
chan_id - str: {
'samplerate': Sample rate of the data - float
'tracesInfo': [{
......@@ -234,6 +235,8 @@ class DataTypeModel():
self._pauser = QtCore.QSemaphore()
self.pause_response = None
self.gps_points: List[GPSPoint] = []
def __del__(self):
print("delete dataType Object")
try:
......
from typing import NamedTuple
class GPSPoint(NamedTuple):
"""
The metadata and location data of a GPS data point.
"""
last_timemark: str
fix_type: str
num_satellite_used: int
latitude: float
longitude: float
height: float
height_unit: str
......@@ -397,11 +397,10 @@ def sort_data(data_dict: Dict) -> None:
Sort data in 'traces_info' in 'startTmEpoch' order
:param data_dict: DataTypeModel.__init__.waveform_data
"""
for sta_id in data_dict:
for chan_id in data_dict[sta_id]['readData']:
traces_info = data_dict[sta_id]['readData'][chan_id]['tracesInfo']
traces_info = sorted(
traces_info, key=lambda i: i['startTmEpoch'])
for chan_id in data_dict:
traces_info = data_dict[chan_id]['tracesInfo']
data_dict[chan_id]['tracesInfo'] = sorted(
traces_info, key=lambda i: i['startTmEpoch'])
def squash_gaps(gaps: List[List[float]]) -> List[List[float]]:
......
......@@ -6,7 +6,7 @@ from struct import unpack
class ReadBlocketteError(Exception):
def __init__(self, errno, msg):
def __init__(self, msg):
self.msg = msg
......@@ -29,7 +29,7 @@ def read_next_blkt(b_no, data_bytes, byte_order):
try:
# readBlkt will skip first 4 bytes (HH) as they are already read
info = eval("readBlkt%s(%s, %s, '%s')"
info = eval("read_blkt_%s(%s, %s, '%s')"
% (blockette_type, b_no, data_bytes, byte_order))
except NameError:
raise ReadBlocketteError(f"Function to read blockette {blockette_type}"
......
"""
MSeed object to hold and process MSeed data
"""
import functools
import operator
import os
from pathlib import Path
from typing import Dict, Tuple, List, Set
......@@ -11,6 +12,7 @@ from obspy.core import Stream
from sohstationviewer.conf import constants
from sohstationviewer.controller.util import validate_file
from sohstationviewer.model.data_type_model import DataTypeModel, ThreadStopped
from sohstationviewer.model.gps_point import GPSPoint
from sohstationviewer.model.handling_data import (
read_waveform_mseed, squash_gaps, check_wf_chan, sort_data, read_soh_trace,
)
......@@ -56,6 +58,8 @@ class MSeed(DataTypeModel):
if len(self.req_wf_chans) != 0:
self.read_wf_files(self.selected_key)
self.get_gps_data_q330()
def read_soh_and_index_waveform(self, folder: str):
"""
+ read waveform data for filename associate with time range
......@@ -293,10 +297,11 @@ class MSeed(DataTypeModel):
# check time
has_data = False
# formatter:off
if ((self.read_start <= file_info['startEpoch'] <= self.read_end) or # noqa: E501
(self.read_start <= file_info['endEpoch'] <= self.read_end)): # noqa: E501
has_data = True
# formatter:on
if not has_data:
continue
read_waveform_mseed(file_info['path2file'],
......@@ -309,4 +314,147 @@ class MSeed(DataTypeModel):
self.track_info(
f'Read {count} waveform files', LogType.INFO)
sort_data(self.waveform_data)
sort_data(self.waveform_data[sta_id]['readData'])
def get_gps_data_q330(self):
"""
Read LOG channel and extract GPS data stored in Q330 data set.
"""
for station in self.log_data:
if station == 'TEXT':
continue
# Q330 log data is composed of a list of string, so we combine them
# into one big string for ease of processing.
log_str = functools.reduce(
operator.iconcat, self.log_data[station]['LOG'], ''
)
log_lines = [line for line in log_str.splitlines() if line != '']
for idx, line in enumerate(log_lines):
if line == "GPS Status":
try:
# We are assuming that a GPS status report is 12 lines
# long, has a specific format, and is followed by a PLL
# status report. This method call checks if these
# preconditions hold and raise an error if not.
self.check_gps_status_format_q330(
log_lines[idx:idx + 13]
)
point = self.extract_gps_point_q330(
log_lines[idx:idx + 12]
)
self.gps_points.append(point)
except ValueError as e:
self.processing_log.append((e.args[0], LogType.ERROR))
@staticmethod
def extract_gps_point_q330(gps_status_lines: List[str]) -> GPSPoint:
"""
Extract the data from a set of log lines that encode a GPS data point.
:param gps_status_lines: a list of log lines that encodes a GPS data
point. Must have a length of 12.
:return a GPSPoint object that contains data encoded in
gps_status_lines.
"""
if len(gps_status_lines) != 12:
raise ValueError('The number of GPS log lines must be 12.')
# Last timemark and fix type are always available, so we have to get
# them before doing anything else.
last_timemark = gps_status_lines[11][19:]
fix_type = gps_status_lines[3][10:]
# If location data is missing, we set them to 0.
if gps_status_lines[5] == 'Latitude: ':
return GPSPoint(last_timemark, fix_type, 0, 0, 0, 0, '')
num_sats_used = int(gps_status_lines[8].split(': ')[1])
# Height is encoded as a float followed by the unit. We
# don't know how many characters the unit is composed of,
# so we have to loop through the height string backward
# until we can detect the end of the height value.
height_str: str = gps_status_lines[4].split(': ')[1]
# Start pass the end of the string and look backward one
# index every iteration so we don't have to add 1 to the
# final index.
current_idx = len(height_str)
current_char = height_str[current_idx - 1]
while current_char != '.' and not current_char.isnumeric():
current_idx -= 1
current_char = height_str[current_idx - 1]
height = float(height_str[:current_idx])
height_unit = height_str[current_idx:]
# Latitude and longitude are encoded in the format
# <degree><decimal minute><cardinal direction>. For
# latitude, <degree> has two characters, while for longitude, <degree>
# has three.
# To make the GPS points easier to plot, we convert the latitude and
# longitude to decimal degree at the cost of possible precision loss
# due to rounding error.
raw_latitude = gps_status_lines[5].split(': ')[1]
lat_degree = int(raw_latitude[:2])
lat_minute = float(raw_latitude[2:-1]) / 60
latitude = lat_degree + lat_minute
if raw_latitude[-1].lower() == 's':
latitude = -latitude
raw_longitude = gps_status_lines[6].split(': ')[1]
long_degree = int(raw_longitude[:3])
long_minute = float(raw_longitude[3:-1]) / 60
longitude = long_degree + long_minute
if raw_longitude[-1].lower() == 'w':
longitude = -longitude
gps_point = GPSPoint(last_timemark, fix_type, num_sats_used,
latitude, longitude, height, height_unit)
return gps_point
@staticmethod
def check_gps_status_format_q330(status_lines: List[str]):
"""
Check if the given set of log lines encode a GPS data point by
determining if they follow a specific format.
:param status_lines: a list of log lines. Must have a length of 13.
"""
if len(status_lines) != 13:
raise ValueError('The number of possible GPS log lines to check'
'must be 13.')
if status_lines[12].lower() != 'pll status':
raise ValueError(
'Q330 log data is malformed: '
'PLL status does not follow GPS status.'
)
if 'fix type' not in status_lines[3].lower():
raise ValueError(
'Q330 log data is malformed: '
'Fix type is not at expected position.'
)
if 'height' not in status_lines[4].lower():
raise ValueError(
'Q330 log data is malformed: '
'Height is not at expected position.'
)
if 'latitude' not in status_lines[5].lower():
raise ValueError(
'Q330 log data is malformed: '
'Latitude is not at expected position.'
)
if 'longitude' not in status_lines[6].lower():
raise ValueError(
'Q330 log data is malformed: '
'Longitude is not at expected position.'
)
if 'sat. used' not in status_lines[8].lower():
raise ValueError(
'Q330 log data is malformed: '
'Sat. Used is not at expected position.'
)
if 'last gps timemark' not in status_lines[11].lower():
raise ValueError(
'Q330 log data is malformed: '
'Last GPS timemark is not at expected position.'
)
......@@ -135,7 +135,7 @@ class RT130(DataTypeModel):
if count % 50 == 0:
self.track_info(
f'Read {count} waveform files', LogType.INFO)
sort_data(self.waveform_data)
sort_data(self.waveformData[key]['readData'])
def add_log(self, chan_pkt, log_info):
if chan_pkt not in self.log_data[self.cur_key].keys():
......
......@@ -3,12 +3,12 @@ import pathlib
import shutil
import traceback
from datetime import datetime
from typing import List
from copy import deepcopy
from pathlib import Path
from PySide2 import QtCore, QtWidgets, QtGui
from sohstationviewer.controller.util import display_tracking_info
from sohstationviewer.model.data_loader import DataLoader
from sohstationviewer.model.data_type_model import DataTypeModel
......@@ -31,6 +31,7 @@ from sohstationviewer.view.util.enums import LogType
from sohstationviewer.view.channel_prefer_dialog import ChannelPreferDialog
from sohstationviewer.controller.processing import detect_data_type
from sohstationviewer.controller.util import display_tracking_info
from sohstationviewer.database.process_db import execute_db_dict, execute_db
......@@ -55,6 +56,10 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.data_loader = DataLoader()
self.data_loader.finished.connect(self.replot_loaded_data)
"""
forms_in_forms_menu: List of forms in forms_menu
"""
self.forms_in_forms_menu: List[QtWidgets.QWidget] = []
"""
req_soh_chans: [str,] - list of State-Of-Health channels to read data
from. For Reftek, the list of channels is fixed => may not need
......@@ -221,7 +226,7 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.curr_soh_ids_name_line_edit.setText('')
@QtCore.Slot()
def set_date_format(self, display_format):
def set_date_format(self, display_format: str):
"""
Sets the calendar format used by the QDateEdit text boxes.
:param display_format: str - A valid display format to be used for date
......@@ -232,7 +237,7 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.date_format = display_format
@QtCore.Slot()
def open_files_list_item_double_clicked(self, item):
def open_files_list_item_double_clicked(self, item: FileListItem):
"""
Handles the double-click event emitted when a user double-clicks on an
item contained within openFilesList
......@@ -275,6 +280,7 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
try:
del self.data_object
self.clear_actions_from_forms_menu()
except AttributeError:
pass
......@@ -449,6 +455,7 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.start_tm, self.end_tm, sel_key,
do.data_time[sel_key],
wf_data)
self.add_action_to_forms_menu('TPS Plot', self.tps_dlg)
else:
self.tps_dlg.hide()
......@@ -463,6 +470,7 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.start_tm, self.end_tm, sel_key,
do.data_time[sel_key], time_tick_total,
wf_data, mp_data)
self.add_action_to_forms_menu('Raw Data Plot', self.waveform_dlg)
else:
self.waveform_dlg.hide()
......@@ -478,6 +486,8 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.search_message_dialog.setup_logview(
sel_key, do.log_data, processing_log)
self.search_message_dialog.show()
self.add_action_to_forms_menu('Search Messages',
self.search_message_dialog)
@QtCore.Slot()
def reset_flags(self):
......@@ -492,7 +502,7 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.is_plotting_tps = False
self.is_stopping = False
def set_current_directory(self, path=''):
def set_current_directory(self, path: str = '') -> None:
"""
Update currentDirectory with path in DB table PersistentData.
Set all directories under current directory to self.open_files_list.
......@@ -565,7 +575,11 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
self.data_loader.thread.quit()
self.data_loader.thread.wait()
def delete_old_temp_data_folder(self):
def delete_old_temp_data_folder(self) -> None:
"""
Delete temp_data_folder which is used for keeping memmap files in case
SOHView wasn't closed correctly.
"""
rows = execute_db(
'SELECT FieldValue FROM PersistentData '
'WHERE FieldName="tempDataDirectory"')
......@@ -578,3 +592,44 @@ class MainWindow(QtWidgets.QMainWindow, UIMainWindow):
)
except (FileNotFoundError, TypeError):
pass
def add_action_to_forms_menu(
self, form_name: str, form: QtWidgets.QWidget) -> None:
"""
Creating and adding an action to forms_menu, connect to function
raise_form() to help opening and raising the form on top of
others when triggering the action.
Adding the form into forms_in_forms_menu to handle closing it when
loading new data set.
:param form_name: name of form that will be shown in forms_menu
:type form_name: str
:param form: QtWidget
:type form: QtWidgets.QWidget
"""
if not isinstance(form, QtWidgets.QWidget):
print(f"DEVELOPING ERROR: Type of form must be 'QWidget' instead "
f"of '{form.__class__.__name__}'")
if form not in self.forms_in_forms_menu:
action = QtWidgets.QAction(form_name, self)
self.forms_menu.addAction(action)
action.triggered.connect(lambda: self.raise_form(form))
self.forms_in_forms_menu.append(form)
self.forms_menu.removeAction(self.no_forms_action)
def clear_actions_from_forms_menu(self) -> None:
"""
Remove all actions from forms_menu.
Close all forms related to forms_menu.
Add no_forms_action to let user know that there are no forms available.
"""
self.forms_menu.clear() # remove all actions
for i in range(len(self.forms_in_forms_menu) - 1, -1, -1):
form = self.forms_in_forms_menu.pop(i)
form.close()
self.forms_menu.addAction(self.no_forms_action)
@QtCore.Slot(QtWidgets.QWidget)
def raise_form(self, form: QtWidgets.QWidget) -> None:
form.show() # in case form has been closed
form.raise_() # to raise form on top of others
......@@ -309,15 +309,22 @@ class PlottingWidget(QtWidgets.QScrollArea):
When click mouse on the current plottingWidget, SOHView will loop
through different plottingWidgets to do the same task for
interaction:
+ shift+click: call on_shift_click() to do zooming
+ shift+click: call on_shift_click() to do zooming. This is
disregarded in TimePowerSquareWidget because it isn't subjected
to be zoomed in.
+ ctrl+click or cmd+click in mac: call on_ctrl_cmd_click() to show
ruler
:param event: button_press_event - mouse click event
"""
modifiers = event.guiEvent.modifiers()
if (modifiers == QtCore.Qt.ShiftModifier and
self.name == "timepowersquaredwidget"):
# not start zooming from TimePowerSquareWidget
return
try:
# on_pick() take place after on_pick_on_artist where
# on_button_press_event() take place after on_pick_event where
# tps_t was assigned in TimePowerSquareWidget
xdata = self.tps_t
except AttributeError:
......
......@@ -188,11 +188,16 @@ class UIMainWindow(object):
self.info_list_widget = None
# ========================= Menus =============================
"""
form_menu: QMenu - to show all of the current open windows so that
user can select to put a window on top of all others when need to
look for that.
forms_menu: QMenu - to show all of the current available dialogs so
that user can select a dialog to put it on top of others to even
if the dialog has been closed by user.
"""
self.form_menu = None
self.forms_menu = None
"""
no_forms_action: QAction - to show in forms_menu when there are no
dialogs available to let user know about that
"""
self.no_forms_action = None
"""
exit_action: QAction - to exit SOHView
"""
......@@ -519,8 +524,9 @@ class UIMainWindow(object):
file_menu = main_menu.addMenu("File")
command_menu = main_menu.addMenu("Commands")
option_menu = main_menu.addMenu("Options")
self.form_menu = main_menu.addMenu("Forms")
database_menu = main_menu.addMenu("Database")
self.forms_menu = main_menu.addMenu("Forms")
help_menu = main_menu.addMenu("Help")
# exitAction = QtWidgets.QAction(QtGui.QIcon('exit.png'), "Exit", self)
......@@ -528,8 +534,8 @@ class UIMainWindow(object):
self.create_file_menu(main_window, file_menu)
self.create_command_menu(main_window, command_menu)
self.create_option_menu(main_window, option_menu)
self.create_database_menu(main_window, database_menu)
self.create_forms_menu(main_window, self.forms_menu)
self.create_help_menu(main_window, help_menu)
def create_file_menu(self, main_window, menu):
......@@ -638,6 +644,12 @@ class UIMainWindow(object):
self.doc_action = QtWidgets.QAction('Documentation', main_window)
menu.addAction(self.doc_action)
def create_forms_menu(self, main_window, menu):
self.no_forms_action = QtWidgets.QAction(
"No Forms Are Open", main_window)
menu.addAction(self.no_forms_action)
self.no_forms_action.setEnabled(False)
def connect_signals(self, main_window):
"""
Connect widgets what they do
......@@ -685,7 +697,7 @@ class UIMainWindow(object):
# Help
self.calendar_action.triggered.connect(main_window.open_calendar)
self.doc_action.triggered.connect(self.open_help_browser)
self.doc_action.triggered.connect(main_window.open_help_browser)
def connect_widget_signals(self, main_window):
main_window.current_directory_changed.connect(
......
from unittest import TestCase
from sohstationviewer.model.handling_data import (
sort_data
)
class TestSortData(TestCase):
def setUp(self) -> None:
self.data_dict = {'CH1': {'tracesInfo': [{'startTmEpoch': 7},
{'startTmEpoch': 1},
{'startTmEpoch': 5},
{'startTmEpoch': 3}]},
'CH2': {'tracesInfo': [{'startTmEpoch': 2},
{'startTmEpoch': 8},
{'startTmEpoch': 6},
{'startTmEpoch': 4}]}
}
def test_sort_data(self):
sort_data(self.data_dict)
self.assertEqual(
self.data_dict,
{'CH1': {'tracesInfo': [{'startTmEpoch': 1}, {'startTmEpoch': 3},
{'startTmEpoch': 5}, {'startTmEpoch': 7}]},
'CH2': {'tracesInfo': [{'startTmEpoch': 2}, {'startTmEpoch': 4},
{'startTmEpoch': 6}, {'startTmEpoch': 8}]}}
)
import unittest
import math
from sohstationviewer.model.mseed.mseed import MSeed
class TestCheckGPSStatusFormatQ330(unittest.TestCase):
def setUp(self) -> None:
self.status_lines = [
'GPS Status',
'Time: 18:28:49',
'Date: 27/08/2018',
'Fix Type: 3-D',
'Height: 47.6M',
'Latitude: 5906.7572N',
'Longitude: 15651.4038W',
'On Time: 15min',
'Sat. Used: 6',
'In View: 11',
'Checksum Errors: 0',
'Last GPS timemark: 2018-08-27 18:28:48',
'PLL Status'
]
def test_not_enough_status_lines(self):
self.status_lines.pop()
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
def test_too_many_status_lines(self):
self.status_lines.append('')
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
def test_gps_status_not_followed_by_pll_status(self):
self.status_lines[12] = ''
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
def test_good_data(self):
try:
MSeed.check_gps_status_format_q330(self.status_lines)
except ValueError:
self.fail()
def test_fix_type_bad_data(self):
self.status_lines[3] = ''
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
def test_height_bad_data(self):
self.status_lines[4] = ''
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
def test_latitude_bad_data(self):
self.status_lines[5] = ''
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
def test_longitude_bad_data(self):
self.status_lines[6] = ''
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
def test_sat_used_bad_data(self):
self.status_lines[8] = ''
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
def test_gps_timemark_bad_data(self):
self.status_lines[11] = ''
with self.assertRaises(ValueError):
MSeed.check_gps_status_format_q330(self.status_lines)
class TestExtractGPSPointQ330(unittest.TestCase):
def setUp(self) -> None:
self.gps_lines = ['GPS Status',
'Time: 03:37:39',
'Date: 05/07/2021',
'Fix Type: 3-D',
'Height: 1000.6M',
'Latitude: 3853.9013N',
'Longitude: 04610.8865E',
'On Time: 65535min',
'Sat. Used: 7',
'In View: 11',
'Checksum Errors: 0',
'Last GPS timemark: 2021-07-05 03:37:38']
def test_not_enough_gps_lines(self):
self.gps_lines.pop()
with self.assertRaises(ValueError):
MSeed.extract_gps_point_q330(self.gps_lines)
def test_too_many_gps_lines(self):
self.gps_lines.append('')
with self.assertRaises(ValueError):
MSeed.extract_gps_point_q330(self.gps_lines)
def test_last_timemark_extracted_correctly(self):
with self.subTest('test_data_from_file'):
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = '2021-07-05 03:37:38'
self.assertEqual(result.last_timemark, expected)
with self.subTest('test_made_up_data'):
self.gps_lines[11] = 'Last GPS timemark: test time mark'
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 'test time mark'
self.assertEqual(result.last_timemark, expected)
def test_fix_type_extracted_correctly(self):
with self.subTest('test_data_from_file'):
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = '3-D'
self.assertEqual(result.fix_type, expected)
with self.subTest('test_made_up_data'):
self.gps_lines[3] = 'Fix Type: test fix type'
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 'test fix type'
self.assertEqual(result.fix_type, expected)
def test_height_extracted_correctly(self):
with self.subTest('test_data_from_file'):
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 1000.6
self.assertTrue(math.isclose(result.height, expected))
with self.subTest('test_made_up_data'):
self.gps_lines[4] = 'Height: 3522362.623623MMMSM'
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 3522362.623623
self.assertTrue(math.isclose(result.height, expected))
def test_height_unit_extracted_correctly(self):
with self.subTest('test_data_from_file'):
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 'M'
self.assertEqual(result.height_unit, expected)
with self.subTest('test_made_up_data'):
self.gps_lines[4] = 'Height: 3522362.623623MMMSM'
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 'MMMSM'
self.assertEqual(result.height_unit, expected)
def test_latitude_extracted_correctly(self):
with self.subTest('test_latitude_in_the_north'):
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 38.898355
self.assertTrue(math.isclose(result.latitude, expected))
with self.subTest('test_latitude_in_the_south'):
self.gps_lines[5] = 'Latitude: 3853.9013S'
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = -38.898355
self.assertTrue(math.isclose(result.latitude, expected))
def test_longitude_extracted_correctly(self):
with self.subTest('test_longitude_in_the_east'):
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 46.18144166666666667
self.assertTrue(math.isclose(result.longitude, expected))
with self.subTest('test_longitude_in_the_west'):
self.gps_lines[6] = 'Longitude: 04610.8865W'
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = -46.18144166666666667
self.assertTrue(math.isclose(result.longitude, expected))
def test_num_sat_used_extracted_correctly(self):
with self.subTest('test_longitude_in_the_east'):
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 7
self.assertEqual(result.num_satellite_used, expected)
with self.subTest('test_longitude_in_the_west'):
self.gps_lines[8] = 'Sat. Used: 53252352'
result = MSeed.extract_gps_point_q330(self.gps_lines)
expected = 53252352
self.assertEqual(result.num_satellite_used, expected)
def test_no_location_data(self):
self.gps_lines[4] = 'Height: '
self.gps_lines[5] = 'Latitude: '
self.gps_lines[6] = 'Longitude: '
self.gps_lines[8] = 'Sat. Used: '
result = MSeed.extract_gps_point_q330(self.gps_lines)
self.assertEqual(result.last_timemark, '2021-07-05 03:37:38')
self.assertEqual(result.fix_type, '3-D')
self.assertEqual(result.height, 0)
self.assertEqual(result.latitude, 0)
self.assertEqual(result.longitude, 0)
self.assertEqual(result.num_satellite_used, 0)