Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
Commits on Source (2)
from __future__ import annotations from __future__ import annotations
from sohstationviewer.model.reftek_data.reftek_reader import soh_packet
""" """
Suggested updates to obspy.io.reftek.core: Suggested updates to obspy.io.reftek.core:
- modify section of the code that deals with upacking data - modify section of the code that deals with upacking data
...@@ -26,7 +28,10 @@ from sohstationviewer.model.general_data.general_record_helper import Unpacker ...@@ -26,7 +28,10 @@ from sohstationviewer.model.general_data.general_record_helper import Unpacker
from sohstationviewer.model.reftek_data.reftek_reader.packet import EHPacket from sohstationviewer.model.reftek_data.reftek_reader.packet import EHPacket
from sohstationviewer.model.reftek_data.reftek_reader.reftek_reader_helper \ from sohstationviewer.model.reftek_data.reftek_reader.reftek_reader_helper \
import (read_rt130_file, convert_packet_to_obspy_format) import (
read_rt130_file, convert_waveform_packet_to_obspy_format,
convert_soh_packet_to_obspy_format,
)
class DiscontinuousTrace(Trace): class DiscontinuousTrace(Trace):
...@@ -34,6 +39,7 @@ class DiscontinuousTrace(Trace): ...@@ -34,6 +39,7 @@ class DiscontinuousTrace(Trace):
Extension of obspy.Trace that changes the way time data is handled when Extension of obspy.Trace that changes the way time data is handled when
reading data using the method from logpeek/qpeek. reading data using the method from logpeek/qpeek.
""" """
def __init__(self, *args, times: np.ndarray, **kwargs): def __init__(self, *args, times: np.ndarray, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._times = times self._times = times
...@@ -73,6 +79,7 @@ class Reftek130(obspy_rt130_core.Reftek130): ...@@ -73,6 +79,7 @@ class Reftek130(obspy_rt130_core.Reftek130):
Child class of obspy.Reftek that reads waveform data similar to logpeek for Child class of obspy.Reftek that reads waveform data similar to logpeek for
better performance. better performance.
""" """
@staticmethod @staticmethod
def from_file(file: Union[str, Path]) -> Reftek130: def from_file(file: Union[str, Path]) -> Reftek130:
""" """
...@@ -85,11 +92,17 @@ class Reftek130(obspy_rt130_core.Reftek130): ...@@ -85,11 +92,17 @@ class Reftek130(obspy_rt130_core.Reftek130):
rt = Reftek130() rt = Reftek130()
rt._filename = file rt._filename = file
packets_in_file = read_rt130_file(file, rt130_unpacker) packets_in_file = read_rt130_file(file, rt130_unpacker)
first_packet_type = packets_in_file[0].header.packet_type
if first_packet_type in ['EH', 'ET', 'DT']:
packet_converter = convert_waveform_packet_to_obspy_format
final_dtype = PACKET_FINAL_DTYPE
else:
packet_converter = convert_soh_packet_to_obspy_format
final_dtype = soh_packet.PACKET_FINAL_DTYPE
converted_packets = [] converted_packets = []
for packet in packets_in_file: for packet in packets_in_file:
converted_packets.append( converted_packets.append(packet_converter(packet, rt130_unpacker))
convert_packet_to_obspy_format(packet, rt130_unpacker)) rt._data = np.array(converted_packets, dtype=final_dtype)
rt._data = np.array(converted_packets, dtype=PACKET_FINAL_DTYPE)
return rt return rt
def to_stream(self, network: str = "", location: str = "", def to_stream(self, network: str = "", location: str = "",
...@@ -167,9 +180,9 @@ class Reftek130(obspy_rt130_core.Reftek130): ...@@ -167,9 +180,9 @@ class Reftek130(obspy_rt130_core.Reftek130):
# split into contiguous blocks, i.e. find gaps. packet sequence # split into contiguous blocks, i.e. find gaps. packet sequence
# was sorted already.. # was sorted already..
endtimes = ( endtimes = (
packets[:-1]["time"] + packets[:-1]["time"] +
packets[:-1]["number_of_samples"].astype(np.int64) * packets[:-1]["number_of_samples"].astype(np.int64) *
delta_nanoseconds) delta_nanoseconds)
# check if next starttime matches seamless to last chunk # check if next starttime matches seamless to last chunk
# 1e-3 seconds == 1e6 nanoseconds is the smallest time # 1e-3 seconds == 1e6 nanoseconds is the smallest time
# difference reftek130 format can represent, so anything larger # difference reftek130 format can represent, so anything larger
...@@ -204,7 +217,7 @@ class Reftek130(obspy_rt130_core.Reftek130): ...@@ -204,7 +217,7 @@ class Reftek130(obspy_rt130_core.Reftek130):
npts = sample_data.size npts = sample_data.size
tr = DiscontinuousTrace( tr = DiscontinuousTrace(
data=sample_data, header=copy.deepcopy(header), data=sample_data, header=copy.deepcopy(header),
times=(packets_['time'] / 10**9).round(3) times=(packets_['time'] / 10 ** 9).round(3)
) )
# The plotting process needs to know about the number of # The plotting process needs to know about the number of
# points stored in the trace. However, tr.stats use the # points stored in the trace. However, tr.stats use the
......
from typing import Tuple, Any from typing import Tuple, Any
import numpy
from obspy.io.reftek.util import bcd
from sohstationviewer.model.general_data.general_record_helper import Unpacker from sohstationviewer.model.general_data.general_record_helper import Unpacker
from sohstationviewer.model.reftek_data.reftek_reader.packet import \ from sohstationviewer.model.reftek_data.reftek_reader.packet import \
eh_et_payload_end_in_packet eh_et_payload_end_in_packet
from sohstationviewer.model.reftek_data.reftek_reader.packets import ( from sohstationviewer.model.reftek_data.reftek_reader.packets import (
DTExtendedHeader, DTExtendedHeader, EHETExtendedHeader,
EHETExtendedHeader, SOHExtendedHeader,
) )
...@@ -113,37 +109,11 @@ def read_eh_et_packet(packet: bytes, unpacker: Unpacker ...@@ -113,37 +109,11 @@ def read_eh_et_packet(packet: bytes, unpacker: Unpacker
return extended_header, payload return extended_header, payload
def bcd_16bit_int(_i) -> int: def read_soh_packet(packet: bytes) -> bytes:
"""
Reimplement a private function of the same name in obspy. Kept here in case
the private function is removed in a future obspy version.
:param _i: the byte string to convert into a 16-bite integer
:return: a 16-bit integer
"""
_i = bcd(_i)
return _i[0] * 100 + _i[1]
def read_soh_packet(packet: bytes, unpacker: Unpacker
) -> Tuple[SOHExtendedHeader, bytes]:
""" """
Process an SOH packet and get its extended header and poyload. Process an SOH packet and get its payload.
:param packet: the bytes that make up the given SOH packet. :param packet: the bytes that make up the given SOH packet.
:param unpacker: the unpacker to use to decode the data. :return: the payload of the given SOH packet.
:return: the extended header and payload of the given SOH packet.
""" """
payload = packet[16:]
event_number = bcd_16bit_int(numpy.frombuffer(packet[16:18], numpy.uint8)) return payload
data_stream_number = bcd(numpy.frombuffer(packet[18:19], numpy.uint8))
channel_number = bcd(numpy.frombuffer(packet[19:20], numpy.uint8))
number_of_samples = bcd_16bit_int(
numpy.frombuffer(packet[20:22], numpy.uint8)
)
flags = unpacker.unpack('B', packet[22:23])[0]
data_format = packet[23:24].hex().upper()
extended_header = SOHExtendedHeader(event_number, data_stream_number,
channel_number, number_of_samples,
flags, data_format)
payload = packet[24:]
return extended_header, payload
...@@ -58,32 +58,10 @@ class EHETPacket: ...@@ -58,32 +58,10 @@ class EHETPacket:
data: bytes data: bytes
@dataclasses.dataclass
class SOHExtendedHeader:
"""
A collection of dummy data for some information needed so that
core.Reftek130 can understand SOH packets.
core.Reftek130 focuses on reading waveform data, so it wants information
available in the waveform packets (EH/ET/DT). However, core.Reftek130 also
supports SOH packets, which does not contain the required information. As
a result, we need to store dummy data in its place.
"""
event_number: int
data_stream_number: int
channel_number: int
number_of_samples: int
flags: int
data_format: str
@dataclasses.dataclass @dataclasses.dataclass
class SOHPacket: class SOHPacket:
""" """
The decoded data of an SOH packet. The extended_header field is to ensure The decoded data of an SOH packet.
compatibility with dt_packet.DTPacket. SOH packets do not have an
extended header otherwise.
""" """
header: PacketHeader header: PacketHeader
extended_header: SOHExtendedHeader payload: bytes
data: bytes
import os import os
from typing import Any, Dict, Callable, Union, List, Tuple from typing import Any, Dict, Union, List, Tuple
import numpy import numpy
import numpy as np import numpy as np
...@@ -13,8 +13,9 @@ from sohstationviewer.model.reftek_data.reftek_reader.packet_readers import ( ...@@ -13,8 +13,9 @@ from sohstationviewer.model.reftek_data.reftek_reader.packet_readers import (
from sohstationviewer.model.reftek_data.reftek_reader.packets import ( from sohstationviewer.model.reftek_data.reftek_reader.packets import (
DTPacket, EHETPacket, SOHPacket, DTPacket, EHETPacket, SOHPacket,
) )
from sohstationviewer.model.reftek_data.reftek_reader.header import \ from sohstationviewer.model.reftek_data.reftek_reader.header import (
get_rt130_packet_header get_rt130_packet_header, PacketHeader,
)
def packet_reader_placeholder(*args: Any, **kwargs: Any) -> Tuple[Any, Any]: def packet_reader_placeholder(*args: Any, **kwargs: Any) -> Tuple[Any, Any]:
...@@ -45,42 +46,45 @@ def read_rt130_file(file_name: str, unpacker: Unpacker ...@@ -45,42 +46,45 @@ def read_rt130_file(file_name: str, unpacker: Unpacker
packet = rt130_file.read(1024) packet = rt130_file.read(1024)
packet_header = get_rt130_packet_header(packet) packet_header = get_rt130_packet_header(packet)
waveform_handlers: Dict[str, Callable] = { if packet_header.packet_type in ['EH', 'ET', 'DT']:
'EH': read_eh_et_packet, waveform_handlers = {
'ET': read_eh_et_packet, 'EH': (read_eh_et_packet, EHETPacket),
'DT': read_dt_packet, 'ET': (read_eh_et_packet, EHETPacket),
} 'DT': (read_dt_packet, DTPacket),
}
soh_handlers: Dict[str, Callable] = dict.fromkeys( packet_handlers = {
['AD', 'CD', 'DS', 'FD', 'OM', 'SC', 'SH'], **waveform_handlers
read_soh_packet }
) packet_handler, packet_type = packet_handlers.get(
packet_header.packet_type, packet_reader_placeholder
packet_handlers = { )
**waveform_handlers, **soh_handlers return_val = packet_handler(packet, unpacker)
} extended_header, data = return_val
current_packet = packet_type(packet_header, extended_header,
packet_handler = packet_handlers.get( data)
packet_header.packet_type, packet_reader_placeholder
)
return_val = packet_handler(packet, unpacker)
if packet_header.packet_type == 'DT':
packet_type = DTPacket
elif packet_header.packet_type in ['EH', 'ET']:
packet_type = EHETPacket
else: else:
packet_type = SOHPacket payload = read_soh_packet(packet)
current_packet = SOHPacket(packet_header, payload)
extended_header, data = return_val
current_packet = packet_type(packet_header, extended_header, data)
packets.append(current_packet) packets.append(current_packet)
return packets return packets
def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket, def convert_packet_header_to_dict(packet_header: PacketHeader) -> Dict:
SOHPacket], converted_header = {'packet_type': packet_header.packet_type,
unpacker: Unpacker) -> Tuple: 'experiment_number': packet_header.experiment_number,
# Obspy only stores the last two digits of the year.
'year': packet_header.time.year % 100,
'unit_id': packet_header.unit_id,
'time': packet_header.time.ns,
'byte_count': packet_header.byte_count,
'packet_sequence': packet_header.packet_sequence}
return converted_header
def convert_waveform_packet_to_obspy_format(
packet: Union[EHETPacket, DTPacket], unpacker: Unpacker) -> Tuple:
""" """
Convert an RT130 packet into a numpy array of type PACKET_FINAL_DTYPE Convert an RT130 packet into a numpy array of type PACKET_FINAL_DTYPE
:param packet: an RT130 packet. :param packet: an RT130 packet.
...@@ -91,15 +95,7 @@ def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket, ...@@ -91,15 +95,7 @@ def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket,
# We want to convert the packet to a tuple. In order to make it easier to # We want to convert the packet to a tuple. In order to make it easier to
# maintain, we first convert the packet to a dictionary. Then, we grab the # maintain, we first convert the packet to a dictionary. Then, we grab the
# values of the dictionary as tuple to get the final result. # values of the dictionary as tuple to get the final result.
converted_packet = {} converted_packet = convert_packet_header_to_dict(packet.header)
converted_packet['packet_type'] = packet.header.packet_type
converted_packet['experiment_number'] = packet.header.experiment_number
# Obspy only stores the last two digits of the year.
converted_packet['year'] = packet.header.time.year % 100
converted_packet['unit_id'] = packet.header.unit_id
converted_packet['time'] = packet.header.time.ns
converted_packet['byte_count'] = packet.header.byte_count
converted_packet['packet_sequence'] = packet.header.packet_sequence
converted_packet['event_number'] = packet.extended_header.event_number converted_packet['event_number'] = packet.extended_header.event_number
converted_packet[ converted_packet[
'data_stream_number'] = packet.extended_header.data_stream_number 'data_stream_number'] = packet.extended_header.data_stream_number
...@@ -133,3 +129,10 @@ def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket, ...@@ -133,3 +129,10 @@ def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket,
else: else:
converted_packet['payload'] = numpy.frombuffer(packet.data, np.uint8) converted_packet['payload'] = numpy.frombuffer(packet.data, np.uint8)
return tuple(converted_packet.values()) return tuple(converted_packet.values())
def convert_soh_packet_to_obspy_format(packet: SOHPacket, unpacker: Unpacker
) -> Tuple:
converted_packet = convert_packet_header_to_dict(packet.header)
converted_packet['payload'] = numpy.frombuffer(packet.payload, np.uint8)
return tuple(converted_packet.values())
import os import os
import unittest import unittest
from unittest.mock import patch
from pathlib import Path from pathlib import Path
import numpy import numpy
...@@ -12,6 +13,11 @@ from sohstationviewer.model.reftek_data.reftek_reader.core import ( ...@@ -12,6 +13,11 @@ from sohstationviewer.model.reftek_data.reftek_reader.core import (
) )
from sohstationviewer.model.reftek_data.reftek_reader.header import \ from sohstationviewer.model.reftek_data.reftek_reader.header import \
NotRT130FileError NotRT130FileError
from sohstationviewer.model.reftek_data.reftek_reader.reftek_reader_helper \
import (
convert_soh_packet_to_obspy_format,
convert_waveform_packet_to_obspy_format,
)
class TestDiscontinuousTrace(unittest.TestCase): class TestDiscontinuousTrace(unittest.TestCase):
...@@ -67,11 +73,29 @@ class TestReftek130FromFile(unittest.TestCase): ...@@ -67,11 +73,29 @@ class TestReftek130FromFile(unittest.TestCase):
self.rt130_dir = self.TEST_DATA_DIR.joinpath( self.rt130_dir = self.TEST_DATA_DIR.joinpath(
'RT130-sample/2017149.92EB/2017150/92EB' 'RT130-sample/2017149.92EB/2017150/92EB'
) )
base_path = 'sohstationviewer.model.reftek_data.reftek_reader.core'
waveform_patcher = patch(
f'{base_path}.convert_waveform_packet_to_obspy_format',
wraps=convert_waveform_packet_to_obspy_format
)
self.addCleanup(waveform_patcher.stop)
self.mock_waveform_converter = waveform_patcher.start()
soh_patcher = patch(
f'{base_path}.convert_soh_packet_to_obspy_format',
wraps=convert_soh_packet_to_obspy_format
)
self.addCleanup(soh_patcher.stop)
self.mock_soh_converter = soh_patcher.start()
def test_rt130_file(self): def test_rt130_file(self):
file = self.rt130_dir.joinpath('0/000000000_00000000') with self.subTest('test_soh_file'):
rt130 = Reftek130.from_file(file) file = self.rt130_dir.joinpath('0/000000000_00000000')
self.assertIsInstance(rt130, Reftek130) rt130 = Reftek130.from_file(file)
self.assertIsInstance(rt130, Reftek130)
with self.subTest('test_waveform_file'):
file = self.rt130_dir.joinpath('1/000000015_0036EE80')
rt130 = Reftek130.from_file(file)
self.assertIsInstance(rt130, Reftek130)
def test_rt130_soh_file(self): def test_rt130_soh_file(self):
file = self.rt130_dir.joinpath('0/000000000_00000000') file = self.rt130_dir.joinpath('0/000000000_00000000')
...@@ -79,6 +103,8 @@ class TestReftek130FromFile(unittest.TestCase): ...@@ -79,6 +103,8 @@ class TestReftek130FromFile(unittest.TestCase):
# The most common SOH packet type looks to be SH, so we use that as # The most common SOH packet type looks to be SH, so we use that as
# the default. # the default.
self.assertIn(b'SH', rt130._data['packet_type']) self.assertIn(b'SH', rt130._data['packet_type'])
self.assertTrue(self.mock_soh_converter.called)
self.assertFalse(self.mock_waveform_converter.called)
def test_rt130_raw_data_file(self): def test_rt130_raw_data_file(self):
file = self.rt130_dir.joinpath('1/000000015_0036EE80') file = self.rt130_dir.joinpath('1/000000015_0036EE80')
...@@ -87,6 +113,8 @@ class TestReftek130FromFile(unittest.TestCase): ...@@ -87,6 +113,8 @@ class TestReftek130FromFile(unittest.TestCase):
numpy.unique(numpy.sort(rt130._data['packet_type'])), numpy.unique(numpy.sort(rt130._data['packet_type'])),
numpy.sort([b'EH', b'DT', b'ET']) numpy.sort([b'EH', b'DT', b'ET'])
) )
self.assertTrue(self.mock_waveform_converter.called)
self.assertFalse(self.mock_soh_converter.called)
def test_non_rt130_file(self): def test_non_rt130_file(self):
with self.subTest('test_file_exist'): with self.subTest('test_file_exist'):
......
...@@ -8,8 +8,6 @@ from sohstationviewer.model.reftek_data.reftek_reader.packet_readers import ( ...@@ -8,8 +8,6 @@ from sohstationviewer.model.reftek_data.reftek_reader.packet_readers import (
decode_uncompressed, decode_compressed, read_dt_packet, read_eh_et_packet, decode_uncompressed, decode_compressed, read_dt_packet, read_eh_et_packet,
read_soh_packet, read_soh_packet,
) )
from sohstationviewer.model.reftek_data.reftek_reader.packets import \
SOHExtendedHeader
unpacker = Unpacker('>') unpacker = Unpacker('>')
...@@ -166,19 +164,7 @@ class TestReadEHETPacket(unittest.TestCase): ...@@ -166,19 +164,7 @@ class TestReadEHETPacket(unittest.TestCase):
class TestReadSOHPacket(unittest.TestCase): class TestReadSOHPacket(unittest.TestCase):
"""
Test suite for packet_readers.read_soh_packet. We only test that the
function has the correct interface, seeing as the intended purpose of this
method is to be compatible with packet_readers.read_dt_packet and
packet_readers.read_eh_et_packet interface-wise.
"""
def test_correct_interface(self):
packet = b' ' * 1024
extended_header, payload = read_soh_packet(packet, unpacker)
self.assertIsInstance(extended_header, SOHExtendedHeader)
self.assertIsInstance(payload, bytes)
def test_payload_has_correct_length(self): def test_payload_has_correct_length(self):
packet = b' ' * 1024 packet = b' ' * 1024
extended_header, payload = read_soh_packet(packet, unpacker) payload = read_soh_packet(packet)
self.assertEqual(len(payload), 1000) self.assertEqual(len(payload), 1008)
...@@ -3,20 +3,25 @@ import unittest ...@@ -3,20 +3,25 @@ import unittest
from pathlib import Path from pathlib import Path
from unittest.mock import patch from unittest.mock import patch
from obspy import UTCDateTime
from obspy.io.reftek.packet import PACKET_FINAL_DTYPE from obspy.io.reftek.packet import PACKET_FINAL_DTYPE
from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker
from sohstationviewer.model.reftek_data.reftek_reader.header import \ from sohstationviewer.model.reftek_data.reftek_reader import soh_packet
NotRT130FileError from sohstationviewer.model.reftek_data.reftek_reader.header import (
NotRT130FileError, PacketHeader,
)
from sohstationviewer.model.reftek_data.reftek_reader.packet_readers import ( from sohstationviewer.model.reftek_data.reftek_reader.packet_readers import (
read_eh_et_packet, read_dt_packet, read_soh_packet, read_eh_et_packet, read_dt_packet, read_soh_packet,
) )
from sohstationviewer.model.reftek_data.reftek_reader.packets import ( from sohstationviewer.model.reftek_data.reftek_reader.packets import (
SOHPacket, SOHPacket, EHETPacket, DTPacket,
EHETPacket, DTPacket,
) )
from sohstationviewer.model.reftek_data.reftek_reader.reftek_reader_helper \ from sohstationviewer.model.reftek_data.reftek_reader.reftek_reader_helper \
import (read_rt130_file, convert_packet_to_obspy_format) import (
read_rt130_file, convert_waveform_packet_to_obspy_format,
convert_packet_header_to_dict, convert_soh_packet_to_obspy_format,
)
unpacker = Unpacker('>') unpacker = Unpacker('>')
...@@ -61,9 +66,9 @@ class TestReadRT130File(unittest.TestCase): ...@@ -61,9 +66,9 @@ class TestReadRT130File(unittest.TestCase):
self.assertTrue( self.assertTrue(
all(isinstance(packet, SOHPacket) for packet in packets) all(isinstance(packet, SOHPacket) for packet in packets)
) )
self.assertTrue(self.mock_read_soh.called)
self.assertFalse(self.mock_read_dt.called) self.assertFalse(self.mock_read_dt.called)
self.assertFalse(self.mock_read_eh_et.called) self.assertFalse(self.mock_read_eh_et.called)
self.assertTrue(self.mock_read_soh.called)
def test_rt130_raw_data_file(self): def test_rt130_raw_data_file(self):
file = self.rt130_dir.joinpath('1/000000015_0036EE80') file = self.rt130_dir.joinpath('1/000000015_0036EE80')
...@@ -72,15 +77,15 @@ class TestReadRT130File(unittest.TestCase): ...@@ -72,15 +77,15 @@ class TestReadRT130File(unittest.TestCase):
isinstance(packet, EHETPacket) or isinstance(packet, DTPacket) isinstance(packet, EHETPacket) or isinstance(packet, DTPacket)
for packet in packets) for packet in packets)
) )
self.assertFalse(self.mock_read_soh.called)
self.assertTrue(self.mock_read_dt.called) self.assertTrue(self.mock_read_dt.called)
self.assertTrue(self.mock_read_eh_et.called) self.assertTrue(self.mock_read_eh_et.called)
self.assertFalse(self.mock_read_soh.called)
def test_non_rt130_file(self): def test_non_rt130_file(self):
with self.subTest('test_file_exist'): with self.subTest('test_file_exist'):
file = self.TEST_DATA_DIR.joinpath( file = self.TEST_DATA_DIR.joinpath(
'Q330-sample/day_vols_AX08/AX08.XA..HHE.2021.186' 'Q330-sample/day_vols_AX08/AX08.XA..HHE.2021.186'
) ).as_posix()
with self.assertRaises(NotRT130FileError): with self.assertRaises(NotRT130FileError):
read_rt130_file(file, unpacker) read_rt130_file(file, unpacker)
...@@ -90,18 +95,66 @@ class TestReadRT130File(unittest.TestCase): ...@@ -90,18 +95,66 @@ class TestReadRT130File(unittest.TestCase):
read_rt130_file(file, unpacker) read_rt130_file(file, unpacker)
class TestConvertPacketToObspyFormat(unittest.TestCase): class TestConvertPacketHeaderToDict(unittest.TestCase):
def setUp(self):
self.header = PacketHeader('EH', 1000, '98E1',
UTCDateTime(year=1951, month=1, day=1),
1024, 1)
def test_all_fields_are_in_the_converted_dict(self):
expected_fields = ['packet_type', 'experiment_number', 'year',
'unit_id', 'time', 'byte_count', 'packet_sequence']
actual_field = list(convert_packet_header_to_dict(self.header).keys())
self.assertEqual(expected_fields, actual_field)
def test_year_is_converted_correctly(self):
with self.subTest('test_year_has_fewer_than_two_digits'):
self.header.time = UTCDateTime(year=51, month=1, day=1)
expected_year = 51
actual_year = convert_packet_header_to_dict(self.header)['year']
self.assertEqual(actual_year, expected_year)
with self.subTest('test_year_has_more_than_two_digits'):
self.header.time = UTCDateTime(year=2352, month=1, day=1)
expected_year = 52
actual_year = convert_packet_header_to_dict(self.header)['year']
self.assertEqual(actual_year, expected_year)
with self.subTest('test_year_has_two_digits'):
self.header.time = UTCDateTime(year=9, month=1, day=1)
expected_year = 9
actual_year = convert_packet_header_to_dict(self.header)['year']
self.assertEqual(actual_year, expected_year)
class TestConvertWaveformPacketToObspyFormat(unittest.TestCase):
def setUp(self) -> None: def setUp(self) -> None:
TEST_DATA_DIR = Path(os.getcwd()).joinpath('tests/test_data') TEST_DATA_DIR = Path(os.getcwd()).joinpath('tests/test_data')
rt130_dir = TEST_DATA_DIR.joinpath( rt130_dir = TEST_DATA_DIR.joinpath(
'RT130-sample/2017149.92EB/2017150/92EB' 'RT130-sample/2017149.92EB/2017150/92EB'
) )
file = rt130_dir.joinpath('1/000000015_0036EE80') file = rt130_dir.joinpath('1/000000015_0036EE80').as_posix()
self.packet = read_rt130_file(file, unpacker)[0] self.packet = read_rt130_file(file, unpacker)[0]
def test_all_needed_fields_are_available(self): def test_all_needed_fields_are_available(self):
converted_packet = convert_packet_to_obspy_format( converted_packet = convert_waveform_packet_to_obspy_format(
self.packet, unpacker self.packet, unpacker
) )
self.assertEqual(len(converted_packet), len(PACKET_FINAL_DTYPE)) self.assertEqual(len(converted_packet), len(PACKET_FINAL_DTYPE))
class TestConvertSOHPacketToObspyFormat(unittest.TestCase):
def setUp(self) -> None:
TEST_DATA_DIR = Path(os.getcwd()).joinpath('tests/test_data')
rt130_dir = TEST_DATA_DIR.joinpath(
'RT130-sample/2017149.92EB/2017150/92EB'
)
file = rt130_dir.joinpath('0/000000000_00000000').as_posix()
self.packet = read_rt130_file(file, unpacker)[0]
def test_all_needed_fields_are_available(self):
converted_packet = convert_soh_packet_to_obspy_format(
self.packet, unpacker
)
self.assertEqual(len(converted_packet),
len(soh_packet.PACKET_FINAL_DTYPE))