diff --git a/sohstationviewer/model/reftek/reftek_data/core.py b/sohstationviewer/model/reftek/reftek_data/core.py index a5c75e9a6636f6186f61ff570bbad030c835400b..a2d24a8c541e3b1dacef116e398ee9abcf57c402 100644 --- a/sohstationviewer/model/reftek/reftek_data/core.py +++ b/sohstationviewer/model/reftek/reftek_data/core.py @@ -1,3 +1,5 @@ +from __future__ import annotations + #!/usr/bin/env python # -*- coding: utf-8 -*- @@ -37,7 +39,7 @@ class DiscontinuousTrace(Trace): Extension of obspy.Trace that changes the way time data is handled when reading data using the method from logpeek/qpeek. """ - def __init__(self, *args, times, **kwargs): + def __init__(self, *args, times: np.ndarray, **kwargs): super().__init__(*args, **kwargs) self._times = times @@ -74,7 +76,7 @@ class Reftek130(obspy_rt130_core.Reftek130): better performance. """ @staticmethod - def from_file(file: Union[str, Path]): + def from_file(file: Union[str, Path]) -> Reftek130: """ Read data from an RT130 file and save it in a Reftek130 object. :param file: the RT130 file to read @@ -92,10 +94,10 @@ class Reftek130(obspy_rt130_core.Reftek130): rt._data = np.array(converted_packets, dtype=PACKET_FINAL_DTYPE) return rt - def to_stream(self, network="", location="", component_codes=None, - include_mp123=False, include_mp456=False, - headonly=False, verbose=False, - sort_permuted_package_sequence=False): + def to_stream(self, network: str = "", location: str = "", + include_mp123: bool = False, include_mp456: bool = False, + headonly: bool = False, verbose: bool = False, + sort_permuted_package_sequence: bool = False) -> Stream: """ Create an obspy.Stream object that holds the data stored in this Reftek130 object. diff --git a/sohstationviewer/model/reftek/reftek_data/dt_packet.py b/sohstationviewer/model/reftek/reftek_data/dt_packet.py index 6792246594f101b09e659712d2f602766a3365d4..d1e7b440a729cfa6e043f7203b4d6bb2f3a7e47f 100644 --- a/sohstationviewer/model/reftek/reftek_data/dt_packet.py +++ b/sohstationviewer/model/reftek/reftek_data/dt_packet.py @@ -1,10 +1,35 @@ import dataclasses +from typing import Tuple, Any from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker from sohstationviewer.model.reftek.reftek_data.header import PacketHeader -def decode_uncompressed(packet: bytes, data_format: str, unpacker: Unpacker): +@dataclasses.dataclass +class DTExtendedHeader: + """ + The extended header of a DT packet. + """ + event_number: int + data_stream_number: int + channel_number: int + number_of_samples: int + flags: int + data_format: str + + +@dataclasses.dataclass +class DTPacket: + """ + The decoded data of a DT packet. + """ + header: PacketHeader + extended_header: DTExtendedHeader + data: int + + +def decode_uncompressed(packet: bytes, data_format: str, unpacker: Unpacker + ) -> int: """ Grab the first data point in a packet that contains uncompressed RT130 data (aka packets with data format 16, 32, or 33_. @@ -33,7 +58,8 @@ def decode_uncompressed(packet: bytes, data_format: str, unpacker: Unpacker): return unpacker.unpack(f'{format_char}', first_data_point)[0] -def decode_compressed(packet: bytes, data_format: str, unpacker: Unpacker): +def decode_compressed(packet: bytes, data_format: str, unpacker: Unpacker + ) -> int: """ Grab the stop point in a packet that contains compressed RT130 data (aka packets with data format C0, C1, C2, or C3). @@ -53,7 +79,8 @@ def decode_compressed(packet: bytes, data_format: str, unpacker: Unpacker): return unpacker.unpack('i', first_data_point)[0] -def read_dt_packet(packet: bytes, unpacker: Unpacker): +def read_dt_packet(packet: bytes, unpacker: Unpacker + ) -> tuple[DTExtendedHeader, Any]: """ Process a DT packet and get its extended header and first data point. :param packet: the bytes that make up the given DT packet. @@ -77,26 +104,3 @@ def read_dt_packet(packet: bytes, unpacker: Unpacker): flags, data_format) first_data_point = decoders[data_format](packet, data_format, unpacker) return extended_header, first_data_point - - -@dataclasses.dataclass -class DTExtendedHeader: - """ - The extended header of a DT packet. - """ - event_number: int - data_stream_number: int - channel_number: int - number_of_samples: int - flags: int - data_format: str - - -@dataclasses.dataclass -class DTPacket: - """ - The decoded data of a DT packet. - """ - header: PacketHeader - extended_header: DTExtendedHeader - data: int diff --git a/sohstationviewer/model/reftek/reftek_data/eh_et_packet.py b/sohstationviewer/model/reftek/reftek_data/eh_et_packet.py index 366ba2f7b4d64ea6d430ad3f8a87940184270121..cf4b4c1087a2ea428535a8f12395095a69cfe2a0 100644 --- a/sohstationviewer/model/reftek/reftek_data/eh_et_packet.py +++ b/sohstationviewer/model/reftek/reftek_data/eh_et_packet.py @@ -1,4 +1,5 @@ import dataclasses +from typing import Tuple from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker from sohstationviewer.model.reftek.reftek_data.packet import \ @@ -6,28 +7,6 @@ from sohstationviewer.model.reftek.reftek_data.packet import \ from sohstationviewer.model.reftek.reftek_data.header import PacketHeader -def read_eh_et_packet(packet: bytes, unpacker: Unpacker): - """ - Process an EH/ET packet and get its extended header and required part of - the payload. - :param packet: the bytes that make up the given EH/ET packet. - :param unpacker: the unpacker to use to decode the data. - :return: the extended header and truncated payload of the given EH/ET - packet. - """ - event_number = int(packet[16:18].hex()) - data_stream_number = int(packet[18:19].hex()) - flags = unpacker.unpack('B', packet[22:23])[0] - data_format = packet[23:24].hex().upper() - - extended_header = EHETExtendedHeader(event_number, data_stream_number, - flags, data_format) - # The largest possible data point has a size of 4 bytes, so we need to - # grab at least data. - payload = packet[24:eh_et_payload_end_in_packet] - return extended_header, payload - - @dataclasses.dataclass class EHETExtendedHeader: """ @@ -57,3 +36,28 @@ class EHETPacket: header: PacketHeader extended_header: EHETExtendedHeader data: bytes + + +def read_eh_et_packet(packet: bytes, unpacker: Unpacker + ) -> Tuple[EHETExtendedHeader, bytes]: + """ + Process an EH/ET packet and get its extended header and required part of + the payload. + :param packet: the bytes that make up the given EH/ET packet. + :param unpacker: the unpacker to use to decode the data. + :return: the extended header and truncated payload of the given EH/ET + packet. + """ + event_number = int(packet[16:18].hex()) + data_stream_number = int(packet[18:19].hex()) + flags = unpacker.unpack('B', packet[22:23])[0] + data_format = packet[23:24].hex().upper() + + extended_header = EHETExtendedHeader(event_number, data_stream_number, + flags, data_format) + # The largest possible data point has a size of 4 bytes, so we need to + # grab at least data. + payload = packet[24:eh_et_payload_end_in_packet] + return extended_header, payload + + diff --git a/sohstationviewer/model/reftek/reftek_data/packet.py b/sohstationviewer/model/reftek/reftek_data/packet.py index be79c3eb93ade8850c3a0a30aa11a5b8c7ae61fe..ec1e3f4e735b00419c1e7689f2d5890be979fa48 100644 --- a/sohstationviewer/model/reftek/reftek_data/packet.py +++ b/sohstationviewer/model/reftek/reftek_data/packet.py @@ -9,7 +9,9 @@ Suggested updates to obspy.io.reftek.packet: Maeva Pourpoint IRIS/PASSCAL """ +from typing import List +import numpy import obspy.io.reftek.packet as obspy_rt130_packet from obspy import UTCDateTime @@ -75,7 +77,7 @@ obspy_rt130_packet.EH_PAYLOAD = { class EHPacket(obspy_rt130_packet.EHPacket): - def __init__(self, data): + def __init__(self, data: numpy.ndarray) -> None: """ Reimplement __init__ to change a different value for EH_PAYLOAD. This should be the cleanest way to do it, seeing as any other way I @@ -93,7 +95,7 @@ class EHPacket(obspy_rt130_packet.EHPacket): data = converter(data) setattr(self, name, data) - def __str__(self, compact=False): + def __str__(self, compact: bool = False) -> str: if compact: sta = (self.station_name.strip() + self.station_name_extension.strip()) @@ -123,7 +125,7 @@ class EHPacket(obspy_rt130_packet.EHPacket): "\n\t".join(info)) return info - def eh_et_info(self, nbr_DT_samples): + def eh_et_info(self, nbr_DT_samples: int) -> list[str]: """ Compile EH and ET info to write to log file. Returns list of strings. diff --git a/sohstationviewer/model/reftek/reftek_data/reftek_helper.py b/sohstationviewer/model/reftek/reftek_data/reftek_helper.py index 09fe6ca50e89fa652c971beb9f4ad307badc502f..11ab74d78ffab2365f1cdd10123cc1c36575f826 100644 --- a/sohstationviewer/model/reftek/reftek_data/reftek_helper.py +++ b/sohstationviewer/model/reftek/reftek_data/reftek_helper.py @@ -1,5 +1,5 @@ import os -from typing import Any, Dict, Callable, Union +from typing import Any, Dict, Callable, Union, List, Tuple import numpy import numpy as np @@ -20,12 +20,12 @@ from sohstationviewer.model.reftek.reftek_data.soh_packets import ( ) -def packet_reader_placeholder(*args: Any, **kwargs: Any) -> None: +def packet_reader_placeholder(*args: Any, **kwargs: Any) -> Tuple[Any, Any]: """ Placeholder function to be used in place of an RT130 packet reader function. This function immediately returns None. """ - return None + return None, None class RT130ParseError(Exception): @@ -35,7 +35,8 @@ class RT130ParseError(Exception): pass -def read_rt130_file(file_name: str, unpacker: Unpacker): +def read_rt130_file(file_name: str, unpacker: Unpacker + ) -> List[Union[EHETPacket, DTPacket, SOHPacket]]: """ Read an RT130 file and stores the data in a list of RT130 packets. :param file_name: the name of the file to read. @@ -93,13 +94,13 @@ def read_rt130_file(file_name: str, unpacker: Unpacker): def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket, SOHPacket], - unpacker: Unpacker): + unpacker: Unpacker) -> Tuple: """ Convert an RT130 packet into a numpy array of type PACKET_FINAL_DTYPE :param packet: an RT130 packet. :param unpacker: the decoder used to decode the data. - :return: a numpy array of type PACKET_FINAL_DTYPE that contains the data - stored in packet. + :return: a tuple that can be converted into an object of type + PACKET_FINAL_DTYPE that contains the data stored in packet. """ # We want to convert the packet to a tuple. In order to make it easier to # maintain, we first convert the packet to a dictionary. Then, we grab the diff --git a/sohstationviewer/model/reftek/reftek_data/soh_packets.py b/sohstationviewer/model/reftek/reftek_data/soh_packets.py index e1445dd6b6bbe2c790ea07f39ec5f1a77ecee3ac..2badb6fa0458c07fc31041ef94c03bb6e08c9968 100644 --- a/sohstationviewer/model/reftek/reftek_data/soh_packets.py +++ b/sohstationviewer/model/reftek/reftek_data/soh_packets.py @@ -1,4 +1,5 @@ import dataclasses +from typing import Tuple from obspy.io.reftek.util import bcd @@ -39,10 +40,10 @@ class SOHPacket: data: bytes -def bcd_16bit_int(_i): +def bcd_16bit_int(_i) -> int: """ Reimplement a private function of the same name in obspy. Kept here in case - the private function is removed in future obspy version. + the private function is removed in a future obspy version. :param _i: the byte string to convert into a 16-bite integer :return: a 16-bit integer """ @@ -50,7 +51,8 @@ def bcd_16bit_int(_i): return _i[0] * 100 + _i[1] -def read_soh_packet(packet: bytes, unpacker: Unpacker): +def read_soh_packet(packet: bytes, unpacker: Unpacker + ) -> Tuple[SOHExtendedHeader, bytes]: """ Process an SOH packet and get its extended header and poyload. :param packet: the bytes that make up the given SOH packet.