Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
Commits on Source (2)
from __future__ import annotations
from sohstationviewer.model.reftek_data.reftek_reader import soh_packet
"""
Suggested updates to obspy.io.reftek.core:
- modify section of the code that deals with upacking data
......@@ -26,7 +28,10 @@ from sohstationviewer.model.general_data.general_record_helper import Unpacker
from sohstationviewer.model.reftek_data.reftek_reader.packet import EHPacket
from sohstationviewer.model.reftek_data.reftek_reader.reftek_reader_helper \
import (read_rt130_file, convert_packet_to_obspy_format)
import (
read_rt130_file, convert_waveform_packet_to_obspy_format,
convert_soh_packet_to_obspy_format,
)
class DiscontinuousTrace(Trace):
......@@ -34,6 +39,7 @@ class DiscontinuousTrace(Trace):
Extension of obspy.Trace that changes the way time data is handled when
reading data using the method from logpeek/qpeek.
"""
def __init__(self, *args, times: np.ndarray, **kwargs):
super().__init__(*args, **kwargs)
self._times = times
......@@ -73,6 +79,7 @@ class Reftek130(obspy_rt130_core.Reftek130):
Child class of obspy.Reftek that reads waveform data similar to logpeek for
better performance.
"""
@staticmethod
def from_file(file: Union[str, Path]) -> Reftek130:
"""
......@@ -85,11 +92,17 @@ class Reftek130(obspy_rt130_core.Reftek130):
rt = Reftek130()
rt._filename = file
packets_in_file = read_rt130_file(file, rt130_unpacker)
first_packet_type = packets_in_file[0].header.packet_type
if first_packet_type in ['EH', 'ET', 'DT']:
packet_converter = convert_waveform_packet_to_obspy_format
final_dtype = PACKET_FINAL_DTYPE
else:
packet_converter = convert_soh_packet_to_obspy_format
final_dtype = soh_packet.PACKET_FINAL_DTYPE
converted_packets = []
for packet in packets_in_file:
converted_packets.append(
convert_packet_to_obspy_format(packet, rt130_unpacker))
rt._data = np.array(converted_packets, dtype=PACKET_FINAL_DTYPE)
converted_packets.append(packet_converter(packet, rt130_unpacker))
rt._data = np.array(converted_packets, dtype=final_dtype)
return rt
def to_stream(self, network: str = "", location: str = "",
......@@ -167,9 +180,9 @@ class Reftek130(obspy_rt130_core.Reftek130):
# split into contiguous blocks, i.e. find gaps. packet sequence
# was sorted already..
endtimes = (
packets[:-1]["time"] +
packets[:-1]["number_of_samples"].astype(np.int64) *
delta_nanoseconds)
packets[:-1]["time"] +
packets[:-1]["number_of_samples"].astype(np.int64) *
delta_nanoseconds)
# check if next starttime matches seamless to last chunk
# 1e-3 seconds == 1e6 nanoseconds is the smallest time
# difference reftek130 format can represent, so anything larger
......@@ -204,7 +217,7 @@ class Reftek130(obspy_rt130_core.Reftek130):
npts = sample_data.size
tr = DiscontinuousTrace(
data=sample_data, header=copy.deepcopy(header),
times=(packets_['time'] / 10**9).round(3)
times=(packets_['time'] / 10 ** 9).round(3)
)
# The plotting process needs to know about the number of
# points stored in the trace. However, tr.stats use the
......
from typing import Tuple, Any
import numpy
from obspy.io.reftek.util import bcd
from sohstationviewer.model.general_data.general_record_helper import Unpacker
from sohstationviewer.model.reftek_data.reftek_reader.packet import \
eh_et_payload_end_in_packet
from sohstationviewer.model.reftek_data.reftek_reader.packets import (
DTExtendedHeader,
EHETExtendedHeader, SOHExtendedHeader,
DTExtendedHeader, EHETExtendedHeader,
)
......@@ -113,37 +109,11 @@ def read_eh_et_packet(packet: bytes, unpacker: Unpacker
return extended_header, payload
def bcd_16bit_int(_i) -> int:
"""
Reimplement a private function of the same name in obspy. Kept here in case
the private function is removed in a future obspy version.
:param _i: the byte string to convert into a 16-bite integer
:return: a 16-bit integer
"""
_i = bcd(_i)
return _i[0] * 100 + _i[1]
def read_soh_packet(packet: bytes, unpacker: Unpacker
) -> Tuple[SOHExtendedHeader, bytes]:
def read_soh_packet(packet: bytes) -> bytes:
"""
Process an SOH packet and get its extended header and poyload.
Process an SOH packet and get its payload.
:param packet: the bytes that make up the given SOH packet.
:param unpacker: the unpacker to use to decode the data.
:return: the extended header and payload of the given SOH packet.
:return: the payload of the given SOH packet.
"""
event_number = bcd_16bit_int(numpy.frombuffer(packet[16:18], numpy.uint8))
data_stream_number = bcd(numpy.frombuffer(packet[18:19], numpy.uint8))
channel_number = bcd(numpy.frombuffer(packet[19:20], numpy.uint8))
number_of_samples = bcd_16bit_int(
numpy.frombuffer(packet[20:22], numpy.uint8)
)
flags = unpacker.unpack('B', packet[22:23])[0]
data_format = packet[23:24].hex().upper()
extended_header = SOHExtendedHeader(event_number, data_stream_number,
channel_number, number_of_samples,
flags, data_format)
payload = packet[24:]
return extended_header, payload
payload = packet[16:]
return payload
......@@ -58,32 +58,10 @@ class EHETPacket:
data: bytes
@dataclasses.dataclass
class SOHExtendedHeader:
"""
A collection of dummy data for some information needed so that
core.Reftek130 can understand SOH packets.
core.Reftek130 focuses on reading waveform data, so it wants information
available in the waveform packets (EH/ET/DT). However, core.Reftek130 also
supports SOH packets, which does not contain the required information. As
a result, we need to store dummy data in its place.
"""
event_number: int
data_stream_number: int
channel_number: int
number_of_samples: int
flags: int
data_format: str
@dataclasses.dataclass
class SOHPacket:
"""
The decoded data of an SOH packet. The extended_header field is to ensure
compatibility with dt_packet.DTPacket. SOH packets do not have an
extended header otherwise.
The decoded data of an SOH packet.
"""
header: PacketHeader
extended_header: SOHExtendedHeader
data: bytes
payload: bytes
import os
from typing import Any, Dict, Callable, Union, List, Tuple
from typing import Any, Dict, Union, List, Tuple
import numpy
import numpy as np
......@@ -13,8 +13,9 @@ from sohstationviewer.model.reftek_data.reftek_reader.packet_readers import (
from sohstationviewer.model.reftek_data.reftek_reader.packets import (
DTPacket, EHETPacket, SOHPacket,
)
from sohstationviewer.model.reftek_data.reftek_reader.header import \
get_rt130_packet_header
from sohstationviewer.model.reftek_data.reftek_reader.header import (
get_rt130_packet_header, PacketHeader,
)
def packet_reader_placeholder(*args: Any, **kwargs: Any) -> Tuple[Any, Any]:
......@@ -45,42 +46,45 @@ def read_rt130_file(file_name: str, unpacker: Unpacker
packet = rt130_file.read(1024)
packet_header = get_rt130_packet_header(packet)
waveform_handlers: Dict[str, Callable] = {
'EH': read_eh_et_packet,
'ET': read_eh_et_packet,
'DT': read_dt_packet,
}
soh_handlers: Dict[str, Callable] = dict.fromkeys(
['AD', 'CD', 'DS', 'FD', 'OM', 'SC', 'SH'],
read_soh_packet
)
packet_handlers = {
**waveform_handlers, **soh_handlers
}
packet_handler = packet_handlers.get(
packet_header.packet_type, packet_reader_placeholder
)
return_val = packet_handler(packet, unpacker)
if packet_header.packet_type == 'DT':
packet_type = DTPacket
elif packet_header.packet_type in ['EH', 'ET']:
packet_type = EHETPacket
if packet_header.packet_type in ['EH', 'ET', 'DT']:
waveform_handlers = {
'EH': (read_eh_et_packet, EHETPacket),
'ET': (read_eh_et_packet, EHETPacket),
'DT': (read_dt_packet, DTPacket),
}
packet_handlers = {
**waveform_handlers
}
packet_handler, packet_type = packet_handlers.get(
packet_header.packet_type, packet_reader_placeholder
)
return_val = packet_handler(packet, unpacker)
extended_header, data = return_val
current_packet = packet_type(packet_header, extended_header,
data)
else:
packet_type = SOHPacket
payload = read_soh_packet(packet)
current_packet = SOHPacket(packet_header, payload)
extended_header, data = return_val
current_packet = packet_type(packet_header, extended_header, data)
packets.append(current_packet)
return packets
def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket,
SOHPacket],
unpacker: Unpacker) -> Tuple:
def convert_packet_header_to_dict(packet_header: PacketHeader) -> Dict:
converted_header = {'packet_type': packet_header.packet_type,
'experiment_number': packet_header.experiment_number,
# Obspy only stores the last two digits of the year.
'year': packet_header.time.year % 100,
'unit_id': packet_header.unit_id,
'time': packet_header.time.ns,
'byte_count': packet_header.byte_count,
'packet_sequence': packet_header.packet_sequence}
return converted_header
def convert_waveform_packet_to_obspy_format(
packet: Union[EHETPacket, DTPacket], unpacker: Unpacker) -> Tuple:
"""
Convert an RT130 packet into a numpy array of type PACKET_FINAL_DTYPE
:param packet: an RT130 packet.
......@@ -91,15 +95,7 @@ def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket,
# We want to convert the packet to a tuple. In order to make it easier to
# maintain, we first convert the packet to a dictionary. Then, we grab the
# values of the dictionary as tuple to get the final result.
converted_packet = {}
converted_packet['packet_type'] = packet.header.packet_type
converted_packet['experiment_number'] = packet.header.experiment_number
# Obspy only stores the last two digits of the year.
converted_packet['year'] = packet.header.time.year % 100
converted_packet['unit_id'] = packet.header.unit_id
converted_packet['time'] = packet.header.time.ns
converted_packet['byte_count'] = packet.header.byte_count
converted_packet['packet_sequence'] = packet.header.packet_sequence
converted_packet = convert_packet_header_to_dict(packet.header)
converted_packet['event_number'] = packet.extended_header.event_number
converted_packet[
'data_stream_number'] = packet.extended_header.data_stream_number
......@@ -133,3 +129,10 @@ def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket,
else:
converted_packet['payload'] = numpy.frombuffer(packet.data, np.uint8)
return tuple(converted_packet.values())
def convert_soh_packet_to_obspy_format(packet: SOHPacket, unpacker: Unpacker
) -> Tuple:
converted_packet = convert_packet_header_to_dict(packet.header)
converted_packet['payload'] = numpy.frombuffer(packet.payload, np.uint8)
return tuple(converted_packet.values())
import os
import unittest
from unittest.mock import patch
from pathlib import Path
import numpy
......@@ -12,6 +13,11 @@ from sohstationviewer.model.reftek_data.reftek_reader.core import (
)
from sohstationviewer.model.reftek_data.reftek_reader.header import \
NotRT130FileError
from sohstationviewer.model.reftek_data.reftek_reader.reftek_reader_helper \
import (
convert_soh_packet_to_obspy_format,
convert_waveform_packet_to_obspy_format,
)
class TestDiscontinuousTrace(unittest.TestCase):
......@@ -67,11 +73,29 @@ class TestReftek130FromFile(unittest.TestCase):
self.rt130_dir = self.TEST_DATA_DIR.joinpath(
'RT130-sample/2017149.92EB/2017150/92EB'
)
base_path = 'sohstationviewer.model.reftek_data.reftek_reader.core'
waveform_patcher = patch(
f'{base_path}.convert_waveform_packet_to_obspy_format',
wraps=convert_waveform_packet_to_obspy_format
)
self.addCleanup(waveform_patcher.stop)
self.mock_waveform_converter = waveform_patcher.start()
soh_patcher = patch(
f'{base_path}.convert_soh_packet_to_obspy_format',
wraps=convert_soh_packet_to_obspy_format
)
self.addCleanup(soh_patcher.stop)
self.mock_soh_converter = soh_patcher.start()
def test_rt130_file(self):
file = self.rt130_dir.joinpath('0/000000000_00000000')
rt130 = Reftek130.from_file(file)
self.assertIsInstance(rt130, Reftek130)
with self.subTest('test_soh_file'):
file = self.rt130_dir.joinpath('0/000000000_00000000')
rt130 = Reftek130.from_file(file)
self.assertIsInstance(rt130, Reftek130)
with self.subTest('test_waveform_file'):
file = self.rt130_dir.joinpath('1/000000015_0036EE80')
rt130 = Reftek130.from_file(file)
self.assertIsInstance(rt130, Reftek130)
def test_rt130_soh_file(self):
file = self.rt130_dir.joinpath('0/000000000_00000000')
......@@ -79,6 +103,8 @@ class TestReftek130FromFile(unittest.TestCase):
# The most common SOH packet type looks to be SH, so we use that as
# the default.
self.assertIn(b'SH', rt130._data['packet_type'])
self.assertTrue(self.mock_soh_converter.called)
self.assertFalse(self.mock_waveform_converter.called)
def test_rt130_raw_data_file(self):
file = self.rt130_dir.joinpath('1/000000015_0036EE80')
......@@ -87,6 +113,8 @@ class TestReftek130FromFile(unittest.TestCase):
numpy.unique(numpy.sort(rt130._data['packet_type'])),
numpy.sort([b'EH', b'DT', b'ET'])
)
self.assertTrue(self.mock_waveform_converter.called)
self.assertFalse(self.mock_soh_converter.called)
def test_non_rt130_file(self):
with self.subTest('test_file_exist'):
......
......@@ -8,8 +8,6 @@ from sohstationviewer.model.reftek_data.reftek_reader.packet_readers import (
decode_uncompressed, decode_compressed, read_dt_packet, read_eh_et_packet,
read_soh_packet,
)
from sohstationviewer.model.reftek_data.reftek_reader.packets import \
SOHExtendedHeader
unpacker = Unpacker('>')
......@@ -166,19 +164,7 @@ class TestReadEHETPacket(unittest.TestCase):
class TestReadSOHPacket(unittest.TestCase):
"""
Test suite for packet_readers.read_soh_packet. We only test that the
function has the correct interface, seeing as the intended purpose of this
method is to be compatible with packet_readers.read_dt_packet and
packet_readers.read_eh_et_packet interface-wise.
"""
def test_correct_interface(self):
packet = b' ' * 1024
extended_header, payload = read_soh_packet(packet, unpacker)
self.assertIsInstance(extended_header, SOHExtendedHeader)
self.assertIsInstance(payload, bytes)
def test_payload_has_correct_length(self):
packet = b' ' * 1024
extended_header, payload = read_soh_packet(packet, unpacker)
self.assertEqual(len(payload), 1000)
payload = read_soh_packet(packet)
self.assertEqual(len(payload), 1008)
......@@ -3,20 +3,25 @@ import unittest
from pathlib import Path
from unittest.mock import patch
from obspy import UTCDateTime
from obspy.io.reftek.packet import PACKET_FINAL_DTYPE
from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker
from sohstationviewer.model.reftek_data.reftek_reader.header import \
NotRT130FileError
from sohstationviewer.model.reftek_data.reftek_reader import soh_packet
from sohstationviewer.model.reftek_data.reftek_reader.header import (
NotRT130FileError, PacketHeader,
)
from sohstationviewer.model.reftek_data.reftek_reader.packet_readers import (
read_eh_et_packet, read_dt_packet, read_soh_packet,
)
from sohstationviewer.model.reftek_data.reftek_reader.packets import (
SOHPacket,
EHETPacket, DTPacket,
SOHPacket, EHETPacket, DTPacket,
)
from sohstationviewer.model.reftek_data.reftek_reader.reftek_reader_helper \
import (read_rt130_file, convert_packet_to_obspy_format)
import (
read_rt130_file, convert_waveform_packet_to_obspy_format,
convert_packet_header_to_dict, convert_soh_packet_to_obspy_format,
)
unpacker = Unpacker('>')
......@@ -61,9 +66,9 @@ class TestReadRT130File(unittest.TestCase):
self.assertTrue(
all(isinstance(packet, SOHPacket) for packet in packets)
)
self.assertTrue(self.mock_read_soh.called)
self.assertFalse(self.mock_read_dt.called)
self.assertFalse(self.mock_read_eh_et.called)
self.assertTrue(self.mock_read_soh.called)
def test_rt130_raw_data_file(self):
file = self.rt130_dir.joinpath('1/000000015_0036EE80')
......@@ -72,15 +77,15 @@ class TestReadRT130File(unittest.TestCase):
isinstance(packet, EHETPacket) or isinstance(packet, DTPacket)
for packet in packets)
)
self.assertFalse(self.mock_read_soh.called)
self.assertTrue(self.mock_read_dt.called)
self.assertTrue(self.mock_read_eh_et.called)
self.assertFalse(self.mock_read_soh.called)
def test_non_rt130_file(self):
with self.subTest('test_file_exist'):
file = self.TEST_DATA_DIR.joinpath(
'Q330-sample/day_vols_AX08/AX08.XA..HHE.2021.186'
)
).as_posix()
with self.assertRaises(NotRT130FileError):
read_rt130_file(file, unpacker)
......@@ -90,18 +95,66 @@ class TestReadRT130File(unittest.TestCase):
read_rt130_file(file, unpacker)
class TestConvertPacketToObspyFormat(unittest.TestCase):
class TestConvertPacketHeaderToDict(unittest.TestCase):
def setUp(self):
self.header = PacketHeader('EH', 1000, '98E1',
UTCDateTime(year=1951, month=1, day=1),
1024, 1)
def test_all_fields_are_in_the_converted_dict(self):
expected_fields = ['packet_type', 'experiment_number', 'year',
'unit_id', 'time', 'byte_count', 'packet_sequence']
actual_field = list(convert_packet_header_to_dict(self.header).keys())
self.assertEqual(expected_fields, actual_field)
def test_year_is_converted_correctly(self):
with self.subTest('test_year_has_fewer_than_two_digits'):
self.header.time = UTCDateTime(year=51, month=1, day=1)
expected_year = 51
actual_year = convert_packet_header_to_dict(self.header)['year']
self.assertEqual(actual_year, expected_year)
with self.subTest('test_year_has_more_than_two_digits'):
self.header.time = UTCDateTime(year=2352, month=1, day=1)
expected_year = 52
actual_year = convert_packet_header_to_dict(self.header)['year']
self.assertEqual(actual_year, expected_year)
with self.subTest('test_year_has_two_digits'):
self.header.time = UTCDateTime(year=9, month=1, day=1)
expected_year = 9
actual_year = convert_packet_header_to_dict(self.header)['year']
self.assertEqual(actual_year, expected_year)
class TestConvertWaveformPacketToObspyFormat(unittest.TestCase):
def setUp(self) -> None:
TEST_DATA_DIR = Path(os.getcwd()).joinpath('tests/test_data')
rt130_dir = TEST_DATA_DIR.joinpath(
'RT130-sample/2017149.92EB/2017150/92EB'
)
file = rt130_dir.joinpath('1/000000015_0036EE80')
file = rt130_dir.joinpath('1/000000015_0036EE80').as_posix()
self.packet = read_rt130_file(file, unpacker)[0]
def test_all_needed_fields_are_available(self):
converted_packet = convert_packet_to_obspy_format(
converted_packet = convert_waveform_packet_to_obspy_format(
self.packet, unpacker
)
self.assertEqual(len(converted_packet), len(PACKET_FINAL_DTYPE))
class TestConvertSOHPacketToObspyFormat(unittest.TestCase):
def setUp(self) -> None:
TEST_DATA_DIR = Path(os.getcwd()).joinpath('tests/test_data')
rt130_dir = TEST_DATA_DIR.joinpath(
'RT130-sample/2017149.92EB/2017150/92EB'
)
file = rt130_dir.joinpath('0/000000000_00000000').as_posix()
self.packet = read_rt130_file(file, unpacker)[0]
def test_all_needed_fields_are_available(self):
converted_packet = convert_soh_packet_to_obspy_format(
self.packet, unpacker
)
self.assertEqual(len(converted_packet),
len(soh_packet.PACKET_FINAL_DTYPE))