diff --git a/sohstationviewer/model/reftek/reftek_data/eh_et_packet.py b/sohstationviewer/model/reftek/reftek_data/eh_et_packet.py deleted file mode 100644 index 6f550e48a802304aaeaeb29fa6235fcd0ce81146..0000000000000000000000000000000000000000 --- a/sohstationviewer/model/reftek/reftek_data/eh_et_packet.py +++ /dev/null @@ -1,61 +0,0 @@ -import dataclasses -from typing import Tuple - -from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker -from sohstationviewer.model.reftek.reftek_data.packet import \ - eh_et_payload_end_in_packet -from sohstationviewer.model.reftek.reftek_data.header import PacketHeader - - -@dataclasses.dataclass -class EHETExtendedHeader: - """ - A collection of some useful information about an EH/ET packet. Technically, - EH/ET packets do not have extended headers. We name this class what it is - due to the way obspy.Reftek130 (and consequently, core.Reftek130) stores - the data of processed packets. For more information, refer to - Reftek130._data. - """ - event_number: int - data_stream_number: int - flags: int - data_format: str - - def __post_init__(self): - self.channel_number = 0 - self.number_of_samples = 0 - - -@dataclasses.dataclass -class EHETPacket: - """ - The decoded data of an EH/ET packet. The extended_header field is to ensure - compatibility with dt_packet.DTPacket. EH/ET packets do not have an - extended header otherwise. - """ - header: PacketHeader - extended_header: EHETExtendedHeader - data: bytes - - -def read_eh_et_packet(packet: bytes, unpacker: Unpacker - ) -> Tuple[EHETExtendedHeader, bytes]: - """ - Process an EH/ET packet and get its extended header and required part of - the payload. - :param packet: the bytes that make up the given EH/ET packet. - :param unpacker: the unpacker to use to decode the data. - :return: the extended header and truncated payload of the given EH/ET - packet. - """ - event_number = int(packet[16:18].hex()) - data_stream_number = int(packet[18:19].hex()) - flags = unpacker.unpack('B', packet[22:23])[0] - data_format = packet[23:24].hex().upper() - - extended_header = EHETExtendedHeader(event_number, data_stream_number, - flags, data_format) - # The largest possible data point has a size of 4 bytes, so we need to - # grab at least data. - payload = packet[24:eh_et_payload_end_in_packet] - return extended_header, payload diff --git a/sohstationviewer/model/reftek/reftek_data/header.py b/sohstationviewer/model/reftek/reftek_data/header.py index 71db638e8cbf55516784573b6128ed8766b2708a..7164988838188cb278cf7eba27899a60447a5739 100644 --- a/sohstationviewer/model/reftek/reftek_data/header.py +++ b/sohstationviewer/model/reftek/reftek_data/header.py @@ -2,7 +2,12 @@ import dataclasses from obspy import UTCDateTime -from sohstationviewer.model.reftek.reftek_data import reftek_helper + +class NotRT130FileError(Exception): + """ + Error to raise when there is a problem with parsing RT130 data. + """ + pass @dataclasses.dataclass @@ -68,13 +73,13 @@ def get_rt130_packet_header(rt130_packet: bytes) -> PacketHeader: except UnicodeError: print('Cannot decode packet type.') print('The given file does not appear to be a valid RT130 file.') - raise reftek_helper.RT130ParseError + raise NotRT130FileError valid_packet_types = ['AD', 'CD', 'DS', 'DT', 'EH', 'ET', 'OM', 'SH', 'SC', 'FD'] if packet_type not in valid_packet_types: print(f'Invalid packet type found: {packet_type}') print('The given file does not appear to be a valid RT130 file.') - raise reftek_helper.RT130ParseError + raise NotRT130FileError experiment_number = int(rt130_packet[2:3].hex()) year = int(rt130_packet[3:4].hex()) diff --git a/sohstationviewer/model/reftek/reftek_data/dt_packet.py b/sohstationviewer/model/reftek/reftek_data/packet_readers.py similarity index 56% rename from sohstationviewer/model/reftek/reftek_data/dt_packet.py rename to sohstationviewer/model/reftek/reftek_data/packet_readers.py index 0a95ffbe1555559b1df006987cf8a5e4eb707b1a..4df12515647a3d13a65547ad332ad3f7cb7fb248 100644 --- a/sohstationviewer/model/reftek/reftek_data/dt_packet.py +++ b/sohstationviewer/model/reftek/reftek_data/packet_readers.py @@ -1,31 +1,15 @@ -import dataclasses -from typing import Any, Tuple - -from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker -from sohstationviewer.model.reftek.reftek_data.header import PacketHeader - - -@dataclasses.dataclass -class DTExtendedHeader: - """ - The extended header of a DT packet. - """ - event_number: int - data_stream_number: int - channel_number: int - number_of_samples: int - flags: int - data_format: str +from typing import Tuple, Any +import numpy +from obspy.io.reftek.util import bcd -@dataclasses.dataclass -class DTPacket: - """ - The decoded data of a DT packet. - """ - header: PacketHeader - extended_header: DTExtendedHeader - data: int +from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker +from sohstationviewer.model.reftek.reftek_data.packet import \ + eh_et_payload_end_in_packet +from sohstationviewer.model.reftek.reftek_data.packets import ( + DTExtendedHeader, + EHETExtendedHeader, SOHExtendedHeader, +) def decode_uncompressed(packet: bytes, data_format: str, unpacker: Unpacker @@ -104,3 +88,62 @@ def read_dt_packet(packet: bytes, unpacker: Unpacker flags, data_format) first_data_point = decoders[data_format](packet, data_format, unpacker) return extended_header, first_data_point + + +def read_eh_et_packet(packet: bytes, unpacker: Unpacker + ) -> Tuple[EHETExtendedHeader, bytes]: + """ + Process an EH/ET packet and get its extended header and required part of + the payload. + :param packet: the bytes that make up the given EH/ET packet. + :param unpacker: the unpacker to use to decode the data. + :return: the extended header and truncated payload of the given EH/ET + packet. + """ + event_number = int(packet[16:18].hex()) + data_stream_number = int(packet[18:19].hex()) + flags = unpacker.unpack('B', packet[22:23])[0] + data_format = packet[23:24].hex().upper() + + extended_header = EHETExtendedHeader(event_number, data_stream_number, + flags, data_format) + # The largest possible data point has a size of 4 bytes, so we need to + # grab at least data. + payload = packet[24:eh_et_payload_end_in_packet] + return extended_header, payload + + +def bcd_16bit_int(_i) -> int: + """ + Reimplement a private function of the same name in obspy. Kept here in case + the private function is removed in a future obspy version. + :param _i: the byte string to convert into a 16-bite integer + :return: a 16-bit integer + """ + _i = bcd(_i) + return _i[0] * 100 + _i[1] + + +def read_soh_packet(packet: bytes, unpacker: Unpacker + ) -> Tuple[SOHExtendedHeader, bytes]: + """ + Process an SOH packet and get its extended header and poyload. + :param packet: the bytes that make up the given SOH packet. + :param unpacker: the unpacker to use to decode the data. + :return: the extended header and payload of the given SOH packet. + """ + + event_number = bcd_16bit_int(numpy.frombuffer(packet[16:18], numpy.uint8)) + data_stream_number = bcd(numpy.frombuffer(packet[18:19], numpy.uint8)) + channel_number = bcd(numpy.frombuffer(packet[19:20], numpy.uint8)) + number_of_samples = bcd_16bit_int( + numpy.frombuffer(packet[20:22], numpy.uint8) + ) + flags = unpacker.unpack('B', packet[22:23])[0] + data_format = packet[23:24].hex().upper() + + extended_header = SOHExtendedHeader(event_number, data_stream_number, + channel_number, number_of_samples, + flags, data_format) + payload = packet[24:] + return extended_header, payload diff --git a/sohstationviewer/model/reftek/reftek_data/packets.py b/sohstationviewer/model/reftek/reftek_data/packets.py new file mode 100644 index 0000000000000000000000000000000000000000..5272ecb35f6f88bd641b370bccef0f04055d6cb9 --- /dev/null +++ b/sohstationviewer/model/reftek/reftek_data/packets.py @@ -0,0 +1,88 @@ +import dataclasses + +from sohstationviewer.model.reftek.reftek_data.header import PacketHeader + + +@dataclasses.dataclass +class DTExtendedHeader: + """ + The extended header of a DT packet. + """ + event_number: int + data_stream_number: int + channel_number: int + number_of_samples: int + flags: int + data_format: str + + +@dataclasses.dataclass +class DTPacket: + """ + The decoded data of a DT packet. + """ + header: PacketHeader + extended_header: DTExtendedHeader + data: int + + +@dataclasses.dataclass +class EHETExtendedHeader: + """ + A collection of some useful information about an EH/ET packet. Technically, + EH/ET packets do not have extended headers. We name this class what it is + due to the way obspy.Reftek130 (and consequently, core.Reftek130) stores + the data of processed packets. For more information, refer to + Reftek130._data. + """ + event_number: int + data_stream_number: int + flags: int + data_format: str + + def __post_init__(self): + self.channel_number = 0 + self.number_of_samples = 0 + + +@dataclasses.dataclass +class EHETPacket: + """ + The decoded data of an EH/ET packet. The extended_header field is to ensure + compatibility with dt_packet.DTPacket. EH/ET packets do not have an + extended header otherwise. + """ + header: PacketHeader + extended_header: EHETExtendedHeader + data: bytes + + +@dataclasses.dataclass +class SOHExtendedHeader: + """ + A collection of dummy data for some information needed so that + core.Reftek130 can understand SOH packets. + + core.Reftek130 focuses on reading waveform data, so it wants information + available in the waveform packets (EH/ET/DT). However, core.Reftek130 also + supports SOH packets, which does not contain the required information. As + a result, we need to store dummy data in its place. + """ + event_number: int + data_stream_number: int + channel_number: int + number_of_samples: int + flags: int + data_format: str + + +@dataclasses.dataclass +class SOHPacket: + """ + The decoded data of an SOH packet. The extended_header field is to ensure + compatibility with dt_packet.DTPacket. SOH packets do not have an + extended header otherwise. + """ + header: PacketHeader + extended_header: SOHExtendedHeader + data: bytes diff --git a/sohstationviewer/model/reftek/reftek_data/reftek_helper.py b/sohstationviewer/model/reftek/reftek_data/reftek_helper.py index 11ab74d78ffab2365f1cdd10123cc1c36575f826..5c40589ad28a7d29ebc12df371ac0bf8ff5c95ef 100644 --- a/sohstationviewer/model/reftek/reftek_data/reftek_helper.py +++ b/sohstationviewer/model/reftek/reftek_data/reftek_helper.py @@ -7,17 +7,16 @@ import numpy as np from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker from sohstationviewer.model.reftek.reftek_data.packet import \ eh_et_payload_end_in_packet -from sohstationviewer.model.reftek.reftek_data.dt_packet import ( - read_dt_packet, DTPacket, +from sohstationviewer.model.reftek.reftek_data.packet_readers import \ + ( + read_dt_packet, read_eh_et_packet, read_soh_packet, ) -from sohstationviewer.model.reftek.reftek_data.eh_et_packet import ( - read_eh_et_packet, EHETPacket, +from sohstationviewer.model.reftek.reftek_data.packets import ( + DTPacket, + EHETPacket, SOHPacket, ) from sohstationviewer.model.reftek.reftek_data.header import \ get_rt130_packet_header -from sohstationviewer.model.reftek.reftek_data.soh_packets import ( - read_soh_packet, SOHPacket, -) def packet_reader_placeholder(*args: Any, **kwargs: Any) -> Tuple[Any, Any]: @@ -28,13 +27,6 @@ def packet_reader_placeholder(*args: Any, **kwargs: Any) -> Tuple[Any, Any]: return None, None -class RT130ParseError(Exception): - """ - Error to raise when there is a problem with parsing RT130 data. - """ - pass - - def read_rt130_file(file_name: str, unpacker: Unpacker ) -> List[Union[EHETPacket, DTPacket, SOHPacket]]: """ diff --git a/sohstationviewer/model/reftek/reftek_data/soh_packets.py b/sohstationviewer/model/reftek/reftek_data/soh_packets.py deleted file mode 100644 index e5a62b2750e8086e336f3c1ae54cffb585e04638..0000000000000000000000000000000000000000 --- a/sohstationviewer/model/reftek/reftek_data/soh_packets.py +++ /dev/null @@ -1,76 +0,0 @@ -import dataclasses -from typing import Tuple - -from obspy.io.reftek.util import bcd - -from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker -from sohstationviewer.model.reftek.reftek_data.header import PacketHeader - -import numpy - - -@dataclasses.dataclass -class SOHExtendedHeader: - """ - A collection of dummy data for some information needed so that - core.Reftek130 can understand SOH packets. - - core.Reftek130 focuses on reading waveform data, so it wants information - available in the waveform packets (EH/ET/DT). However, core.Reftek130 also - supports SOH packets, which does not contain the required information. As - a result, we need to store dummy data in its place. - """ - event_number: int - data_stream_number: int - channel_number: int - number_of_samples: int - flags: int - data_format: str - - -@dataclasses.dataclass -class SOHPacket: - """ - The decoded data of an SOH packet. The extended_header field is to ensure - compatibility with dt_packet.DTPacket. SOH packets do not have an - extended header otherwise. - """ - header: PacketHeader - extended_header: SOHExtendedHeader - data: bytes - - -def bcd_16bit_int(_i) -> int: - """ - Reimplement a private function of the same name in obspy. Kept here in case - the private function is removed in a future obspy version. - :param _i: the byte string to convert into a 16-bite integer - :return: a 16-bit integer - """ - _i = bcd(_i) - return _i[0] * 100 + _i[1] - - -def read_soh_packet(packet: bytes, unpacker: Unpacker - ) -> Tuple[SOHExtendedHeader, bytes]: - """ - Process an SOH packet and get its extended header and poyload. - :param packet: the bytes that make up the given SOH packet. - :param unpacker: the unpacker to use to decode the data. - :return: the extended header and payload of the given SOH packet. - """ - - event_number = bcd_16bit_int(numpy.frombuffer(packet[16:18], numpy.uint8)) - data_stream_number = bcd(numpy.frombuffer(packet[18:19], numpy.uint8)) - channel_number = bcd(numpy.frombuffer(packet[19:20], numpy.uint8)) - number_of_samples = bcd_16bit_int( - numpy.frombuffer(packet[20:22], numpy.uint8) - ) - flags = unpacker.unpack('B', packet[22:23])[0] - data_format = packet[23:24].hex().upper() - - extended_header = SOHExtendedHeader(event_number, data_stream_number, - channel_number, number_of_samples, - flags, data_format) - payload = packet[24:] - return extended_header, payload