Skip to content
Snippets Groups Projects
Commit af042951 authored by Kien Le's avatar Kien Le
Browse files

Add type hint

parent 8f63eb6b
No related branches found
No related tags found
No related merge requests found
from __future__ import annotations
#!/usr/bin/env python
# -*- coding: utf-8 -*-
......@@ -37,7 +39,7 @@ class DiscontinuousTrace(Trace):
Extension of obspy.Trace that changes the way time data is handled when
reading data using the method from logpeek/qpeek.
"""
def __init__(self, *args, times, **kwargs):
def __init__(self, *args, times: np.ndarray, **kwargs):
super().__init__(*args, **kwargs)
self._times = times
......@@ -74,7 +76,7 @@ class Reftek130(obspy_rt130_core.Reftek130):
better performance.
"""
@staticmethod
def from_file(file: Union[str, Path]):
def from_file(file: Union[str, Path]) -> Reftek130:
"""
Read data from an RT130 file and save it in a Reftek130 object.
:param file: the RT130 file to read
......@@ -92,10 +94,10 @@ class Reftek130(obspy_rt130_core.Reftek130):
rt._data = np.array(converted_packets, dtype=PACKET_FINAL_DTYPE)
return rt
def to_stream(self, network="", location="", component_codes=None,
include_mp123=False, include_mp456=False,
headonly=False, verbose=False,
sort_permuted_package_sequence=False):
def to_stream(self, network: str = "", location: str = "",
include_mp123: bool = False, include_mp456: bool = False,
headonly: bool = False, verbose: bool = False,
sort_permuted_package_sequence: bool = False) -> Stream:
"""
Create an obspy.Stream object that holds the data stored in this
Reftek130 object.
......
import dataclasses
from typing import Tuple, Any
from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker
from sohstationviewer.model.reftek.reftek_data.header import PacketHeader
def decode_uncompressed(packet: bytes, data_format: str, unpacker: Unpacker):
@dataclasses.dataclass
class DTExtendedHeader:
"""
The extended header of a DT packet.
"""
event_number: int
data_stream_number: int
channel_number: int
number_of_samples: int
flags: int
data_format: str
@dataclasses.dataclass
class DTPacket:
"""
The decoded data of a DT packet.
"""
header: PacketHeader
extended_header: DTExtendedHeader
data: int
def decode_uncompressed(packet: bytes, data_format: str, unpacker: Unpacker
) -> int:
"""
Grab the first data point in a packet that contains uncompressed RT130 data
(aka packets with data format 16, 32, or 33_.
......@@ -33,7 +58,8 @@ def decode_uncompressed(packet: bytes, data_format: str, unpacker: Unpacker):
return unpacker.unpack(f'{format_char}', first_data_point)[0]
def decode_compressed(packet: bytes, data_format: str, unpacker: Unpacker):
def decode_compressed(packet: bytes, data_format: str, unpacker: Unpacker
) -> int:
"""
Grab the stop point in a packet that contains compressed RT130 data (aka
packets with data format C0, C1, C2, or C3).
......@@ -53,7 +79,8 @@ def decode_compressed(packet: bytes, data_format: str, unpacker: Unpacker):
return unpacker.unpack('i', first_data_point)[0]
def read_dt_packet(packet: bytes, unpacker: Unpacker):
def read_dt_packet(packet: bytes, unpacker: Unpacker
) -> tuple[DTExtendedHeader, Any]:
"""
Process a DT packet and get its extended header and first data point.
:param packet: the bytes that make up the given DT packet.
......@@ -77,26 +104,3 @@ def read_dt_packet(packet: bytes, unpacker: Unpacker):
flags, data_format)
first_data_point = decoders[data_format](packet, data_format, unpacker)
return extended_header, first_data_point
@dataclasses.dataclass
class DTExtendedHeader:
"""
The extended header of a DT packet.
"""
event_number: int
data_stream_number: int
channel_number: int
number_of_samples: int
flags: int
data_format: str
@dataclasses.dataclass
class DTPacket:
"""
The decoded data of a DT packet.
"""
header: PacketHeader
extended_header: DTExtendedHeader
data: int
import dataclasses
from typing import Tuple
from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker
from sohstationviewer.model.reftek.reftek_data.packet import \
......@@ -6,28 +7,6 @@ from sohstationviewer.model.reftek.reftek_data.packet import \
from sohstationviewer.model.reftek.reftek_data.header import PacketHeader
def read_eh_et_packet(packet: bytes, unpacker: Unpacker):
"""
Process an EH/ET packet and get its extended header and required part of
the payload.
:param packet: the bytes that make up the given EH/ET packet.
:param unpacker: the unpacker to use to decode the data.
:return: the extended header and truncated payload of the given EH/ET
packet.
"""
event_number = int(packet[16:18].hex())
data_stream_number = int(packet[18:19].hex())
flags = unpacker.unpack('B', packet[22:23])[0]
data_format = packet[23:24].hex().upper()
extended_header = EHETExtendedHeader(event_number, data_stream_number,
flags, data_format)
# The largest possible data point has a size of 4 bytes, so we need to
# grab at least data.
payload = packet[24:eh_et_payload_end_in_packet]
return extended_header, payload
@dataclasses.dataclass
class EHETExtendedHeader:
"""
......@@ -57,3 +36,28 @@ class EHETPacket:
header: PacketHeader
extended_header: EHETExtendedHeader
data: bytes
def read_eh_et_packet(packet: bytes, unpacker: Unpacker
) -> Tuple[EHETExtendedHeader, bytes]:
"""
Process an EH/ET packet and get its extended header and required part of
the payload.
:param packet: the bytes that make up the given EH/ET packet.
:param unpacker: the unpacker to use to decode the data.
:return: the extended header and truncated payload of the given EH/ET
packet.
"""
event_number = int(packet[16:18].hex())
data_stream_number = int(packet[18:19].hex())
flags = unpacker.unpack('B', packet[22:23])[0]
data_format = packet[23:24].hex().upper()
extended_header = EHETExtendedHeader(event_number, data_stream_number,
flags, data_format)
# The largest possible data point has a size of 4 bytes, so we need to
# grab at least data.
payload = packet[24:eh_et_payload_end_in_packet]
return extended_header, payload
......@@ -9,7 +9,9 @@ Suggested updates to obspy.io.reftek.packet:
Maeva Pourpoint IRIS/PASSCAL
"""
from typing import List
import numpy
import obspy.io.reftek.packet as obspy_rt130_packet
from obspy import UTCDateTime
......@@ -75,7 +77,7 @@ obspy_rt130_packet.EH_PAYLOAD = {
class EHPacket(obspy_rt130_packet.EHPacket):
def __init__(self, data):
def __init__(self, data: numpy.ndarray) -> None:
"""
Reimplement __init__ to change a different value for EH_PAYLOAD.
This should be the cleanest way to do it, seeing as any other way I
......@@ -93,7 +95,7 @@ class EHPacket(obspy_rt130_packet.EHPacket):
data = converter(data)
setattr(self, name, data)
def __str__(self, compact=False):
def __str__(self, compact: bool = False) -> str:
if compact:
sta = (self.station_name.strip() +
self.station_name_extension.strip())
......@@ -123,7 +125,7 @@ class EHPacket(obspy_rt130_packet.EHPacket):
"\n\t".join(info))
return info
def eh_et_info(self, nbr_DT_samples):
def eh_et_info(self, nbr_DT_samples: int) -> list[str]:
"""
Compile EH and ET info to write to log file.
Returns list of strings.
......
import os
from typing import Any, Dict, Callable, Union
from typing import Any, Dict, Callable, Union, List, Tuple
import numpy
import numpy as np
......@@ -20,12 +20,12 @@ from sohstationviewer.model.reftek.reftek_data.soh_packets import (
)
def packet_reader_placeholder(*args: Any, **kwargs: Any) -> None:
def packet_reader_placeholder(*args: Any, **kwargs: Any) -> Tuple[Any, Any]:
"""
Placeholder function to be used in place of an RT130 packet reader
function. This function immediately returns None.
"""
return None
return None, None
class RT130ParseError(Exception):
......@@ -35,7 +35,8 @@ class RT130ParseError(Exception):
pass
def read_rt130_file(file_name: str, unpacker: Unpacker):
def read_rt130_file(file_name: str, unpacker: Unpacker
) -> List[Union[EHETPacket, DTPacket, SOHPacket]]:
"""
Read an RT130 file and stores the data in a list of RT130 packets.
:param file_name: the name of the file to read.
......@@ -93,13 +94,13 @@ def read_rt130_file(file_name: str, unpacker: Unpacker):
def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket,
SOHPacket],
unpacker: Unpacker):
unpacker: Unpacker) -> Tuple:
"""
Convert an RT130 packet into a numpy array of type PACKET_FINAL_DTYPE
:param packet: an RT130 packet.
:param unpacker: the decoder used to decode the data.
:return: a numpy array of type PACKET_FINAL_DTYPE that contains the data
stored in packet.
:return: a tuple that can be converted into an object of type
PACKET_FINAL_DTYPE that contains the data stored in packet.
"""
# We want to convert the packet to a tuple. In order to make it easier to
# maintain, we first convert the packet to a dictionary. Then, we grab the
......
import dataclasses
from typing import Tuple
from obspy.io.reftek.util import bcd
......@@ -39,10 +40,10 @@ class SOHPacket:
data: bytes
def bcd_16bit_int(_i):
def bcd_16bit_int(_i) -> int:
"""
Reimplement a private function of the same name in obspy. Kept here in case
the private function is removed in future obspy version.
the private function is removed in a future obspy version.
:param _i: the byte string to convert into a 16-bite integer
:return: a 16-bit integer
"""
......@@ -50,7 +51,8 @@ def bcd_16bit_int(_i):
return _i[0] * 100 + _i[1]
def read_soh_packet(packet: bytes, unpacker: Unpacker):
def read_soh_packet(packet: bytes, unpacker: Unpacker
) -> Tuple[SOHExtendedHeader, bytes]:
"""
Process an SOH packet and get its extended header and poyload.
:param packet: the bytes that make up the given SOH packet.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment