From ef3cca60ae6d17762aa8af5e0cc4b370d4a0b5d9 Mon Sep 17 00:00:00 2001 From: kienle <kienle@passcal.nmt.edu> Date: Thu, 24 Aug 2023 15:56:45 -0600 Subject: [PATCH] Add type hint --- .../model/reftek/reftek_data/soh_packet.py | 48 ++++++++++--------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/sohstationviewer/model/reftek/reftek_data/soh_packet.py b/sohstationviewer/model/reftek/reftek_data/soh_packet.py index c3a83f6be..dc60af5f5 100644 --- a/sohstationviewer/model/reftek/reftek_data/soh_packet.py +++ b/sohstationviewer/model/reftek/reftek_data/soh_packet.py @@ -1,3 +1,7 @@ +from __future__ import annotations + +from typing import Optional, List + #!/usr/bin/env python # -*- coding: utf-8 -*- """ @@ -275,7 +279,7 @@ class SOHPacket(obspy_rt130_packet.Packet): 'packet_sequence', 'time') @staticmethod - def from_data(data): + def from_data(data: np.ndarray) -> SOHPacket: """ Checks for valid packet type identifier and returns appropriate packet object @@ -300,7 +304,7 @@ class SOHPacket(obspy_rt130_packet.Packet): raise NotImplementedError(msg.format(packet_type)) @staticmethod - def time_tag(time, implement_time=None): + def time_tag(time: UTCDateTime, implement_time: Optional[int] = None): if implement_time is not None and time > UTCDateTime(ns=implement_time): # noqa: E501 time = UTCDateTime(ns=implement_time) return "{:04d}:{:03d}:{:02d}:{:02d}:{:02d}:{:03d}".format(time.year, @@ -311,14 +315,14 @@ class SOHPacket(obspy_rt130_packet.Packet): time.microsecond) # noqa: E501 @property - def packet_tagline(self): + def packet_tagline(self) -> str: return "\n" class SHPacket(SOHPacket): """Class used to parse and generate string representation for SH packets""" - def __init__(self, data): + def __init__(self, data: np.ndarray) -> None: self._data = data payload = self._data["payload"].tobytes() start_sh = 0 @@ -335,7 +339,7 @@ class SHPacket(SOHPacket): setattr(self, name, data) start_sh = start_sh + length - def __str__(self): + def __str__(self) -> str: info = [] # info.append(self.packet_tagline) packet_soh_string = ("\nState of Health {:s} ST: {:s}" @@ -349,7 +353,7 @@ class SHPacket(SOHPacket): class SCPacket(SOHPacket): """Class used to parse and generate string representation for SC packets""" - def __init__(self, data): + def __init__(self, data: np.ndarray) -> None: # Station/Channel payload self._data = data payload = self._data["payload"].tobytes() @@ -383,7 +387,7 @@ class SCPacket(SOHPacket): setattr(self, name, data) start_info = start_info + length - def __str__(self): + def __str__(self) -> str: info = [] packet_soh_string = ("\nStation Channel Definition {:s} ST: {:s}" .format(self.time_tag(self.time), @@ -420,7 +424,7 @@ class SCPacket(SOHPacket): info.append("\n Comments - " + getattr(self, 'sc' + str(ind_sc) + '_comments')) # noqa: E501 return info - def get_info(self, infos): + def get_info(self, infos: List[List]) -> List[List]: """ Compile relevant information - unit id, reference channel, network code, station code, component code, gain and implementation time - for @@ -454,7 +458,7 @@ class SCPacket(SOHPacket): class OMPacket(SOHPacket): """Class used to parse and generate string representation for OM packets""" - def __init__(self, data): + def __init__(self, data: np.ndarray) -> None: self._data = data payload = self._data["payload"].tobytes() start_om = 0 @@ -471,7 +475,7 @@ class OMPacket(SOHPacket): setattr(self, name, data) start_om = start_om + length - def __str__(self): + def __str__(self) -> str: info = [] # info.append(self.packet_tagline) packet_soh_string = ("\nOperating Mode Definition {:s} ST: {:s}" @@ -496,7 +500,7 @@ class OMPacket(SOHPacket): class DSPacket(SOHPacket): """Class used to parse and generate string representation for DS packets""" - def __init__(self, data): + def __init__(self, data: np.ndarray) -> None: # Data Stream payload self._data = data payload = self._data["payload"].tobytes() @@ -551,7 +555,7 @@ class DSPacket(SOHPacket): msg = ("Trigger type {:s} not found".format(trigger_type)) warnings.warn(msg) - def __str__(self): + def __str__(self) -> str: info = [] info.append(self.packet_tagline) packet_soh_string = ("\nData Stream Definition {:s} ST: {:s}" @@ -587,7 +591,7 @@ class DSPacket(SOHPacket): info.append(" ".join(["\n Trigger", key, trigger_info])) # noqa: E501 return info - def get_info(self, infos): + def get_info(self, infos: List[List]) -> List[List]: """ Compile relevant information - reference data stream, band and instrument codes, sample rate and implementation time - for given DS @@ -617,7 +621,7 @@ class DSPacket(SOHPacket): class ADPacket(SOHPacket): """Class used to parse and generate string representation for AD packets""" - def __init__(self, data): + def __init__(self, data: np.ndarray) -> None: self._data = data payload = self._data["payload"].tobytes() start_ad = 0 @@ -634,7 +638,7 @@ class ADPacket(SOHPacket): setattr(self, name, data) start_ad = start_ad + length - def __str__(self): + def __str__(self) -> str: info = [] # info.append(self.packet_tagline) packet_soh_string = ("\nAuxiliary Data Parameter {:s} ST: {:s}" @@ -657,7 +661,7 @@ class ADPacket(SOHPacket): class CDPacket(SOHPacket): """Class used to parse and generate string representation for CD packets""" - def __init__(self, data): + def __init__(self, data: np.ndarray) -> None: # Calibration parameter payload self._data = data payload = self._data["payload"].tobytes() @@ -726,7 +730,7 @@ class CDPacket(SOHPacket): setattr(self, name, data) start_info_seq = start_info_seq + length - def __str__(self): + def __str__(self) -> str: info = [] # info.append(self.packet_tagline) packet_soh_string = ("\nCalibration Definition {:s} ST: {:s}" @@ -783,7 +787,7 @@ class CDPacket(SOHPacket): class FDPacket(SOHPacket): """Class used to parse and generate string representation for FD packets""" - def __init__(self, data): + def __init__(self, data: np.ndarray) -> None: # Filter description payload self._data = data payload = self._data["payload"] @@ -835,7 +839,7 @@ class FDPacket(SOHPacket): setattr(self, name, data) start_info = start_info + length - def __str__(self): + def __str__(self) -> str: info = [] # info.append(self.packet_tagline) packet_soh_string = ("\nFilter Description {:s} ST: {:s}" @@ -863,7 +867,7 @@ class FDPacket(SOHPacket): return info @staticmethod - def twosCom_bin2dec(bin_, digit): + def twosCom_bin2dec(bin_: str, digit: int): while len(bin_) < digit: bin_ = '0' + bin_ if bin_[0] == '0': @@ -872,7 +876,7 @@ class FDPacket(SOHPacket): return -1 * (int(''.join('1' if x == '0' else '0' for x in bin_), 2) + 1) # noqa: E501 @staticmethod - def twosCom_dec2bin(dec, digit): + def twosCom_dec2bin(dec: int, digit: int): if dec >= 0: bin_ = bin(dec).split("0b")[1] while len(bin_) < digit: @@ -883,7 +887,7 @@ class FDPacket(SOHPacket): return bin(dec - pow(2, digit)).split("0b")[1] -def _initial_unpack_packets_soh(bytestring): +def _initial_unpack_packets_soh(bytestring: bytes) -> np.ndarray: """ First unpack data with dtype matching itemsize of storage in the reftek file, than allocate result array with dtypes for storage of python -- GitLab