From eb40d3f85960b0714fc15ffad8bf255a3905b008 Mon Sep 17 00:00:00 2001 From: kienle <kienle@passcal.nmt.edu> Date: Thu, 24 Aug 2023 15:33:44 -0600 Subject: [PATCH] Add type hint Add type hint --- .../model/reftek/reftek_data/core.py | 14 +++-- .../model/reftek/reftek_data/dt_packet.py | 56 ++++++++++--------- .../model/reftek/reftek_data/eh_et_packet.py | 48 ++++++++-------- .../model/reftek/reftek_data/packet.py | 8 ++- .../model/reftek/reftek_data/reftek_helper.py | 15 ++--- .../model/reftek/reftek_data/soh_packet.py | 48 ++++++++-------- .../model/reftek/reftek_data/soh_packets.py | 8 ++- 7 files changed, 108 insertions(+), 89 deletions(-) diff --git a/sohstationviewer/model/reftek/reftek_data/core.py b/sohstationviewer/model/reftek/reftek_data/core.py index a5c75e9a6..a2d24a8c5 100644 --- a/sohstationviewer/model/reftek/reftek_data/core.py +++ b/sohstationviewer/model/reftek/reftek_data/core.py @@ -1,3 +1,5 @@ +from __future__ import annotations + #!/usr/bin/env python # -*- coding: utf-8 -*- @@ -37,7 +39,7 @@ class DiscontinuousTrace(Trace): Extension of obspy.Trace that changes the way time data is handled when reading data using the method from logpeek/qpeek. """ - def __init__(self, *args, times, **kwargs): + def __init__(self, *args, times: np.ndarray, **kwargs): super().__init__(*args, **kwargs) self._times = times @@ -74,7 +76,7 @@ class Reftek130(obspy_rt130_core.Reftek130): better performance. """ @staticmethod - def from_file(file: Union[str, Path]): + def from_file(file: Union[str, Path]) -> Reftek130: """ Read data from an RT130 file and save it in a Reftek130 object. :param file: the RT130 file to read @@ -92,10 +94,10 @@ class Reftek130(obspy_rt130_core.Reftek130): rt._data = np.array(converted_packets, dtype=PACKET_FINAL_DTYPE) return rt - def to_stream(self, network="", location="", component_codes=None, - include_mp123=False, include_mp456=False, - headonly=False, verbose=False, - sort_permuted_package_sequence=False): + def to_stream(self, network: str = "", location: str = "", + include_mp123: bool = False, include_mp456: bool = False, + headonly: bool = False, verbose: bool = False, + sort_permuted_package_sequence: bool = False) -> Stream: """ Create an obspy.Stream object that holds the data stored in this Reftek130 object. diff --git a/sohstationviewer/model/reftek/reftek_data/dt_packet.py b/sohstationviewer/model/reftek/reftek_data/dt_packet.py index 679224659..d1e7b440a 100644 --- a/sohstationviewer/model/reftek/reftek_data/dt_packet.py +++ b/sohstationviewer/model/reftek/reftek_data/dt_packet.py @@ -1,10 +1,35 @@ import dataclasses +from typing import Tuple, Any from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker from sohstationviewer.model.reftek.reftek_data.header import PacketHeader -def decode_uncompressed(packet: bytes, data_format: str, unpacker: Unpacker): +@dataclasses.dataclass +class DTExtendedHeader: + """ + The extended header of a DT packet. + """ + event_number: int + data_stream_number: int + channel_number: int + number_of_samples: int + flags: int + data_format: str + + +@dataclasses.dataclass +class DTPacket: + """ + The decoded data of a DT packet. + """ + header: PacketHeader + extended_header: DTExtendedHeader + data: int + + +def decode_uncompressed(packet: bytes, data_format: str, unpacker: Unpacker + ) -> int: """ Grab the first data point in a packet that contains uncompressed RT130 data (aka packets with data format 16, 32, or 33_. @@ -33,7 +58,8 @@ def decode_uncompressed(packet: bytes, data_format: str, unpacker: Unpacker): return unpacker.unpack(f'{format_char}', first_data_point)[0] -def decode_compressed(packet: bytes, data_format: str, unpacker: Unpacker): +def decode_compressed(packet: bytes, data_format: str, unpacker: Unpacker + ) -> int: """ Grab the stop point in a packet that contains compressed RT130 data (aka packets with data format C0, C1, C2, or C3). @@ -53,7 +79,8 @@ def decode_compressed(packet: bytes, data_format: str, unpacker: Unpacker): return unpacker.unpack('i', first_data_point)[0] -def read_dt_packet(packet: bytes, unpacker: Unpacker): +def read_dt_packet(packet: bytes, unpacker: Unpacker + ) -> tuple[DTExtendedHeader, Any]: """ Process a DT packet and get its extended header and first data point. :param packet: the bytes that make up the given DT packet. @@ -77,26 +104,3 @@ def read_dt_packet(packet: bytes, unpacker: Unpacker): flags, data_format) first_data_point = decoders[data_format](packet, data_format, unpacker) return extended_header, first_data_point - - -@dataclasses.dataclass -class DTExtendedHeader: - """ - The extended header of a DT packet. - """ - event_number: int - data_stream_number: int - channel_number: int - number_of_samples: int - flags: int - data_format: str - - -@dataclasses.dataclass -class DTPacket: - """ - The decoded data of a DT packet. - """ - header: PacketHeader - extended_header: DTExtendedHeader - data: int diff --git a/sohstationviewer/model/reftek/reftek_data/eh_et_packet.py b/sohstationviewer/model/reftek/reftek_data/eh_et_packet.py index 366ba2f7b..cf4b4c108 100644 --- a/sohstationviewer/model/reftek/reftek_data/eh_et_packet.py +++ b/sohstationviewer/model/reftek/reftek_data/eh_et_packet.py @@ -1,4 +1,5 @@ import dataclasses +from typing import Tuple from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker from sohstationviewer.model.reftek.reftek_data.packet import \ @@ -6,28 +7,6 @@ from sohstationviewer.model.reftek.reftek_data.packet import \ from sohstationviewer.model.reftek.reftek_data.header import PacketHeader -def read_eh_et_packet(packet: bytes, unpacker: Unpacker): - """ - Process an EH/ET packet and get its extended header and required part of - the payload. - :param packet: the bytes that make up the given EH/ET packet. - :param unpacker: the unpacker to use to decode the data. - :return: the extended header and truncated payload of the given EH/ET - packet. - """ - event_number = int(packet[16:18].hex()) - data_stream_number = int(packet[18:19].hex()) - flags = unpacker.unpack('B', packet[22:23])[0] - data_format = packet[23:24].hex().upper() - - extended_header = EHETExtendedHeader(event_number, data_stream_number, - flags, data_format) - # The largest possible data point has a size of 4 bytes, so we need to - # grab at least data. - payload = packet[24:eh_et_payload_end_in_packet] - return extended_header, payload - - @dataclasses.dataclass class EHETExtendedHeader: """ @@ -57,3 +36,28 @@ class EHETPacket: header: PacketHeader extended_header: EHETExtendedHeader data: bytes + + +def read_eh_et_packet(packet: bytes, unpacker: Unpacker + ) -> Tuple[EHETExtendedHeader, bytes]: + """ + Process an EH/ET packet and get its extended header and required part of + the payload. + :param packet: the bytes that make up the given EH/ET packet. + :param unpacker: the unpacker to use to decode the data. + :return: the extended header and truncated payload of the given EH/ET + packet. + """ + event_number = int(packet[16:18].hex()) + data_stream_number = int(packet[18:19].hex()) + flags = unpacker.unpack('B', packet[22:23])[0] + data_format = packet[23:24].hex().upper() + + extended_header = EHETExtendedHeader(event_number, data_stream_number, + flags, data_format) + # The largest possible data point has a size of 4 bytes, so we need to + # grab at least data. + payload = packet[24:eh_et_payload_end_in_packet] + return extended_header, payload + + diff --git a/sohstationviewer/model/reftek/reftek_data/packet.py b/sohstationviewer/model/reftek/reftek_data/packet.py index be79c3eb9..ec1e3f4e7 100644 --- a/sohstationviewer/model/reftek/reftek_data/packet.py +++ b/sohstationviewer/model/reftek/reftek_data/packet.py @@ -9,7 +9,9 @@ Suggested updates to obspy.io.reftek.packet: Maeva Pourpoint IRIS/PASSCAL """ +from typing import List +import numpy import obspy.io.reftek.packet as obspy_rt130_packet from obspy import UTCDateTime @@ -75,7 +77,7 @@ obspy_rt130_packet.EH_PAYLOAD = { class EHPacket(obspy_rt130_packet.EHPacket): - def __init__(self, data): + def __init__(self, data: numpy.ndarray) -> None: """ Reimplement __init__ to change a different value for EH_PAYLOAD. This should be the cleanest way to do it, seeing as any other way I @@ -93,7 +95,7 @@ class EHPacket(obspy_rt130_packet.EHPacket): data = converter(data) setattr(self, name, data) - def __str__(self, compact=False): + def __str__(self, compact: bool = False) -> str: if compact: sta = (self.station_name.strip() + self.station_name_extension.strip()) @@ -123,7 +125,7 @@ class EHPacket(obspy_rt130_packet.EHPacket): "\n\t".join(info)) return info - def eh_et_info(self, nbr_DT_samples): + def eh_et_info(self, nbr_DT_samples: int) -> list[str]: """ Compile EH and ET info to write to log file. Returns list of strings. diff --git a/sohstationviewer/model/reftek/reftek_data/reftek_helper.py b/sohstationviewer/model/reftek/reftek_data/reftek_helper.py index 09fe6ca50..11ab74d78 100644 --- a/sohstationviewer/model/reftek/reftek_data/reftek_helper.py +++ b/sohstationviewer/model/reftek/reftek_data/reftek_helper.py @@ -1,5 +1,5 @@ import os -from typing import Any, Dict, Callable, Union +from typing import Any, Dict, Callable, Union, List, Tuple import numpy import numpy as np @@ -20,12 +20,12 @@ from sohstationviewer.model.reftek.reftek_data.soh_packets import ( ) -def packet_reader_placeholder(*args: Any, **kwargs: Any) -> None: +def packet_reader_placeholder(*args: Any, **kwargs: Any) -> Tuple[Any, Any]: """ Placeholder function to be used in place of an RT130 packet reader function. This function immediately returns None. """ - return None + return None, None class RT130ParseError(Exception): @@ -35,7 +35,8 @@ class RT130ParseError(Exception): pass -def read_rt130_file(file_name: str, unpacker: Unpacker): +def read_rt130_file(file_name: str, unpacker: Unpacker + ) -> List[Union[EHETPacket, DTPacket, SOHPacket]]: """ Read an RT130 file and stores the data in a list of RT130 packets. :param file_name: the name of the file to read. @@ -93,13 +94,13 @@ def read_rt130_file(file_name: str, unpacker: Unpacker): def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket, SOHPacket], - unpacker: Unpacker): + unpacker: Unpacker) -> Tuple: """ Convert an RT130 packet into a numpy array of type PACKET_FINAL_DTYPE :param packet: an RT130 packet. :param unpacker: the decoder used to decode the data. - :return: a numpy array of type PACKET_FINAL_DTYPE that contains the data - stored in packet. + :return: a tuple that can be converted into an object of type + PACKET_FINAL_DTYPE that contains the data stored in packet. """ # We want to convert the packet to a tuple. In order to make it easier to # maintain, we first convert the packet to a dictionary. Then, we grab the diff --git a/sohstationviewer/model/reftek/reftek_data/soh_packet.py b/sohstationviewer/model/reftek/reftek_data/soh_packet.py index c3a83f6be..dc60af5f5 100644 --- a/sohstationviewer/model/reftek/reftek_data/soh_packet.py +++ b/sohstationviewer/model/reftek/reftek_data/soh_packet.py @@ -1,3 +1,7 @@ +from __future__ import annotations + +from typing import Optional, List + #!/usr/bin/env python # -*- coding: utf-8 -*- """ @@ -275,7 +279,7 @@ class SOHPacket(obspy_rt130_packet.Packet): 'packet_sequence', 'time') @staticmethod - def from_data(data): + def from_data(data: np.ndarray) -> SOHPacket: """ Checks for valid packet type identifier and returns appropriate packet object @@ -300,7 +304,7 @@ class SOHPacket(obspy_rt130_packet.Packet): raise NotImplementedError(msg.format(packet_type)) @staticmethod - def time_tag(time, implement_time=None): + def time_tag(time: UTCDateTime, implement_time: Optional[int] = None): if implement_time is not None and time > UTCDateTime(ns=implement_time): # noqa: E501 time = UTCDateTime(ns=implement_time) return "{:04d}:{:03d}:{:02d}:{:02d}:{:02d}:{:03d}".format(time.year, @@ -311,14 +315,14 @@ class SOHPacket(obspy_rt130_packet.Packet): time.microsecond) # noqa: E501 @property - def packet_tagline(self): + def packet_tagline(self) -> str: return "\n" class SHPacket(SOHPacket): """Class used to parse and generate string representation for SH packets""" - def __init__(self, data): + def __init__(self, data: np.ndarray) -> None: self._data = data payload = self._data["payload"].tobytes() start_sh = 0 @@ -335,7 +339,7 @@ class SHPacket(SOHPacket): setattr(self, name, data) start_sh = start_sh + length - def __str__(self): + def __str__(self) -> str: info = [] # info.append(self.packet_tagline) packet_soh_string = ("\nState of Health {:s} ST: {:s}" @@ -349,7 +353,7 @@ class SHPacket(SOHPacket): class SCPacket(SOHPacket): """Class used to parse and generate string representation for SC packets""" - def __init__(self, data): + def __init__(self, data: np.ndarray) -> None: # Station/Channel payload self._data = data payload = self._data["payload"].tobytes() @@ -383,7 +387,7 @@ class SCPacket(SOHPacket): setattr(self, name, data) start_info = start_info + length - def __str__(self): + def __str__(self) -> str: info = [] packet_soh_string = ("\nStation Channel Definition {:s} ST: {:s}" .format(self.time_tag(self.time), @@ -420,7 +424,7 @@ class SCPacket(SOHPacket): info.append("\n Comments - " + getattr(self, 'sc' + str(ind_sc) + '_comments')) # noqa: E501 return info - def get_info(self, infos): + def get_info(self, infos: List[List]) -> List[List]: """ Compile relevant information - unit id, reference channel, network code, station code, component code, gain and implementation time - for @@ -454,7 +458,7 @@ class SCPacket(SOHPacket): class OMPacket(SOHPacket): """Class used to parse and generate string representation for OM packets""" - def __init__(self, data): + def __init__(self, data: np.ndarray) -> None: self._data = data payload = self._data["payload"].tobytes() start_om = 0 @@ -471,7 +475,7 @@ class OMPacket(SOHPacket): setattr(self, name, data) start_om = start_om + length - def __str__(self): + def __str__(self) -> str: info = [] # info.append(self.packet_tagline) packet_soh_string = ("\nOperating Mode Definition {:s} ST: {:s}" @@ -496,7 +500,7 @@ class OMPacket(SOHPacket): class DSPacket(SOHPacket): """Class used to parse and generate string representation for DS packets""" - def __init__(self, data): + def __init__(self, data: np.ndarray) -> None: # Data Stream payload self._data = data payload = self._data["payload"].tobytes() @@ -551,7 +555,7 @@ class DSPacket(SOHPacket): msg = ("Trigger type {:s} not found".format(trigger_type)) warnings.warn(msg) - def __str__(self): + def __str__(self) -> str: info = [] info.append(self.packet_tagline) packet_soh_string = ("\nData Stream Definition {:s} ST: {:s}" @@ -587,7 +591,7 @@ class DSPacket(SOHPacket): info.append(" ".join(["\n Trigger", key, trigger_info])) # noqa: E501 return info - def get_info(self, infos): + def get_info(self, infos: List[List]) -> List[List]: """ Compile relevant information - reference data stream, band and instrument codes, sample rate and implementation time - for given DS @@ -617,7 +621,7 @@ class DSPacket(SOHPacket): class ADPacket(SOHPacket): """Class used to parse and generate string representation for AD packets""" - def __init__(self, data): + def __init__(self, data: np.ndarray) -> None: self._data = data payload = self._data["payload"].tobytes() start_ad = 0 @@ -634,7 +638,7 @@ class ADPacket(SOHPacket): setattr(self, name, data) start_ad = start_ad + length - def __str__(self): + def __str__(self) -> str: info = [] # info.append(self.packet_tagline) packet_soh_string = ("\nAuxiliary Data Parameter {:s} ST: {:s}" @@ -657,7 +661,7 @@ class ADPacket(SOHPacket): class CDPacket(SOHPacket): """Class used to parse and generate string representation for CD packets""" - def __init__(self, data): + def __init__(self, data: np.ndarray) -> None: # Calibration parameter payload self._data = data payload = self._data["payload"].tobytes() @@ -726,7 +730,7 @@ class CDPacket(SOHPacket): setattr(self, name, data) start_info_seq = start_info_seq + length - def __str__(self): + def __str__(self) -> str: info = [] # info.append(self.packet_tagline) packet_soh_string = ("\nCalibration Definition {:s} ST: {:s}" @@ -783,7 +787,7 @@ class CDPacket(SOHPacket): class FDPacket(SOHPacket): """Class used to parse and generate string representation for FD packets""" - def __init__(self, data): + def __init__(self, data: np.ndarray) -> None: # Filter description payload self._data = data payload = self._data["payload"] @@ -835,7 +839,7 @@ class FDPacket(SOHPacket): setattr(self, name, data) start_info = start_info + length - def __str__(self): + def __str__(self) -> str: info = [] # info.append(self.packet_tagline) packet_soh_string = ("\nFilter Description {:s} ST: {:s}" @@ -863,7 +867,7 @@ class FDPacket(SOHPacket): return info @staticmethod - def twosCom_bin2dec(bin_, digit): + def twosCom_bin2dec(bin_: str, digit: int): while len(bin_) < digit: bin_ = '0' + bin_ if bin_[0] == '0': @@ -872,7 +876,7 @@ class FDPacket(SOHPacket): return -1 * (int(''.join('1' if x == '0' else '0' for x in bin_), 2) + 1) # noqa: E501 @staticmethod - def twosCom_dec2bin(dec, digit): + def twosCom_dec2bin(dec: int, digit: int): if dec >= 0: bin_ = bin(dec).split("0b")[1] while len(bin_) < digit: @@ -883,7 +887,7 @@ class FDPacket(SOHPacket): return bin(dec - pow(2, digit)).split("0b")[1] -def _initial_unpack_packets_soh(bytestring): +def _initial_unpack_packets_soh(bytestring: bytes) -> np.ndarray: """ First unpack data with dtype matching itemsize of storage in the reftek file, than allocate result array with dtypes for storage of python diff --git a/sohstationviewer/model/reftek/reftek_data/soh_packets.py b/sohstationviewer/model/reftek/reftek_data/soh_packets.py index e1445dd6b..2badb6fa0 100644 --- a/sohstationviewer/model/reftek/reftek_data/soh_packets.py +++ b/sohstationviewer/model/reftek/reftek_data/soh_packets.py @@ -1,4 +1,5 @@ import dataclasses +from typing import Tuple from obspy.io.reftek.util import bcd @@ -39,10 +40,10 @@ class SOHPacket: data: bytes -def bcd_16bit_int(_i): +def bcd_16bit_int(_i) -> int: """ Reimplement a private function of the same name in obspy. Kept here in case - the private function is removed in future obspy version. + the private function is removed in a future obspy version. :param _i: the byte string to convert into a 16-bite integer :return: a 16-bit integer """ @@ -50,7 +51,8 @@ def bcd_16bit_int(_i): return _i[0] * 100 + _i[1] -def read_soh_packet(packet: bytes, unpacker: Unpacker): +def read_soh_packet(packet: bytes, unpacker: Unpacker + ) -> Tuple[SOHExtendedHeader, bytes]: """ Process an SOH packet and get its extended header and poyload. :param packet: the bytes that make up the given SOH packet. -- GitLab