Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
Commits on Source (2)
Showing
with 1121 additions and 195 deletions
......@@ -3,7 +3,7 @@ from typing import Tuple, List, Dict
from obspy.core import Stream
from obspy import UTCDateTime
from sohstationviewer.model.reftek.from_rt2ms import core
from sohstationviewer.model.reftek.reftek_data import core
from sohstationviewer.model.handling_data import read_mseed_trace
......
......@@ -7,8 +7,7 @@ from typing import Tuple, List, Union
import traceback
import numpy as np
from sohstationviewer.model.reftek.from_rt2ms import (
core, soh_packet, packet)
from sohstationviewer.model.reftek.reftek_data import core, soh_packet
from sohstationviewer.model.reftek.log_info import LogInfo
from sohstationviewer.model.data_type_model import (
DataTypeModel, ThreadStopped, ProcessingDataError)
......@@ -176,7 +175,7 @@ class RT130(DataTypeModel):
cur_key = (d['unit_id'].decode(),
f"{d['experiment_number']}")
self.populate_cur_key_for_all_data(cur_key)
logs = soh_packet.Packet.from_data(d).__str__()
logs = soh_packet.SOHPacket.from_data(d).__str__()
if 'SOH' not in self.log_data[cur_key]:
self.log_data[cur_key]['SOH'] = []
self.log_data[cur_key]['SOH'].append((d['time'], logs))
......@@ -230,7 +229,7 @@ class RT130(DataTypeModel):
for index in ind_ehet:
d = rt130._data[index]
logs = packet.EHPacket(d).eh_et_info(nbr_dt_samples)
logs = core.EHPacket(d).eh_et_info(nbr_dt_samples)
if 'EHET' not in self.log_data[cur_key]:
self.log_data[cur_key]['EHET'] = []
self.log_data[cur_key]['EHET'].append((d['time'], logs))
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
"""
Suggested updates to obspy.io.reftek.core:
......@@ -10,8 +9,10 @@ Suggested updates to obspy.io.reftek.core:
Maeva Pourpoint IRIS/PASSCAL
"""
import copy
from pathlib import Path
from typing import Optional, Union
import obspy.io.reftek.core as obspy_rt130_core
import warnings
......@@ -19,8 +20,49 @@ import numpy as np
from obspy import Trace, Stream, UTCDateTime
from obspy.core.util.obspy_types import ObsPyException
from obspy.io.reftek.packet import _unpack_C0_C2_data
from sohstationviewer.model.reftek.from_rt2ms.packet import EHPacket
from obspy.io.reftek.packet import PACKET_FINAL_DTYPE
from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker
from sohstationviewer.model.reftek.reftek_data.packet import EHPacket
from sohstationviewer.model.reftek.reftek_data.reftek_helper import (
read_rt130_file, convert_packet_to_obspy_format,
)
class DiscontinuousTrace(Trace):
    """
    Extension of obspy.Trace that changes the way time data is handled when
    reading data using the method from logpeek/qpeek.
    """
    def __init__(self, *args, times: np.ndarray, **kwargs):
        super().__init__(*args, **kwargs)
        # Per-sample times, kept separately from the trace metadata.
        self._times = times

    def times(self, type: str = "relative",
              reftime: Optional[UTCDateTime] = None) -> np.ndarray:
        """
        Override Trace.times(). Serve the stored times array instead of
        deriving times from the trace's metadata, adjusted according to the
        argument "type".

        :param type: the kind of time data to return. For more information,
            refer to Trace.times(). Note: the types 'utcdatetime' and
            'matplotlib' are deliberately unimplemented because they are not
            going to be useful.
        :param reftime: the reference point used when computing relative
            time. When None, the start time of the trace is the reference
            point.
        :return: the requested array of time data, adjusted based on the
            requested type.
        """
        if type == 'timestamp':
            return self._times
        elif type == 'relative':
            # Relative times are offsets from either the trace start or the
            # caller-supplied reference time.
            anchor = (self.stats.starttime.timestamp if reftime is None
                      else reftime.timestamp)
            return self._times - anchor
        elif type in ('utcdatetime', 'matplotlib'):
            raise NotImplementedError
class Reftek130Exception(ObsPyException):
......@@ -28,18 +70,41 @@ class Reftek130Exception(ObsPyException):
class Reftek130(obspy_rt130_core.Reftek130):
"""
Child class of obspy.Reftek that reads waveform data similar to logpeek for
better performance.
"""
@staticmethod
def from_file(file: Union[str, Path]) -> Reftek130:
"""
Read data from an RT130 file and save it in a Reftek130 object.
:param file: the RT130 file to read
:return: a Reftek130 object that stores the data in file
"""
# RT130 data is all big-endian
rt130_unpacker = Unpacker('>')
rt = Reftek130()
rt._filename = file
packets_in_file = read_rt130_file(file, rt130_unpacker)
converted_packets = []
for packet in packets_in_file:
converted_packets.append(
convert_packet_to_obspy_format(packet, rt130_unpacker))
rt._data = np.array(converted_packets, dtype=PACKET_FINAL_DTYPE)
return rt
def to_stream(self, network="", location="", component_codes=None,
include_mp123=False, include_mp456=False,
headonly=False, verbose=False,
sort_permuted_package_sequence=False):
def to_stream(self, network: str = "", location: str = "",
include_mp123: bool = False, include_mp456: bool = False,
headonly: bool = False, verbose: bool = False,
sort_permuted_package_sequence: bool = False) -> Stream:
"""
Create an obspy.Stream object that holds the data stored in this
Reftek130 object.
:type headonly: bool
:param headonly: Determines whether or not to unpack the data or just
read the headers.
"""
if verbose:
print(self)
if not len(self._data):
msg = "No packet data in Reftek130 object (file: {})"
raise Reftek130Exception(msg.format(self._filename))
......@@ -81,20 +146,6 @@ class Reftek130(obspy_rt130_core.Reftek130):
eh = EHPacket(eh_packets[0])
else:
eh = EHPacket(et_packets[0])
# only C0, C2, 16, 32 encodings supported right now
if eh.data_format == b"C0":
encoding = 'C0'
elif eh.data_format == b"C2":
encoding = 'C2'
elif eh.data_format == b"16":
encoding = '16'
elif eh.data_format == b"32":
encoding = '32'
else:
msg = ("Reftek data encoding '{}' not implemented yet. Please "
"open an issue on GitHub and provide a small (< 50kb) "
"test file.").format(eh.data_format)
raise NotImplementedError(msg)
header = {
"unit_id": self._data['unit_id'][0],
"experiment_number": self._data['experiment_number'][0],
......@@ -140,74 +191,33 @@ class Reftek130(obspy_rt130_core.Reftek130):
sample_data = np.array([], dtype=np.int32)
npts = packets_["number_of_samples"].sum()
else:
if encoding in ('C0', 'C2'):
sample_data = _unpack_C0_C2_data(packets_,
encoding)
elif encoding in ('16', '32'):
# rt130 stores in big endian
dtype = {'16': '>i2', '32': '>i4'}[encoding]
# just fix endianness and use correct dtype
sample_data = np.require(
packets_['payload'],
requirements=['C_CONTIGUOUS'])
# either int16 or int32
sample_data = sample_data.view(dtype)
# account for number of samples, i.e. some packets
# might not use the full payload size but have
# empty parts at the end that need to be cut away
number_of_samples_max = sample_data.shape[1]
sample_data = sample_data.flatten()
# go through packets starting at the back,
# otherwise indices of later packets would change
# while looping
for ind, num_samps in reversed([
(ind, num_samps) for ind, num_samps in
enumerate(packets_["number_of_samples"])
if num_samps != number_of_samples_max]):
# looping backwards we can easily find the
# start of each packet, since the earlier
# packets are still untouched and at maximum
# sample length in our big array with all
# packets
start_of_packet = ind * number_of_samples_max
start_empty_part = start_of_packet + num_samps
end_empty_part = (start_of_packet +
number_of_samples_max)
sample_data = np.delete(
sample_data,
slice(start_empty_part, end_empty_part))
npts = len(sample_data)
tr = Trace(data=sample_data, header=copy.deepcopy(header))
# The payload stores the first data point of each
# packet, encoded as a numpy array of 4 1-byte numbers.
# Due to the way the payload is encoded during the
# reading process and a quirk of 2-complement binary
# numbers (namely, appending a negative number with 1s
# does not change its value), we do not have to care
# about the actual encoding type of the stored packets.
sample_data = np.asarray(packets_['payload'][:, :4])
sample_data = sample_data.view(np.dtype('>i4'))
sample_data = sample_data.squeeze(axis=-1)
npts = sample_data.size
tr = DiscontinuousTrace(
data=sample_data, header=copy.deepcopy(header),
times=(packets_['time'] / 10**9).round(3)
)
# The plotting process needs to know about the number of
# points stored in the trace. However, tr.stats use the
# stored npts to calculate some other metadata, so we can't
# store that information there. As a compromise, we keep
# tr.stats.npts the same, while storing the actual number
# of data points in the trace in another part of tr.stats.
tr.stats.npts = packets_['number_of_samples'].sum()
tr.stats.actual_npts = npts
# channel number is not included in the EH/ET packet
# payload, so add it to stats as well..
tr.stats.reftek130['channel_number'] = channel_number
if headonly:
tr.stats.npts = npts
tr.stats.starttime = UTCDateTime(ns=starttime)
"""
if component codes were explicitly provided, use them
together with the stream label
if component_codes is not None:
tr.stats.channel = (eh.stream_name.strip() +
component_codes[channel_number])
# otherwise check if channel code is set for the given
# channel (seems to be not the case usually)
elif eh.channel_code[channel_number] is not None:
tr.stats.channel = eh.channel_code[channel_number]
# otherwise fall back to using the stream label together
# with the number of the channel in the file (starting with
# 0, as Z-1-2 is common use for data streams not oriented
# against North)
else:
msg = ("No channel code specified in the data file "
"and no component codes specified. Using "
"stream label and number of channel in file as "
"channel codes.")
warnings.warn(msg)
tr.stats.channel = (
eh.stream_name.strip() + str(channel_number))
"""
DS = self._data['data_stream_number'][0] + 1
if DS != 9:
tr.stats.channel = "DS%s-%s" % (DS, channel_number + 1)
......@@ -218,22 +228,5 @@ class Reftek130(obspy_rt130_core.Reftek130):
continue
tr.stats.channel = "MassPos%s" % (channel_number + 1)
# check if endtime of trace is consistent
t_last = packets_[-1]['time']
npts_last = packets_[-1]['number_of_samples']
try:
if not headonly:
assert npts == len(sample_data)
if npts_last:
assert tr.stats.endtime == UTCDateTime(
ns=t_last) + (npts_last - 1) * delta
if npts:
assert tr.stats.endtime == (
tr.stats.starttime + (npts - 1) * delta)
except AssertionError:
msg = ("Reftek file has a trace with an inconsistent "
"endtime or number of samples. Please open an "
"issue on GitHub and provide your file for"
"testing.")
raise Reftek130Exception(msg)
st += tr
return st
import dataclasses
from obspy import UTCDateTime
class NotRT130FileError(Exception):
    """Raised when there is a problem with parsing RT130 data."""
@dataclasses.dataclass
class PacketHeader:
    """
    The decoded header of an RT130 packet.
    """
    # Two-character packet type identifier (e.g. 'DT', 'EH', 'SH').
    packet_type: str
    # Experiment number decoded from the header bytes.
    experiment_number: int
    # Unit id of the datalogger, as an upper-case hexadecimal string.
    unit_id: str
    # Packet time decoded from the header's BCD-encoded time field.
    time: UTCDateTime
    # Byte count and packet sequence number decoded from the header.
    byte_count: int
    packet_sequence: int
def parse_rt130_time(year: int, time_bytes: bytes) -> UTCDateTime:
    """
    Convert BCD-encoded RT130 time into UTCDateTime.

    :param year: the year of the time. RT130's header stores the year
        separately from the time, so we have to pass it as an argument.
    :param time_bytes: the BCD-encoded time.
    :return: an UTCDateTime object that stores the decoded time.
    """
    # Hex-decoding the BCD bytes yields the digit string DDDHHMMSSTTT:
    # day of year, hour, minute, second, millisecond.
    digits = time_bytes.hex()
    day_of_year = int(digits[:3])
    hour = int(digits[3:5])
    minute = int(digits[5:7])
    second = int(digits[7:9])
    millisecond = int(digits[9:12])
    # RT130 only stores the last two digits of the year. Because the
    # documentation for RT130 does not define a way to retrieve the full
    # year, we use Obspy's method: 0-49 maps to 2000-2049 and 50-99 maps to
    # 1950-1999.
    if 0 <= year <= 49:
        year += 2000
    elif 50 <= year <= 99:
        year += 1900
    return UTCDateTime(year=year, julday=day_of_year, hour=hour,
                       minute=minute, second=second,
                       microsecond=millisecond * 1000)
def get_rt130_packet_header(rt130_packet: bytes) -> PacketHeader:
    """
    Get the packet header stored in the first 16 bytes of an RT130 packet.

    :param rt130_packet: the RT130 packet to process
    :return: a PacketHeader object containing the header of rt130_packet
    :raises NotRT130FileError: when the packet type cannot be decoded or is
        not a known RT130 packet type.
    """
    try:
        # Because RT130 data is always big-endian, it is more convenient to
        # use str.decode() than the unpacker.
        packet_type = rt130_packet[:2].decode('ASCII')
    except UnicodeError:
        print('Cannot decode packet type.')
        print('The given file does not appear to be a valid RT130 file.')
        raise NotRT130FileError
    valid_packet_types = ['AD', 'CD', 'DS', 'DT', 'EH', 'ET', 'OM', 'SH', 'SC',
                          'FD']
    if packet_type not in valid_packet_types:
        print(f'Invalid packet type found: {packet_type}')
        print('The given file does not appear to be a valid RT130 file.')
        raise NotRT130FileError
    # These header fields are BCD-encoded; interpreting the hex digits of
    # each field as a decimal number decodes them.
    experiment_number = int(rt130_packet[2:3].hex())
    year = int(rt130_packet[3:4].hex())
    # A call to str.upper() is needed because bytes.hex() makes any
    # hexadecimal letter (i.e. ABCDEF) lowercase, while we want them to be
    # uppercase for display purpose.
    unit_id = rt130_packet[4:6].hex().upper()
    time_bytes = rt130_packet[6:12]
    packet_time = parse_rt130_time(year, time_bytes)
    byte_count = int(rt130_packet[12:14].hex())
    packet_sequence = int(rt130_packet[14:16].hex())
    return PacketHeader(packet_type, experiment_number, unit_id, packet_time,
                        byte_count, packet_sequence)
......@@ -9,24 +9,46 @@ Suggested updates to obspy.io.reftek.packet:
Maeva Pourpoint IRIS/PASSCAL
"""
from typing import List
import numpy
import obspy.io.reftek.packet as obspy_rt130_packet
from obspy import UTCDateTime
from obspy.io.reftek.util import (_decode_ascii,
_parse_long_time,
_16_tuple_ascii,
_16_tuple_int,
_16_tuple_float)
from sohstationviewer.model.reftek.from_rt2ms.soh_packet import Packet
from obspy.io.reftek.util import (
_decode_ascii, _parse_long_time, _16_tuple_ascii, _16_tuple_float,
_16_tuple_int,
)
from sohstationviewer.model.reftek.reftek_data.soh_packet import SOHPacket
class Reftek130UnpackPacketError(ValueError):
    """Raised when unpacking an RT130 packet fails."""
# Offset (relative to the payload start) and size in bytes of the last EH/ET
# payload field that we need (first_sample_time; see EH_PAYLOAD below).
eh_et_payload_last_field_start = 88
eh_et_payload_last_field_size = 16
# The offsets above are relative to the start of the payload, so we have to
# add 24 to compensate for the size of the header and extended header when
# indexing into a whole packet.
eh_et_payload_end_in_packet = (
    eh_et_payload_last_field_start + eh_et_payload_last_field_size + 24
)
# name, offset, length (bytes) and converter routine for EH/ET packet payload
# Trimmed to only include the parts used elsewhere for the sake of better
# performance.
EH_PAYLOAD = {
    "station_name_extension": (35, 1, _decode_ascii),
    "station_name": (36, 4, _decode_ascii),
    "sampling_rate": (64, 4, float),
    "trigger_time": (72, 16, _parse_long_time),
    "first_sample_time": (
        eh_et_payload_last_field_start, eh_et_payload_last_field_size,
        _parse_long_time),
}
obspy_rt130_packet.EH_PAYLOAD = {
"trigger_time_message": (0, 33, _decode_ascii),
"time_source": (33, 1, _decode_ascii),
"time_quality": (34, 1, _decode_ascii),
......@@ -57,21 +79,37 @@ EH_PAYLOAD = {
"position": (894, 26, _decode_ascii),
"reftek_120": (920, 80, None)}
obspy_rt130_packet.EH_PAYLOAD = EH_PAYLOAD
class EHPacket(obspy_rt130_packet.EHPacket):
def __str__(self, compact=False):
def __init__(self, data: numpy.ndarray) -> None:
"""
Reimplement __init__ to change a different value for EH_PAYLOAD.
This should be the cleanest way to do it, seeing as any other way I
can think of modify EH_PAYLOAD in the original file, which can have
consequences that are not readily apparent.
:param data: the data of an EH packet. For more information, refer to
obspy.io.reftek.packet.PACKET_FINAL_DTYPE.
"""
self._data = data
payload = self._data["payload"].tobytes()
for name, (start, length, converter) in EH_PAYLOAD.items():
data = payload[start:start + length]
if converter is not None:
data = converter(data)
setattr(self, name, data)
def __str__(self, compact: bool = False) -> str:
if compact:
sta = (self.station_name.strip() +
self.station_name_extension.strip())
info = ("{:04d} {:2s} {:4s} {:2d} {:4d} {:4d} {:2d} {:2s} "
"{:5s} {:4s} {!s}").format(
self.packet_sequence, self.type.decode(),
self.unit_id.decode(), self.experiment_number,
self.byte_count, self.event_number,
self.data_stream_number, self.data_format.decode(),
sta, str(self.sampling_rate)[:4], self.time)
self.packet_sequence, self.type.decode(),
self.unit_id.decode(), self.experiment_number,
self.byte_count, self.event_number,
self.data_stream_number, self.data_format.decode(),
sta, str(self.sampling_rate)[:4], self.time)
else:
info = []
for key in self._headers:
......@@ -91,40 +129,16 @@ class EHPacket(obspy_rt130_packet.EHPacket):
"\n\t".join(info))
return info
def eh_et_info(self, nbr_DT_samples):
def eh_et_info(self, nbr_DT_samples: int) -> List[str]:
"""
Compile EH and ET info to write to log file.
Returns list of strings.
Formatting of strings is based on earlier version of rt2ms.
"""
info = []
# packet_tagline1 = ("\n\n{:s} exp {:02d} bytes {:04d} {:s} ID: {:s} "
# "seq {:04d}".format(self.type.decode(),
# self.experiment_number,
# self.byte_count,
# Packet.time_tag(self.time),
# self.unit_id.decode(),
# self.packet_sequence))
# info.append(packet_tagline1)
# if self.type.decode('ASCII') == 'EH':
# nbr_DT_samples = 0
# info.append("\nEvent Header")
# else:
# info.append("\nEvent Trailer")
# info.append("\n event = " + str(self.event_number))
# info.append("\n stream = " + str(self.data_stream_number + 1))
# info.append("\n format = " + self.data_format.decode('ASCII'))
# info.append("\n stream name = " + self.stream_name)
# info.append("\n sample rate = " + str(self.sampling_rate))
# info.append("\n trigger type = " + self.trigger_type)
trigger_time = Packet.time_tag(UTCDateTime(ns=self.trigger_time))
# info.append("\n trigger time = " + trigger_time)
first_sample_time = Packet.time_tag(UTCDateTime(ns=self.first_sample_time)) # noqa: E501
# info.append("\n first sample = " + first_sample_time)
# if self.last_sample_time:
# info.append("\n last sample = " + Packet.time_tag(UTCDateTime(ns=self.last_sample_time))) # noqa: E501
# info.append("\n bit weights = " + " ".join([val for val in self.channel_adjusted_nominal_bit_weights if val])) # noqa: E501
# info.append("\n true weights = " + " ".join([val for val in self.channel_true_bit_weights if val])) # noqa: E501
trigger_time = SOHPacket.time_tag(UTCDateTime(ns=self.trigger_time))
first_sample_time = SOHPacket.time_tag(
UTCDateTime(ns=self.first_sample_time)) # noqa: E501
packet_tagline2 = ("\nDAS: {:s} EV: {:04d} DS: {:d} FST = {:s} TT = "
"{:s} NS: {:d} SPS: {:.1f} ETO: 0"
.format(self.unit_id.decode(),
......
from typing import Tuple, Any
import numpy
from obspy.io.reftek.util import bcd
from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker
from sohstationviewer.model.reftek.reftek_data.packet import \
eh_et_payload_end_in_packet
from sohstationviewer.model.reftek.reftek_data.packets import (
DTExtendedHeader,
EHETExtendedHeader, SOHExtendedHeader,
)
def decode_uncompressed(packet: bytes, data_format: str, unpacker: Unpacker
                        ) -> int:
    """
    Grab the first data point in a packet that contains uncompressed RT130
    data (aka packets with data format 16, 32, or 33).

    :param packet: the bytes that make up the given packet.
    :param data_format: the data format of the given packet, can be one of
        16, 32, or 33.
    :param unpacker: the unpacker to use to decode the data.
    :return: the first data point in the given packet
    """
    # The data portion of a packet starts at byte 24.
    data = packet[24:]
    # For uncompressed RT130 data, the data format is also the size of a data
    # point in bits (aside from data format 33, which uses the same size as
    # data format 32).
    point_size = int(data_format)
    if point_size == 33:
        point_size = 32
    # Convert the size of a data point to bytes because the data is stored
    # as a byte string.
    point_size = point_size // 8
    # struct.unpack uses different format characters for different point
    # sizes.
    format_char = {2: 'h', 4: 'i'}[point_size]
    first_data_point = data[:point_size]
    return unpacker.unpack(f'{format_char}', first_data_point)[0]
def decode_compressed(packet: bytes, data_format: str, unpacker: Unpacker
                      ) -> int:
    """
    Grab the stop point in a packet that contains compressed RT130 data (aka
    packets with data format C0, C1, C2, or C3).
    We get the stop point in this case because that is what logpeek did. It
    also looks a lot better than using the start point, so that is a plus.

    :param packet: the bytes that make up the given packet.
    :param data_format: the data format of the given packet, can be one of
        C0, C1, C2, or C3. Exists only to have the same signature as
        decode_uncompressed.
    :param unpacker: the unpacker to use to decode the data.
    :return: the stop point stored in the given packet
    """
    # The data in a compressed data packet starts at byte 64, with bytes
    # between byte 24 and 64 being fillers.
    data = packet[64:]
    # The stop point is stored as a 32-bit integer at bytes 8-12 of the data
    # section.
    stop_point = data[8:12]
    return unpacker.unpack('i', stop_point)[0]
def read_dt_packet(packet: bytes, unpacker: Unpacker
                   ) -> Tuple[DTExtendedHeader, Any]:
    """
    Process a DT packet and get its extended header and first data point.

    :param packet: the bytes that make up the given DT packet.
    :param unpacker: the unpacker to use to decode the data.
    :return: the extended header and first data point of the given DT packet.
    """
    # Uncompressed formats and compressed formats are decoded differently;
    # pick the decoder by data format.
    decoders = dict.fromkeys(['16', '32', '33'], decode_uncompressed)
    decoders.update(
        dict.fromkeys(['C0', 'C1', 'C2', 'C3'], decode_compressed))
    # Header counters are BCD-encoded; reading the hex digits as a decimal
    # number decodes them.
    event_number = int(packet[16:18].hex())
    data_stream_number = int(packet[18:19].hex())
    channel_number = int(packet[19:20].hex())
    number_of_samples = int(packet[20:22].hex())
    flags = unpacker.unpack('B', packet[22:23])[0]
    data_format = packet[23:24].hex().upper()
    extended_header = DTExtendedHeader(event_number, data_stream_number,
                                       channel_number, number_of_samples,
                                       flags, data_format)
    first_data_point = decoders[data_format](packet, data_format, unpacker)
    return extended_header, first_data_point
def read_eh_et_packet(packet: bytes, unpacker: Unpacker
                      ) -> Tuple[EHETExtendedHeader, bytes]:
    """
    Process an EH/ET packet and get its extended header and required part of
    the payload.

    :param packet: the bytes that make up the given EH/ET packet.
    :param unpacker: the unpacker to use to decode the data.
    :return: the extended header and truncated payload of the given EH/ET
        packet.
    """
    event_number = int(packet[16:18].hex())
    data_stream_number = int(packet[18:19].hex())
    flags = unpacker.unpack('B', packet[22:23])[0]
    data_format = packet[23:24].hex().upper()
    extended_header = EHETExtendedHeader(event_number, data_stream_number,
                                         flags, data_format)
    # Keep only the part of the payload up to and including the last field
    # used elsewhere (see eh_et_payload_end_in_packet); the rest of the
    # packet is not needed.
    payload = packet[24:eh_et_payload_end_in_packet]
    return extended_header, payload
def bcd_16bit_int(_i) -> int:
    """
    Reimplement a private function of the same name in obspy. Kept here in
    case the private function is removed in a future obspy version.

    :param _i: the byte string to convert into a 16-bit integer
    :return: a 16-bit integer
    """
    # bcd() yields the decoded decimal digit pairs; combine the two pairs
    # into a single number.
    _i = bcd(_i)
    return _i[0] * 100 + _i[1]
def read_soh_packet(packet: bytes, unpacker: Unpacker
                    ) -> Tuple[SOHExtendedHeader, bytes]:
    """
    Process an SOH packet and get its extended header and payload.

    :param packet: the bytes that make up the given SOH packet.
    :param unpacker: the unpacker to use to decode the data.
    :return: the extended header and payload of the given SOH packet.
    """
    # These header fields are BCD-encoded; decode them with the bcd helpers.
    event_number = bcd_16bit_int(numpy.frombuffer(packet[16:18], numpy.uint8))
    data_stream_number = bcd(numpy.frombuffer(packet[18:19], numpy.uint8))
    channel_number = bcd(numpy.frombuffer(packet[19:20], numpy.uint8))
    number_of_samples = bcd_16bit_int(
        numpy.frombuffer(packet[20:22], numpy.uint8)
    )
    flags = unpacker.unpack('B', packet[22:23])[0]
    data_format = packet[23:24].hex().upper()
    extended_header = SOHExtendedHeader(event_number, data_stream_number,
                                        channel_number, number_of_samples,
                                        flags, data_format)
    # SOH packets keep their full payload.
    payload = packet[24:]
    return extended_header, payload
import dataclasses
from sohstationviewer.model.reftek.reftek_data.header import PacketHeader
@dataclasses.dataclass
class DTExtendedHeader:
    """
    The extended header of a DT packet.
    """
    # Values decoded from the DT packet's extended-header bytes; see
    # packet_readers.read_dt_packet for how each field is extracted.
    event_number: int
    data_stream_number: int
    channel_number: int
    number_of_samples: int
    flags: int
    # Two-character data format code (e.g. '16', '32', 'C0'), upper-cased.
    data_format: str
@dataclasses.dataclass
class DTPacket:
    """
    The decoded data of a DT packet.
    """
    # Common RT130 packet header (first 16 bytes of the packet).
    header: PacketHeader
    extended_header: DTExtendedHeader
    # The first data point of the packet (see packet_readers.read_dt_packet).
    data: int
@dataclasses.dataclass
class EHETExtendedHeader:
    """
    A collection of some useful information about an EH/ET packet. Technically,
    EH/ET packets do not have extended headers. We name this class what it is
    due to the way obspy.Reftek130 (and consequently, core.Reftek130) stores
    the data of processed packets. For more information, refer to
    Reftek130._data.
    """
    event_number: int
    data_stream_number: int
    flags: int
    data_format: str

    def __post_init__(self) -> None:
        # EH/ET packets carry no channel number or sample count; store zeros
        # so instances present the same fields as DTExtendedHeader.
        self.channel_number = 0
        self.number_of_samples = 0
@dataclasses.dataclass
class EHETPacket:
    """
    The decoded data of an EH/ET packet. The extended_header field is to ensure
    compatibility with dt_packet.DTPacket. EH/ET packets do not have an
    extended header otherwise.
    """
    header: PacketHeader
    extended_header: EHETExtendedHeader
    # Truncated EH/ET payload bytes (see packet_readers.read_eh_et_packet).
    data: bytes
@dataclasses.dataclass
class SOHExtendedHeader:
    """
    A collection of dummy data for some information needed so that
    core.Reftek130 can understand SOH packets.

    core.Reftek130 focuses on reading waveform data, so it wants information
    available in the waveform packets (EH/ET/DT). However, core.Reftek130 also
    supports SOH packets, which does not contain the required information. As
    a result, we need to store dummy data in its place.
    """
    # Fields mirror DTExtendedHeader so SOH packets can flow through the same
    # code paths (populated in packet_readers.read_soh_packet).
    event_number: int
    data_stream_number: int
    channel_number: int
    number_of_samples: int
    flags: int
    data_format: str
@dataclasses.dataclass
class SOHPacket:
    """
    The decoded data of an SOH packet. The extended_header field is to ensure
    compatibility with dt_packet.DTPacket. SOH packets do not have an
    extended header otherwise.
    """
    header: PacketHeader
    extended_header: SOHExtendedHeader
    # Full payload bytes of the packet (see packet_readers.read_soh_packet).
    data: bytes
import os
from typing import Any, Dict, Callable, Union, List, Tuple
import numpy
import numpy as np
from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker
from sohstationviewer.model.reftek.reftek_data.packet import \
eh_et_payload_end_in_packet
from sohstationviewer.model.reftek.reftek_data.packet_readers import (
read_dt_packet, read_eh_et_packet, read_soh_packet,
)
from sohstationviewer.model.reftek.reftek_data.packets import (
DTPacket, EHETPacket, SOHPacket,
)
from sohstationviewer.model.reftek.reftek_data.header import \
get_rt130_packet_header
def packet_reader_placeholder(*args: Any, **kwargs: Any) -> Tuple[Any, Any]:
    """
    Placeholder function to be used in place of an RT130 packet reader
    function. Accepts any arguments and immediately returns a (None, None)
    tuple, matching the (extended header, data) shape returned by the real
    packet readers.
    """
    return None, None
def read_rt130_file(file_name: str, unpacker: Unpacker
                    ) -> List[Union[EHETPacket, DTPacket, SOHPacket]]:
    """
    Read an RT130 file and stores the data in a list of RT130 packets.

    :param file_name: the name of the file to read.
    :param unpacker: the decoder used to decode the data.
    :return: a list of processed RT130 packets.
    """
    # RT130 data looks to be all big-endian (logpeek assumes this, and it has
    # been working pretty well), so we don't have to do any endianness check.

    # The handler tables do not depend on the packet being read, so build
    # them once instead of rebuilding them on every loop iteration.
    waveform_handlers: Dict[str, Callable] = {
        'EH': read_eh_et_packet,
        'ET': read_eh_et_packet,
        'DT': read_dt_packet,
    }
    soh_handlers: Dict[str, Callable] = dict.fromkeys(
        ['AD', 'CD', 'DS', 'FD', 'OM', 'SC', 'SH'],
        read_soh_packet
    )
    packet_handlers = {
        **waveform_handlers, **soh_handlers
    }

    packets = []
    with open(file_name, 'rb') as rt130_file:
        # Each packet is exactly 1024 bytes, so we can rely on that to know
        # when we have finished reading.
        for _ in range(os.path.getsize(file_name) // 1024):
            packet = rt130_file.read(1024)
            packet_header = get_rt130_packet_header(packet)
            # Unhandled packet types fall back to the placeholder, which
            # yields (None, None).
            packet_handler = packet_handlers.get(
                packet_header.packet_type, packet_reader_placeholder
            )
            extended_header, data = packet_handler(packet, unpacker)
            # Pick the container type that matches the packet type.
            if packet_header.packet_type == 'DT':
                packet_type = DTPacket
            elif packet_header.packet_type in ['EH', 'ET']:
                packet_type = EHETPacket
            else:
                packet_type = SOHPacket
            current_packet = packet_type(packet_header, extended_header, data)
            packets.append(current_packet)
    return packets
def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket,
                                                 SOHPacket],
                                   unpacker: Unpacker) -> Tuple:
    """
    Convert an RT130 packet into a numpy array of type PACKET_FINAL_DTYPE

    :param packet: an RT130 packet.
    :param unpacker: the decoder used to decode the data.
    :return: a tuple that can be converted into an object of type
        PACKET_FINAL_DTYPE that contains the data stored in packet.
    """
    # We want to convert the packet to a tuple. In order to make it easier to
    # maintain, we first convert the packet to a dictionary. Then, we grab the
    # values of the dictionary as tuple to get the final result.
    # NOTE(review): only the dict's values are returned, so the insertion
    # order of the keys below has to match the field order of obspy's
    # PACKET_FINAL_DTYPE — confirm before reordering anything here.
    converted_packet = {}
    converted_packet['packet_type'] = packet.header.packet_type
    converted_packet['experiment_number'] = packet.header.experiment_number
    # Obspy only stores the last two digits of the year.
    converted_packet['year'] = packet.header.time.year % 100
    converted_packet['unit_id'] = packet.header.unit_id
    converted_packet['time'] = packet.header.time.ns
    converted_packet['byte_count'] = packet.header.byte_count
    converted_packet['packet_sequence'] = packet.header.packet_sequence
    converted_packet['event_number'] = packet.extended_header.event_number
    converted_packet[
        'data_stream_number'] = packet.extended_header.data_stream_number
    converted_packet['channel_number'] = packet.extended_header.channel_number
    converted_packet[
        'number_of_samples'] = packet.extended_header.number_of_samples
    converted_packet['flags'] = packet.extended_header.flags
    converted_packet['data_format'] = packet.extended_header.data_format
    if converted_packet['packet_type'] == 'DT':
        # Obspy stores the data as list of 1-byte integers. We store the
        # data as an arbitrary length integer, so we need to do some
        # conversion. To make encoding and decoding the data point easier, we
        # store it in 4 bytes no matter what the data format is. This only
        # has an effect on data with format 16. Thanks to a quirk with
        # 2-complement binary encoding, however, this does not cause any
        # problem.
        data_size = 4
        format_char = 'B'
        # The payload buffer is 1000 bytes to match the dtype; only the first
        # 4 bytes are filled with the encoded data point.
        converted_packet['payload'] = numpy.empty(1000, np.uint8)
        packet_data = list(unpacker.unpack(
            f'{data_size}{format_char}',
            packet.data.to_bytes(data_size, 'big', signed=True)
        ))
        converted_packet['payload'][:4] = packet_data
    elif converted_packet['packet_type'] in ['EH', 'ET']:
        # EH/ET payloads were truncated when read, so only the first part of
        # the 1000-byte payload buffer is filled.
        eh_et_payload_size = eh_et_payload_end_in_packet - 24
        converted_packet['payload'] = numpy.empty(1000, np.uint8)
        packet_data = numpy.frombuffer(packet.data, np.uint8)
        converted_packet['payload'][:eh_et_payload_size] = packet_data
    else:
        # SOH packets keep their full payload.
        converted_packet['payload'] = numpy.frombuffer(packet.data, np.uint8)
    return tuple(converted_packet.values())
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Optional, List
"""
Routines building upon obspy.io.reftek.packet.
Redefine packet header (PACKET) based on rt130 manual.
......@@ -268,14 +270,14 @@ FD_INFO = {
"_coeff": (984, None)}
class Packet(obspy_rt130_packet.Packet):
class SOHPacket(obspy_rt130_packet.Packet):
"""Class used to define shared tools for the SOH packets"""
_headers = ('experiment_number', 'unit_id', 'byte_count',
'packet_sequence', 'time')
@staticmethod
def from_data(data):
def from_data(data: np.ndarray) -> SOHPacket:
"""
Checks for valid packet type identifier and returns appropriate
packet object
......@@ -300,7 +302,7 @@ class Packet(obspy_rt130_packet.Packet):
raise NotImplementedError(msg.format(packet_type))
@staticmethod
def time_tag(time, implement_time=None):
def time_tag(time: UTCDateTime, implement_time: Optional[int] = None):
if implement_time is not None and time > UTCDateTime(ns=implement_time): # noqa: E501
time = UTCDateTime(ns=implement_time)
return "{:04d}:{:03d}:{:02d}:{:02d}:{:02d}:{:03d}".format(time.year,
......@@ -311,20 +313,14 @@ class Packet(obspy_rt130_packet.Packet):
time.microsecond) # noqa: E501
@property
def packet_tagline(self):
def packet_tagline(self) -> str:
return "\n"
# return "\n\n{:s} exp {:02d} bytes {:04d} {:s} ID: {:s} seq {:04d}".format(self.type.decode(), # noqa: E501
# self.experiment_number, # noqa: E501
# self.byte_count, # noqa: E501
# self.time_tag(self.time), # noqa: E501
# self.unit_id.decode(), # noqa: E501
# self.packet_sequence) # noqa: E501
class SHPacket(Packet):
class SHPacket(SOHPacket):
"""Class used to parse and generate string representation for SH packets"""
def __init__(self, data):
def __init__(self, data: np.ndarray) -> None:
self._data = data
payload = self._data["payload"].tobytes()
start_sh = 0
......@@ -341,7 +337,7 @@ class SHPacket(Packet):
setattr(self, name, data)
start_sh = start_sh + length
def __str__(self):
def __str__(self) -> str:
info = []
# info.append(self.packet_tagline)
packet_soh_string = ("\nState of Health {:s} ST: {:s}"
......@@ -352,10 +348,10 @@ class SHPacket(Packet):
return info
class SCPacket(Packet):
class SCPacket(SOHPacket):
"""Class used to parse and generate string representation for SC packets"""
def __init__(self, data):
def __init__(self, data: np.ndarray) -> None:
# Station/Channel payload
self._data = data
payload = self._data["payload"].tobytes()
......@@ -389,12 +385,8 @@ class SCPacket(Packet):
setattr(self, name, data)
start_info = start_info + length
def __str__(self):
def __str__(self) -> str:
info = []
# info.append(self.packet_tagline)
# packet_soh_string = ("\nStation Channel Definition {:s} ST: {:s}"
# .format(self.time_tag(self.time, implement_time=self.implement_time), # noqa: E501
# self.unit_id.decode()))
packet_soh_string = ("\nStation Channel Definition {:s} ST: {:s}"
.format(self.time_tag(self.time),
self.unit_id.decode()))
......@@ -430,7 +422,7 @@ class SCPacket(Packet):
info.append("\n Comments - " + getattr(self, 'sc' + str(ind_sc) + '_comments')) # noqa: E501
return info
def get_info(self, infos):
def get_info(self, infos: List[List]) -> List[List]:
"""
Compile relevant information - unit id, reference channel, network
code, station code, component code, gain and implementation time - for
......@@ -461,10 +453,10 @@ class SCPacket(Packet):
return infos
class OMPacket(Packet):
class OMPacket(SOHPacket):
"""Class used to parse and generate string representation for OM packets"""
def __init__(self, data):
def __init__(self, data: np.ndarray) -> None:
self._data = data
payload = self._data["payload"].tobytes()
start_om = 0
......@@ -481,7 +473,7 @@ class OMPacket(Packet):
setattr(self, name, data)
start_om = start_om + length
def __str__(self):
def __str__(self) -> str:
info = []
# info.append(self.packet_tagline)
packet_soh_string = ("\nOperating Mode Definition {:s} ST: {:s}"
......@@ -503,10 +495,10 @@ class OMPacket(Packet):
return info
class DSPacket(Packet):
class DSPacket(SOHPacket):
"""Class used to parse and generate string representation for DS packets"""
def __init__(self, data):
def __init__(self, data: np.ndarray) -> None:
# Data Stream payload
self._data = data
payload = self._data["payload"].tobytes()
......@@ -561,7 +553,7 @@ class DSPacket(Packet):
msg = ("Trigger type {:s} not found".format(trigger_type))
warnings.warn(msg)
def __str__(self):
def __str__(self) -> str:
info = []
info.append(self.packet_tagline)
packet_soh_string = ("\nData Stream Definition {:s} ST: {:s}"
......@@ -597,7 +589,7 @@ class DSPacket(Packet):
info.append(" ".join(["\n Trigger", key, trigger_info])) # noqa: E501
return info
def get_info(self, infos):
def get_info(self, infos: List[List]) -> List[List]:
"""
Compile relevant information - reference data stream, band and
instrument codes, sample rate and implementation time - for given DS
......@@ -624,10 +616,10 @@ class DSPacket(Packet):
return infos
class ADPacket(Packet):
class ADPacket(SOHPacket):
"""Class used to parse and generate string representation for AD packets"""
def __init__(self, data):
def __init__(self, data: np.ndarray) -> None:
self._data = data
payload = self._data["payload"].tobytes()
start_ad = 0
......@@ -644,7 +636,7 @@ class ADPacket(Packet):
setattr(self, name, data)
start_ad = start_ad + length
def __str__(self):
def __str__(self) -> str:
info = []
# info.append(self.packet_tagline)
packet_soh_string = ("\nAuxiliary Data Parameter {:s} ST: {:s}"
......@@ -664,10 +656,10 @@ class ADPacket(Packet):
return info
class CDPacket(Packet):
class CDPacket(SOHPacket):
"""Class used to parse and generate string representation for CD packets"""
def __init__(self, data):
def __init__(self, data: np.ndarray) -> None:
# Calibration parameter payload
self._data = data
payload = self._data["payload"].tobytes()
......@@ -736,7 +728,7 @@ class CDPacket(Packet):
setattr(self, name, data)
start_info_seq = start_info_seq + length
def __str__(self):
def __str__(self) -> str:
info = []
# info.append(self.packet_tagline)
packet_soh_string = ("\nCalibration Definition {:s} ST: {:s}"
......@@ -790,10 +782,10 @@ class CDPacket(Packet):
return info
class FDPacket(Packet):
class FDPacket(SOHPacket):
"""Class used to parse and generate string representation for FD packets"""
def __init__(self, data):
def __init__(self, data: np.ndarray) -> None:
# Filter description payload
self._data = data
payload = self._data["payload"]
......@@ -845,7 +837,7 @@ class FDPacket(Packet):
setattr(self, name, data)
start_info = start_info + length
def __str__(self):
def __str__(self) -> str:
info = []
# info.append(self.packet_tagline)
packet_soh_string = ("\nFilter Description {:s} ST: {:s}"
......@@ -873,7 +865,7 @@ class FDPacket(Packet):
return info
@staticmethod
def twosCom_bin2dec(bin_, digit):
def twosCom_bin2dec(bin_: str, digit: int):
while len(bin_) < digit:
bin_ = '0' + bin_
if bin_[0] == '0':
......@@ -882,7 +874,7 @@ class FDPacket(Packet):
return -1 * (int(''.join('1' if x == '0' else '0' for x in bin_), 2) + 1) # noqa: E501
@staticmethod
def twosCom_dec2bin(dec, digit):
def twosCom_dec2bin(dec: int, digit: int):
if dec >= 0:
bin_ = bin(dec).split("0b")[1]
while len(bin_) < digit:
......@@ -893,7 +885,7 @@ class FDPacket(Packet):
return bin(dec - pow(2, digit)).split("0b")[1]
def _initial_unpack_packets_soh(bytestring):
def _initial_unpack_packets_soh(bytestring: bytes) -> np.ndarray:
"""
First unpack data with dtype matching itemsize of storage in the reftek
file, than allocate result array with dtypes for storage of python
......
import os
import unittest
from pathlib import Path
import numpy
import obspy.core
from numpy.testing import assert_array_equal
from sohstationviewer.model.reftek.reftek_data.core import (
DiscontinuousTrace,
Reftek130,
)
from sohstationviewer.model.reftek.reftek_data.header import NotRT130FileError
class TestDiscontinuousTrace(unittest.TestCase):
    """Tests for DiscontinuousTrace, an obspy Trace subclass that carries an
    explicit array of sample times instead of deriving them from the start
    time and sampling rate."""

    def setUp(self) -> None:
        # 1024 samples whose timestamps equal their indices.
        samples = numpy.arange(1024)
        sample_times = numpy.arange(1024)
        self.trace = DiscontinuousTrace(
            samples, obspy.core.Stats(), times=sample_times)

    def test_times_argument_is_stored(self):
        # The explicit times array passed to the constructor is kept on the
        # instance.
        self.assertTrue(hasattr(self.trace, '_times'))

    def test_times_utcdatetime(self):
        # The 'utcdatetime' mode of Trace.times is not supported.
        self.assertRaises(NotImplementedError,
                          self.trace.times, 'utcdatetime')

    def test_times_matplotlib(self):
        # The 'matplotlib' mode of Trace.times is not supported.
        self.assertRaises(NotImplementedError,
                          self.trace.times, 'matplotlib')

    def test_times_relative(self):
        with self.subTest('test_relative_to_start_time'):
            # The default start time of a trace is 0 anyhow, but we write
            # that down explicitly for clarity.
            self.trace.stats.starttime = obspy.core.UTCDateTime(0)
            assert_array_equal(self.trace.times('relative'),
                               numpy.arange(1024))
        with self.subTest('test_relative_to_given_reftime'):
            # Sample times are shifted by the negated reference time.
            for reftime_seconds, offset in ((0, 0), (1024, -1024),
                                            (-1024, 1024)):
                reference = obspy.core.UTCDateTime(reftime_seconds)
                assert_array_equal(self.trace.times('relative', reference),
                                   numpy.arange(offset, offset + 1024))

    def test_times_timestamp(self):
        assert_array_equal(self.trace.times('timestamp'),
                           numpy.arange(1024))
class TestReftek130FromFile(unittest.TestCase):
    """Tests for Reftek130.from_file using the bundled RT130 sample data."""

    def setUp(self) -> None:
        # Test data is located relative to the directory the test runner is
        # started from.
        self.TEST_DATA_DIR = Path(os.getcwd()) / 'tests/test_data'
        self.rt130_dir = (self.TEST_DATA_DIR /
                          'RT130-sample/2017149.92EB/2017150/92EB')

    def test_rt130_file(self):
        # A valid RT130 file produces a Reftek130 instance.
        data_file = self.rt130_dir / '0/000000000_00000000'
        self.assertIsInstance(Reftek130.from_file(data_file), Reftek130)

    def test_rt130_soh_file(self):
        # The most common SOH packet type looks to be SH, so we use that as
        # the default.
        soh_file = self.rt130_dir / '0/000000000_00000000'
        reader = Reftek130.from_file(soh_file)
        self.assertIn(b'SH', reader._data['packet_type'])

    def test_rt130_raw_data_file(self):
        # A waveform file contains exactly the EH, DT, and ET packet types.
        data_file = self.rt130_dir / '1/000000015_0036EE80'
        reader = Reftek130.from_file(data_file)
        assert_array_equal(
            numpy.unique(numpy.sort(reader._data['packet_type'])),
            numpy.sort([b'EH', b'DT', b'ET'])
        )

    def test_non_rt130_file(self):
        with self.subTest('test_file_exist'):
            non_rt130_file = (
                self.TEST_DATA_DIR /
                'Q330-sample/day_vols_AX08/AX08.XA..HHE.2021.186'
            )
            self.assertRaises(NotRT130FileError,
                              Reftek130.from_file, non_rt130_file)
        with self.subTest('test_file_does_not_exist'):
            self.assertRaises(FileNotFoundError, Reftek130.from_file, '')
import unittest
from sohstationviewer.model.reftek.reftek_data.header import (
parse_rt130_time,
get_rt130_packet_header, NotRT130FileError,
)
class TestParseRT130Time(unittest.TestCase):
    """Tests for header.parse_rt130_time, which decodes the packed time
    field of an RT130 packet together with a two-digit year."""

    # Packed digits: julian day 360, 11:51:35, 135 milliseconds.
    TIME_BYTES = b'\x36\x01\x15\x13\x51\x35'

    def test_time_bytes_parsed_correctly(self):
        parsed = parse_rt130_time(15, self.TIME_BYTES)
        self.assertEqual(parsed.julday, 360)
        self.assertEqual(parsed.day, 26)
        self.assertEqual(parsed.month, 12)
        self.assertEqual(parsed.hour, 11)
        self.assertEqual(parsed.minute, 51)
        self.assertEqual(parsed.second, 35)
        self.assertEqual(parsed.microsecond, 135000)
        self.assertEqual(parsed.ns, 1451130695135000000)

    def test_year_1900s(self):
        # Two-digit years of 50 and above fall in the 1900s.
        self.assertEqual(parse_rt130_time(71, self.TIME_BYTES).year, 1971)

    def test_year_2000s(self):
        # Two-digit years below 50 fall in the 2000s.
        self.assertEqual(parse_rt130_time(12, self.TIME_BYTES).year, 2012)

    def test_year_threshold(self):
        # 49 and 50 sit on either side of the century cutoff.
        with self.subTest('test_year_is_49'):
            self.assertEqual(parse_rt130_time(49, self.TIME_BYTES).year,
                             2049)
        with self.subTest('test_year_is_50'):
            self.assertEqual(parse_rt130_time(50, self.TIME_BYTES).year,
                             1950)
class TestGetRT130PacketHeader(unittest.TestCase):
    """Tests for header.get_rt130_packet_header, which parses the 16-byte
    header at the start of a 1024-byte RT130 packet."""

    def test_header_extracted_correctly(self):
        header_bytes = (b'DT\x12\x15\x98\xe1\x36\x01\x15\x13\x51\x35'
                        b'\x05\x12\x01\x11')
        result = get_rt130_packet_header(header_bytes + b' ' * 1008)
        self.assertEqual(result.packet_type, 'DT')
        self.assertEqual(result.experiment_number, 12)
        self.assertEqual(result.unit_id, '98E1')
        self.assertEqual(result.time.ns, 1451130695135000000)
        self.assertEqual(result.byte_count, 512)
        self.assertEqual(result.packet_sequence, 111)

    def test_packet_type_cannot_be_parsed(self):
        # The first two bytes cannot be decoded into a packet-type string.
        bad_header = b'\x01\x02' + b'\x11' * 14
        self.assertRaises(NotRT130FileError, get_rt130_packet_header,
                          bad_header + b' ' * 1008)

    def test_packet_type_is_not_valid(self):
        # 'AB' decodes fine but is not one of the RT130 packet types.
        bad_header = b'AB' + b'\x11' * 14
        self.assertRaises(NotRT130FileError, get_rt130_packet_header,
                          bad_header + b' ' * 1008)
import unittest
from unittest.mock import patch
from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker
from sohstationviewer.model.reftek.reftek_data.packet import \
eh_et_payload_end_in_packet
from sohstationviewer.model.reftek.reftek_data.packet_readers import (
decode_uncompressed, decode_compressed, read_dt_packet, read_eh_et_packet,
read_soh_packet,
)
from sohstationviewer.model.reftek.reftek_data.packets import SOHExtendedHeader
# Shared unpacker for the tests in this module; '>' follows the struct
# module's format convention (presumably big-endian byte order — confirm in
# record_reader_helper).
unpacker = Unpacker('>')
class TestDecodeFunctions(unittest.TestCase):
    """Tests for the first-data-point decoders decode_uncompressed and
    decode_compressed in packet_readers."""

    def setUp(self) -> None:
        # Stand-in for the 24 bytes that precede the data in a DT packet.
        self.header = b' ' * 24

    def _uncompressed_packet(self, first_point_bytes: bytes) -> bytes:
        # Pad after the first data point so the payload is 1000 bytes long.
        filler = b' ' * (1000 - len(first_point_bytes))
        return self.header + first_point_bytes + filler

    def test_decode_uncompressed_format_16(self):
        with self.subTest('test_positive_number'):
            packet = self._uncompressed_packet(b'\x06\x19')
            self.assertEqual(decode_uncompressed(packet, '16', unpacker),
                             1561)
        with self.subTest('test_negative_number'):
            packet = self._uncompressed_packet(b'\xf9\xe4')
            self.assertEqual(decode_uncompressed(packet, '16', unpacker),
                             -1564)

    def test_decode_uncompressed_format_32(self):
        with self.subTest('test_positive_number'):
            packet = self._uncompressed_packet(b'\x03\xc5\x4e\x9a')
            self.assertEqual(decode_uncompressed(packet, '32', unpacker),
                             63262362)
        with self.subTest('test_negative_number'):
            packet = self._uncompressed_packet(b'\xf6\xac\xba\x00')
            self.assertEqual(decode_uncompressed(packet, '32', unpacker),
                             -156452352)

    def test_decode_uncompressed_format_33(self):
        with self.subTest('test_positive_number'):
            packet = self._uncompressed_packet(b'\x03\xc5\x4e\x9a')
            self.assertEqual(decode_uncompressed(packet, '33', unpacker),
                             63262362)
        with self.subTest('test_negative_number'):
            packet = self._uncompressed_packet(b'\xf6\xac\xba\x00')
            self.assertEqual(decode_uncompressed(packet, '33', unpacker),
                             -156452352)

    def test_decode_compressed(self):
        # Bytes before the data point of interest: 24-byte header, 40 bytes
        # of filler, the first frame code, and the frame's start data point.
        prefix = self.header + b' ' * 40 + b'\x00\x11\x11\x11' + b'0000'
        with self.subTest('test_positive_number'):
            packet = prefix + b'\x03\xc5\x4e\x9a' + b' ' * 952
            self.assertEqual(decode_compressed(packet, 'C0', unpacker),
                             63262362)
        with self.subTest('test_negative_number'):
            packet = prefix + b'\xf6\xac\xba\x00' + b' ' * 952
            self.assertEqual(decode_compressed(packet, 'C0', unpacker),
                             -156452352)
class TestReadDTPacket(unittest.TestCase):
    """Tests for packet_readers.read_dt_packet."""

    def setUp(self) -> None:
        self.header = b' ' * 16
        # We only test if the correct method is used to extract the data
        # point, so the data can be anything we want.
        self.data = b' ' * 1000
        module_path = ('sohstationviewer.model.reftek.reftek_data.'
                       'packet_readers.')
        uncompressed_patcher = patch(module_path + 'decode_uncompressed')
        compressed_patcher = patch(module_path + 'decode_compressed')
        self.mock_uncompressed = uncompressed_patcher.start()
        self.mock_compressed = compressed_patcher.start()
        self.addCleanup(uncompressed_patcher.stop)
        self.addCleanup(compressed_patcher.stop)

    def test_extended_header_is_extracted_correctly(self):
        ext_header_bytes = b'\x01\x11\x01\x02\x05\x00\x00\xc0'
        packet = self.header + ext_header_bytes + self.data
        extended_header, _ = read_dt_packet(packet, unpacker)
        self.assertEqual(extended_header.event_number, 111)
        self.assertEqual(extended_header.data_stream_number, 1)
        self.assertEqual(extended_header.channel_number, 2)
        self.assertEqual(extended_header.number_of_samples, 500)
        self.assertEqual(extended_header.flags, 0)
        self.assertEqual(extended_header.data_format, 'C0')

    def test_data_point_extracted_with_correct_method(self):
        with self.subTest('test_uncompressed_packet'):
            # Data format 0x16 dispatches to the uncompressed decoder.
            ext_header_bytes = b'\x01\x11\x01\x02\x05\x00\x00\x16'
            read_dt_packet(self.header + ext_header_bytes + self.data,
                           unpacker)
            self.assertTrue(self.mock_uncompressed.called)
            self.assertFalse(self.mock_compressed.called)

        self.mock_uncompressed.reset_mock()
        self.mock_compressed.reset_mock()

        with self.subTest('test_compressed_packet'):
            # Data format 0xC0 dispatches to the compressed decoder.
            ext_header_bytes = b'\x01\x11\x01\x02\x05\x00\x00\xc0'
            read_dt_packet(self.header + ext_header_bytes + self.data,
                           unpacker)
            self.assertTrue(self.mock_compressed.called)
            self.assertFalse(self.mock_uncompressed.called)
class TestReadEHETPacket(unittest.TestCase):
    """Tests for packet_readers.read_eh_et_packet."""

    def setUp(self) -> None:
        # We only care about the length of the payload (the content is
        # dealt with somewhere else), and so it can contain dummy data.
        ext_header_bytes = b'\x01\x11\x01\x00\x00\x00\x00\xc0'
        self.packet = b' ' * 16 + ext_header_bytes + b' ' * 1000

    def test_extended_header_is_extracted_correctly(self):
        extended_header, _ = read_eh_et_packet(self.packet, unpacker)
        self.assertEqual(extended_header.event_number, 111)
        self.assertEqual(extended_header.data_stream_number, 1)
        self.assertEqual(extended_header.channel_number, 0)
        self.assertEqual(extended_header.number_of_samples, 0)
        self.assertEqual(extended_header.flags, 0)
        self.assertEqual(extended_header.data_format, 'C0')

    def test_payload_extracted_correctly(self):
        # The payload runs from byte 24 to the EH/ET payload end marker.
        _, payload = read_eh_et_packet(self.packet, unpacker)
        self.assertEqual(len(payload), eh_et_payload_end_in_packet - 24)
class TestReadSOHPacket(unittest.TestCase):
    """
    Test suite for packet_readers.read_soh_packet. We only test that the
    function has the correct interface, seeing as the intended purpose of
    this method is to be compatible with packet_readers.read_dt_packet and
    packet_readers.read_eh_et_packet interface-wise.
    """

    def test_correct_interface(self):
        extended_header, payload = read_soh_packet(b' ' * 1024, unpacker)
        self.assertIsInstance(extended_header, SOHExtendedHeader)
        self.assertIsInstance(payload, bytes)

    def test_payload_has_correct_length(self):
        # Everything after the 24 header bytes of the 1024-byte packet is
        # returned as the payload.
        _, payload = read_soh_packet(b' ' * 1024, unpacker)
        self.assertEqual(len(payload), 1000)
import os
import unittest
from pathlib import Path
from unittest.mock import patch
from obspy.io.reftek.packet import PACKET_FINAL_DTYPE
from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker
from sohstationviewer.model.reftek.reftek_data.header import NotRT130FileError
from sohstationviewer.model.reftek.reftek_data.packet_readers import (
read_eh_et_packet, read_dt_packet, read_soh_packet,
)
from sohstationviewer.model.reftek.reftek_data.packets import (
SOHPacket,
EHETPacket, DTPacket,
)
from sohstationviewer.model.reftek.reftek_data.reftek_helper import (
read_rt130_file, convert_packet_to_obspy_format,
)
# Shared unpacker for the tests in this module; '>' follows the struct
# module's format convention (presumably big-endian byte order — confirm in
# record_reader_helper).
unpacker = Unpacker('>')
class TestReadRT130File(unittest.TestCase):
    """Tests for reftek_helper.read_rt130_file, checking that each kind of
    RT130 file is dispatched to the matching packet reader."""

    def setUp(self) -> None:
        self.TEST_DATA_DIR = Path(os.getcwd()) / 'tests/test_data'
        self.rt130_dir = (self.TEST_DATA_DIR /
                          'RT130-sample/2017149.92EB/2017150/92EB')
        # Wrap the packet readers so their calls can be observed while
        # keeping their real behavior.
        helper_path = ('sohstationviewer.model.reftek.reftek_data.'
                       'reftek_helper.')
        eh_et_patcher = patch(helper_path + 'read_eh_et_packet',
                              wraps=read_eh_et_packet)
        self.mock_read_eh_et = eh_et_patcher.start()
        self.addCleanup(eh_et_patcher.stop)
        dt_patcher = patch(helper_path + 'read_dt_packet',
                           wraps=read_dt_packet)
        self.mock_read_dt = dt_patcher.start()
        self.addCleanup(dt_patcher.stop)
        soh_patcher = patch(helper_path + 'read_soh_packet',
                            wraps=read_soh_packet)
        self.mock_read_soh = soh_patcher.start()
        self.addCleanup(soh_patcher.stop)

    def test_rt130_soh_file(self):
        # An SOH file yields only SOH packets, read by read_soh_packet.
        soh_file = self.rt130_dir / '0/000000000_00000000'
        packets = read_rt130_file(soh_file, unpacker)
        self.assertTrue(all(isinstance(p, SOHPacket) for p in packets))
        self.assertTrue(self.mock_read_soh.called)
        self.assertFalse(self.mock_read_dt.called)
        self.assertFalse(self.mock_read_eh_et.called)

    def test_rt130_raw_data_file(self):
        # A waveform file yields only EH/ET and DT packets, read by the
        # corresponding readers.
        data_file = self.rt130_dir / '1/000000015_0036EE80'
        packets = read_rt130_file(data_file, unpacker)
        self.assertTrue(
            all(isinstance(p, (EHETPacket, DTPacket)) for p in packets)
        )
        self.assertFalse(self.mock_read_soh.called)
        self.assertTrue(self.mock_read_dt.called)
        self.assertTrue(self.mock_read_eh_et.called)

    def test_non_rt130_file(self):
        with self.subTest('test_file_exist'):
            non_rt130_file = (
                self.TEST_DATA_DIR /
                'Q330-sample/day_vols_AX08/AX08.XA..HHE.2021.186'
            )
            self.assertRaises(NotRT130FileError, read_rt130_file,
                              non_rt130_file, unpacker)
        with self.subTest('test_file_does_not_exist'):
            self.assertRaises(FileNotFoundError, read_rt130_file, '',
                              unpacker)
class TestConvertPacketToObspyFormat(unittest.TestCase):
def setUp(self) -> None:
TEST_DATA_DIR = Path(os.getcwd()).joinpath('tests/test_data')
rt130_dir = TEST_DATA_DIR.joinpath(
'RT130-sample/2017149.92EB/2017150/92EB'
)
file = rt130_dir.joinpath('1/000000015_0036EE80')
self.packet = read_rt130_file(file, unpacker)[0]
def test_all_needed_fields_are_available(self):
converted_packet = convert_packet_to_obspy_format(
self.packet, unpacker
)
self.assertEqual(len(converted_packet), len(PACKET_FINAL_DTYPE))