Skip to content
Snippets Groups Projects
Commit 2a2271d5 authored by Kien Le's avatar Kien Le
Browse files

Implement new way to read individual RT130 file

parent d7dd754d
No related branches found
No related tags found
No related merge requests found
import dataclasses
from dataclasses import dataclass
from sohstationviewer.model.mseed.read_mseed_experiment.mseed_helper import \
Unpacker
from sohstationviewer.model.reftek.rt130_experiment.reftek_helper import \
PacketHeader
def decode_uncompressed(packet: bytes, data_format: str, unpacker: Unpacker):
data = packet[24:]
# For uncompressed RT130 data, the data format is also the size of a data
# point in bit (aside from data format 33, which uses the same size as data
# format 32).
point_size = int(data_format)
if point_size == 33:
point_size = 32
# Convert the size of a data point to byte because the data is stored
# as a byte string.
point_size = point_size // 8
# struct.unpack uses different format characters for different point sizes.
format_char = {2: 'h', 4: 'i'}[point_size]
first_data_point = data[:point_size]
return unpacker.unpack(f'{format_char}', first_data_point)[0]
def decode_compressed(packet: bytes, data_format: str, unpacker: Unpacker):
data = packet[64:]
first_data_point = data[4:8]
return unpacker.unpack('i', first_data_point)[0]
def read_dt_packet(packet: bytes, unpacker: Unpacker):
decoders = {
**dict.fromkeys(['16', '32', '33'], decode_uncompressed),
**dict.fromkeys(['C0', 'C1', 'C2', 'C3'], decode_compressed)
}
event_number = int(packet[16:18].hex())
data_stream_number = int(packet[18:19].hex())
channel_number = int(packet[19:20].hex())
number_of_samples = int(packet[20:22].hex())
flags = unpacker.unpack('B', packet[22:23])[0]
data_format = packet[23:24].hex().upper()
extended_header = DTExtendedHeader(event_number, data_stream_number,
channel_number, number_of_samples,
flags, data_format)
first_data_point = decoders[data_format](packet, data_format, unpacker)
return extended_header, first_data_point
@dataclasses.dataclass
class DTExtendedHeader:
event_number: int
data_stream_number: int
channel_number: int
number_of_samples: int
flags: int
data_format: str
@dataclasses.dataclass
class DTPacket:
header: PacketHeader
extended_header: DTExtendedHeader
data: int
import dataclasses
from sohstationviewer.model.mseed.read_mseed_experiment.mseed_helper import \
Unpacker
from sohstationviewer.model.reftek.rt130_experiment.reftek_helper import \
PacketHeader
def read_eh_et_packet(packet: bytes, unpacker: Unpacker):
event_number = int(packet[16:18].hex())
data_stream_number = int(packet[18:19].hex())
flags = unpacker.unpack('B', packet[22:23])[0]
data_format = packet[23:24].hex().upper()
extended_header = EHETExtendedHeader(event_number, data_stream_number,
flags, data_format)
first_data_point = 9999
return extended_header, first_data_point
@dataclasses.dataclass
class EHETExtendedHeader:
event_number: int
data_stream_number: int
flags: int
data_format: str
def __post_init__(self):
self.channel_number = 0
self.number_of_samples = 0
@dataclasses.dataclass
class EHETPacket:
header: PacketHeader
extended_header: EHETExtendedHeader
data: int
import os
import warnings
from typing import Callable, Dict, Union
import numpy
import numpy as np
from obspy import UTCDateTime
from obspy.io.reftek.util import (
bcd, bcd_hex,
bcd_julian_day_string_to_nanoseconds_of_year, bcd_16bit_int, bcd_8bit_hex,
)
from sohstationviewer.model.mseed.read_mseed_experiment.mseed_helper import \
Unpacker
from sohstationviewer.model.reftek.rt130_experiment.dt_packet import \
(
read_dt_packet, DTPacket,
)
from sohstationviewer.model.reftek.rt130_experiment.eh_et_packet import \
(
EHETPacket, read_eh_et_packet,
)
from sohstationviewer.model.reftek.rt130_experiment.reftek_helper import (
print_error, packet_reader_placeholder, RT130ParseError, PacketHeader,
)
from sohstationviewer.model.reftek.from_rt2ms import core
PACKET = [
("packet_type", "|S2", None, "S2"),
("experiment_number", np.uint8, bcd, np.uint8),
("year", np.uint8, bcd, np.uint8),
("unit_id", (np.uint8, 2), bcd_hex, "S4"),
("time", (np.uint8, 6), bcd_julian_day_string_to_nanoseconds_of_year,
np.int64),
("byte_count", (np.uint8, 2), bcd_16bit_int, np.uint16),
("packet_sequence", (np.uint8, 2), bcd_16bit_int, np.uint16),
("event_number", (np.uint8, 2), bcd_16bit_int, np.uint16),
("data_stream_number", np.uint8, bcd, np.uint8),
("channel_number", np.uint8, bcd, np.uint8),
("number_of_samples", (np.uint8, 2), bcd_16bit_int, np.uint32),
("flags", np.uint8, None, np.uint8),
("data_format", np.uint8, bcd_8bit_hex, "S2"),
# Temporarily store the payload here.
("payload", (np.uint8, 4), None, (np.uint8, 4))]
PACKET_FINAL_DTYPE = np.dtype([
(name, dtype_final)
for name, dtype_initial, converter, dtype_final in PACKET])
def parse_rt130_time(year: int, time_bytes: bytes) -> UTCDateTime:
time_string = time_bytes.hex()
# The time string has the format of DDDHHMMSSTTT, where
# D = day of year
# H = hour
# M = minute
# S = second
# T = microsecond
day_of_year, hour, minute, second, microsecond = (
int(time_string[0:3]),
int(time_string[3:5]),
int(time_string[5:7]),
int(time_string[7:9]),
int(time_string[9:12])
)
# RT130 only stores the last two digits of the year. Because the
# documentation for RT130 does not define a way to retrieve the full year,
# we use Obspy's method. Accordingly, we convert 0-49 to 2000-2049 and
# 50-99 to 1950-1999.
if 0 <= year <= 49:
year += 2000
elif 50 <= year <= 99:
year += 1900
converted_time = UTCDateTime(year=year, julday=day_of_year, hour=hour,
minute=minute, second=second,
microsecond=microsecond)
return converted_time
def get_rt130_packet_header(rt130_packet: bytes,
unpacker: Unpacker) -> PacketHeader:
try:
# Because RT130 data is always big-endian, it is more convenient to
# use str.decode() than the unpacker.
packet_type = rt130_packet[:2].decode('ASCII')
except UnicodeError:
print_error(f'Cannot decode packet type.')
print_error('The given file does not appear to be a valid RT130 file.')
raise RT130ParseError
valid_packet_types = ['AD', 'CD', 'DS', 'DT', 'EH', 'ET', 'OM', 'SH', 'SC',
'FD']
if packet_type not in valid_packet_types:
print_error(f'Invalid packet type found: {packet_type}')
print_error('The given file does not appear to be a valid RT130 file.')
raise RT130ParseError
experiment_number = unpacker.unpack('b', rt130_packet[2:3])[0]
year = unpacker.unpack('b', rt130_packet[3:4])[0]
# A call to str.upper() is needed because bytes.hex() makes any
# hexadecimal letter (i.e. ABCDEF) lowercase, while we want them to be
# uppercase for display purpose.
unit_id = rt130_packet[4:6].hex().upper()
time_bytes = rt130_packet[6:12]
packet_time = parse_rt130_time(year, time_bytes)
byte_count = int(rt130_packet[12:14].hex())
packet_sequence = int(rt130_packet[14:16].hex())
return PacketHeader(packet_type, experiment_number, unit_id, packet_time,
byte_count, packet_sequence)
def read_rt130_file(file_name: str, unpacker: Unpacker):
# RT130 data looks to be all big-endian (logpeek assumes this, and it has
# been working pretty well), so we don't have to do any endianness check.
if os.path.getsize(file_name) % 1024:
warnings.warn('The size of the data is not a multiple of 1024; it '
'might be truncated.')
packets = []
with open(file_name, 'rb') as rt130_file:
for i in range(os.path.getsize(file_name) // 1024):
packet = rt130_file.read(1024)
packet_header = get_rt130_packet_header(packet, unpacker)
packet_handlers: Dict[str, Callable] = {
'SC': packet_reader_placeholder,
'SH': packet_reader_placeholder,
'EH': read_eh_et_packet,
'ET': read_eh_et_packet,
'DT': read_dt_packet,
}
packet_handler = packet_handlers.get(
packet_header.packet_type, packet_reader_placeholder
)
return_val = packet_handler(packet, unpacker)
if packet_header.packet_type == 'DT':
packet_type = DTPacket
elif packet_header.packet_type in ['EH', 'ET']:
packet_type = EHETPacket
extended_header, data = return_val
current_packet = packet_type(packet_header, extended_header, data)
packets.append(current_packet)
return packets
def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket], unpacker: Unpacker):
# We want to convert the packet to a tuple. In order to make it easier to
# maintain, we first convert the packet to a dictionary. Then, we grab the
# values of the dictionary as tuple to get the final result.
converted_packet = {}
converted_packet['packet_type'] = packet.header.packet_type
converted_packet['experiment_number'] = packet.header.experiment_number
# Obspy only stores the last two digits of the year.
converted_packet['year'] = packet.header.time.year % 100
converted_packet['unit_id'] = packet.header.unit_id
# We need to convert the stored time from second to nanosecond
converted_packet['time'] = packet.header.time.timestamp * 10**9
converted_packet['byte_count'] = packet.header.byte_count
converted_packet['packet_sequence'] = packet.header.packet_sequence
converted_packet['event_number'] = packet.extended_header.event_number
converted_packet['data_stream_number'] = packet.extended_header.data_stream_number
converted_packet['channel_number'] = packet.extended_header.channel_number
converted_packet['number_of_samples'] = packet.extended_header.number_of_samples
converted_packet['flags'] = packet.extended_header.flags
converted_packet['data_format'] = packet.extended_header.data_format
# Obspy stores the data as list of 1-byte integers. We store the data as an
# arbitrary length integer, so we need to do some conversion.
# To make converting the resulting tuple to an element of a structured
# array of type PACKET_FINAL_DTYPE easier, we set the size of the payload
# to be 4. This only affect data with format 16, and as long as we are
# careful in self.to_stream, we don't even have to make a special case when
# decoding (note: this is possible because the affected data will be
# prepended with 0s when encoded. Then, during decoding, we will get the
# original data prepended with 0s, which is the original data.)
data_size = 4
format_char = 'B'
converted_packet['payload'] = list(unpacker.unpack(
f'{data_size}{format_char}',
packet.data.to_bytes(data_size, 'big', signed=True)
))
return tuple(converted_packet.values())
class Reftek130(core.Reftek130):
@staticmethod
def from_file(filename: str):
rt130_unpacker = Unpacker('>')
rt = Reftek130()
rt._filename = filename
packets_in_file = read_rt130_file(filename, rt130_unpacker)
converted_packets = []
for packet in packets_in_file:
converted_packets.append(convert_packet_to_obspy_format(packet, rt130_unpacker))
rt._data = numpy.array(converted_packets, PACKET_FINAL_DTYPE)
return rt
import dataclasses
from typing import Any
from obspy import UTCDateTime
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
def print_warning(message, *args, **kwargs):
print(bcolors.WARNING + message + bcolors.ENDC, *args, **kwargs)
def print_error(message, *args, **kwargs):
print(bcolors.FAIL + message + bcolors.ENDC, *args, **kwargs)
def packet_reader_placeholder(*args: Any, **kwargs: Any) -> None:
"""
Placeholder function to be used in place of an RT130 packet reader
function. This function immediately returns None.
"""
return None
class RT130ParseError(Exception):
pass
@dataclasses.dataclass
class PacketHeader:
packet_type: str
experiment_number: int
unit_id: str
time: UTCDateTime
byte_count: int
packet_sequence: int
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment