Skip to content
Snippets Groups Projects
Commit d23651b9 authored by Kien Le's avatar Kien Le
Browse files

Implement extracting RT130 GPS data

parent bd755401
No related branches found
No related tags found
1 merge request!60Implement extracting RT130 GPS data
from typing import NamedTuple from typing import NamedTuple, Union
class GPSPoint(NamedTuple): class GPSPoint(NamedTuple):
...@@ -7,7 +7,7 @@ class GPSPoint(NamedTuple): ...@@ -7,7 +7,7 @@ class GPSPoint(NamedTuple):
""" """
last_timemark: str last_timemark: str
fix_type: str fix_type: str
num_satellite_used: int num_satellite_used: Union[int, str]
latitude: float latitude: float
longitude: float longitude: float
height: float height: float
......
""" """
RT130 object to hold and process RefTek data RT130 object to hold and process RefTek data
""" """
from datetime import datetime
import os import os
from pathlib import Path from pathlib import Path
from typing import Tuple, List, Union from typing import Tuple, List, Union
import numpy as np import numpy as np
from obspy.core import Stream from obspy.core import Stream
from sohstationviewer.model.gps_point import GPSPoint
from sohstationviewer.model.reftek.from_rt2ms import ( from sohstationviewer.model.reftek.from_rt2ms import (
core, soh_packet, packet) core, soh_packet, packet)
from sohstationviewer.model.reftek.log_info import LogInfo from sohstationviewer.model.reftek.log_info import LogInfo
from sohstationviewer.model.data_type_model import DataTypeModel, ThreadStopped from sohstationviewer.model.data_type_model import DataTypeModel, ThreadStopped
from sohstationviewer.model.handling_data import ( from sohstationviewer.model.handling_data import (
read_waveform_reftek, squash_gaps, sort_data, read_mp_trace, read_text) read_waveform_reftek, squash_gaps, sort_data, read_mp_trace, read_text)
from sohstationviewer.conf import constants from sohstationviewer.conf import constants
from sohstationviewer.controller.util import validate_file from sohstationviewer.controller.util import validate_file
from sohstationviewer.view.util.enums import LogType from sohstationviewer.view.util.enums import LogType
...@@ -58,6 +55,7 @@ class RT130(DataTypeModel): ...@@ -58,6 +55,7 @@ class RT130(DataTypeModel):
msg = (f"No data found for data streams: " msg = (f"No data found for data streams: "
f"{', '.join(map(str, not_found_data_streams))}") f"{', '.join(map(str, not_found_data_streams))}")
self.processing_log.append((msg, LogType.WARNING)) self.processing_log.append((msg, LogType.WARNING))
self.get_gps_data()
def read_soh_index_waveform(self, folder: str) -> None: def read_soh_index_waveform(self, folder: str) -> None:
""" """
...@@ -352,3 +350,112 @@ class RT130(DataTypeModel): ...@@ -352,3 +350,112 @@ class RT130(DataTypeModel):
# endtime, duration, number of missing samples] # endtime, duration, number of missing samples]
all_gaps += [[g[4].timestamp, g[5].timestamp] for g in gaps] all_gaps += [[g[4].timestamp, g[5].timestamp] for g in gaps]
self.gaps[k] = squash_gaps(all_gaps) self.gaps[k] = squash_gaps(all_gaps)
def get_gps_data(self):
"""
Retrieve the GPS of the current data set. Works by looking into the log
of the data set.
"""
log_lines = self.log_data[self.selected_key]['SOH']
log_lines = log_lines[0].split('\n')
log_lines = [line.strip('\r').strip('') for line in log_lines]
gps_year = None
for line in log_lines:
# The lines of the log that contains the GPS data does not include
# the year. Instead, we grab it by looking at the header of the
# state of health packet the GPS line is in.
# Also, it might be possible to only grab the year once. It would
# be easy to check if the year change by seeing if the day of year
# loops back to 1. That approach is a bit more complex, however,
# so we are not using it.
if 'State of Health' in line:
current_time = line.split()[3]
two_digit_year = current_time[:2]
gps_year = two_digit_year_to_four_digit_year(two_digit_year)
if 'GPS: POSITION' in line:
self.gps_points.append(self.parse_gps_point(line, gps_year))
@staticmethod
def parse_gps_point(gps_line: str, gps_year: str) -> GPSPoint:
"""
Parse a GPS log line to extract the GPS data point it contains. Needs
to be given a year because GPS log lines do not contain the year.
:param gps_line: the GPS log line to parse
:param gps_year: the year associated with gps_line
:return: a GPSPoint object that contains the GPS data points stored in
gps_line
"""
# RT130 data does not contain GPS fix type and number of satellites
# used, so we set them to not available.
fix_type = 'N/A'
num_sats_used = 'N/A'
time_str, *_, lat_str, long_str, height_str = gps_line.split()
time_str = gps_year + ':' + time_str
time_format = '%Y:%j:%H:%M:%S'
gps_time = datetime.strptime(time_str, time_format).isoformat(sep=' ')
# Latitude and longitude are stored in degrees, minutes, and seconds,
# with each quantity being separated by ':'. The degree part of
# latitude is stored as three characters, while the degree part of
# longitude is stored as four characters. The first character of the
# degree part is the cardinal direction (NS/EW), so we ignore it in
# calculating latitude and longitude.
lat_as_list = lat_str.split(':')
lat_second = float(lat_as_list[2])
lat_minute = float(lat_as_list[1]) + lat_second / 60
lat = float(lat_as_list[0][1:]) + lat_minute / 60
if lat_as_list[0][0] == 'S':
lat = -lat
long_as_list = long_str.split(':')
long_second = float(long_as_list[2])
long_minute = float(long_as_list[1]) + long_second / 60
long = float(long_as_list[0][1:]) + long_minute / 60
if long_as_list[0][0] == 'W':
long = -long
# Height is encoded as a signed float followed by the unit. We don't
# know how many characters the unit is composed of, so we have to loop
# through the height string backward until we can detect the end of the
# height value.
# Start pass the end of the string and look backward one index every
# iteration so we don't have to add 1 to the final index.
current_idx = len(height_str)
current_char = height_str[current_idx - 1]
while current_char != '.' and not current_char.isnumeric():
current_idx -= 1
current_char = height_str[current_idx - 1]
height = float(height_str[:current_idx])
height_unit = height_str[current_idx:]
return GPSPoint(gps_time, fix_type, num_sats_used, lat, long, height,
height_unit)
def two_digit_year_to_four_digit_year(year: str) -> str:
"""
Convert a year represented by two digits to its representation in 4 digits.
Follow the POSIX and ISO C standards mentioned by the Python documentation,
quoted below.
'When 2-digit years are parsed, they are converted according to the POSIX
and ISO C standards: values 69–99 are mapped to 1969–1999, and values 0–68
are mapped to 2000–2068.'
Raise ValueError if year is outside the 0-99 range (0n counts as n).
:param year: the 2-digit year as a string
:return: the 4-digit representation of year as a string
"""
year_as_int = int(year)
if not (0 <= year_as_int <= 99):
raise ValueError(f'Given year {year} is not in the valid range.')
if int(year) < 69:
prefix = '20'
else:
prefix = '19'
return f'{prefix}{year:0>2}'
import math
import unittest
from sohstationviewer.model.reftek.reftek import (
two_digit_year_to_four_digit_year,
RT130
)
class TestTwoDigitYearToFourDigitYear(unittest.TestCase):
def test_2000_years(self):
test_cases = {
'0': '2000',
'00': '2000',
'1': '2001',
'01': '2001',
'12': '2012',
'68': '2068',
}
for input, expected_output in test_cases.items():
with self.subTest(f'test_input_{input}'):
result = two_digit_year_to_four_digit_year(input)
self.assertEqual(result, expected_output)
def test_1900_years(self):
test_cases = {
'69': '1969',
'99': '1999',
'75': '1975',
}
for input, expected_output in test_cases.items():
with self.subTest(f'test_input_{input}'):
result = two_digit_year_to_four_digit_year(input)
self.assertEqual(result, expected_output)
def test_invalid_input(self):
with self.subTest('test_negative_year'):
input = '-1'
with self.assertRaises(ValueError):
two_digit_year_to_four_digit_year(input)
with self.subTest('test_year_has_more_than_two_dgit'):
input = '123'
with self.assertRaises(ValueError):
two_digit_year_to_four_digit_year(input)
with self.subTest('test_non_sensical_input'):
input = 'tt'
with self.assertRaises(ValueError):
two_digit_year_to_four_digit_year(input)
class TestParseGpsPoint(unittest.TestCase):
def setUp(self) -> None:
self.gps_year = '2000'
self.good_gps_line = ('150:03:00:00 GPS: POSITION: N34:04:26.94 '
'E106:55:13.39 +01425M')
def test_not_enough_gps_field(self):
gps_line = '150:03:00:00 GPS: POSITION: N34:04:26.94 W106:55:13.39'
with self.assertRaises(IndexError):
RT130.parse_gps_point(gps_line, self.gps_year)
def test_time_formatted_correctly(self):
gps_point = RT130.parse_gps_point(self.good_gps_line, self.gps_year)
result = gps_point.last_timemark
expected = '2000-05-29 03:00:00'
self.assertEqual(result, expected)
def test_latitude_extracted_correctly(self):
with self.subTest('test_northern_latitude'):
gps_point = RT130.parse_gps_point(self.good_gps_line,
self.gps_year)
result = gps_point.latitude
expected = 34.07415
self.assertTrue(math.isclose(result, expected))
with self.subTest('test_southern_latitude'):
self.good_gps_line = (self.good_gps_line[:28] +
'S' +
self.good_gps_line[29:])
gps_point = RT130.parse_gps_point(self.good_gps_line,
self.gps_year)
result = gps_point.latitude
expected = -34.07415
self.assertTrue(math.isclose(result, expected))
def test_longitude_extracted_correctly(self):
with self.subTest('test_eastern_longitude'):
gps_point = RT130.parse_gps_point(self.good_gps_line,
self.gps_year)
result = gps_point.longitude
expected = 106.92038611111111
self.assertTrue(math.isclose(result, expected))
with self.subTest('test_western_longitude'):
self.good_gps_line = (self.good_gps_line[:41] +
'W' +
self.good_gps_line[42:])
gps_point = RT130.parse_gps_point(self.good_gps_line,
self.gps_year)
result = gps_point.longitude
print(result)
expected = -106.92038611111111
self.assertTrue(math.isclose(result, expected))
def test_height_extracted_correctly(self):
with self.subTest('test_positive_height'):
gps_point = RT130.parse_gps_point(self.good_gps_line,
self.gps_year)
result = gps_point.height
expected = 1425
self.assertTrue(math.isclose(result, expected))
with self.subTest('test_negative_height'):
self.good_gps_line = self.good_gps_line.replace('+', '-')
gps_point = RT130.parse_gps_point(self.good_gps_line,
self.gps_year)
result = gps_point.height
expected = -1425
self.assertTrue(math.isclose(result, expected))
def test_height_unit_extracted_correctly(self):
with self.subTest('test_one_character_height_unit'):
gps_point = RT130.parse_gps_point(self.good_gps_line,
self.gps_year)
result = gps_point.height_unit
expected = 'M'
self.assertEqual(result, expected)
with self.subTest('test_more_than_one_character_height_unit'):
self.good_gps_line = self.good_gps_line + 'MM'
gps_point = RT130.parse_gps_point(self.good_gps_line,
self.gps_year)
result = gps_point.height_unit
expected = 'MMM'
self.assertEqual(result, expected)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment