diff --git a/sohstationviewer/model/reftek/rt130_experiment/__init__.py b/sohstationviewer/model/reftek/rt130_experiment/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/sohstationviewer/model/reftek/rt130_experiment/dt_packet.py b/sohstationviewer/model/reftek/rt130_experiment/dt_packet.py new file mode 100644 index 0000000000000000000000000000000000000000..1b8bb71340bae4492a9752337cc57d01d57fa57f --- /dev/null +++ b/sohstationviewer/model/reftek/rt130_experiment/dt_packet.py @@ -0,0 +1,70 @@ +import dataclasses +from dataclasses import dataclass + +from sohstationviewer.model.mseed.read_mseed_experiment.mseed_helper import \ + Unpacker +from sohstationviewer.model.reftek.rt130_experiment.reftek_helper import \ + PacketHeader + + +def decode_uncompressed(packet: bytes, data_format: str, unpacker: Unpacker): + data = packet[24:] + # For uncompressed RT130 data, the data format is also the size of a data + # point in bit (aside from data format 33, which uses the same size as data + # format 32). + point_size = int(data_format) + if point_size == 33: + point_size = 32 + # Convert the size of a data point to byte because the data is stored + # as a byte string. + point_size = point_size // 8 + + # struct.unpack uses different format characters for different point sizes. + format_char = {2: 'h', 4: 'i'}[point_size] + + first_data_point = data[:point_size] + + return unpacker.unpack(f'{format_char}', first_data_point)[0] + + +def decode_compressed(packet: bytes, data_format: str, unpacker: Unpacker): + data = packet[64:] + first_data_point = data[4:8] + return unpacker.unpack('i', first_data_point)[0] + + +def read_dt_packet(packet: bytes, unpacker: Unpacker): + decoders = { + **dict.fromkeys(['16', '32', '33'], decode_uncompressed), + **dict.fromkeys(['C0', 'C1', 'C2', 'C3'], decode_compressed) + } + + event_number = int(packet[16:18].hex()) + data_stream_number = int(packet[18:19].hex()) + channel_number = int(packet[19:20].hex()) + number_of_samples = int(packet[20:22].hex()) + flags = unpacker.unpack('B', packet[22:23])[0] + data_format = packet[23:24].hex().upper() + + extended_header = DTExtendedHeader(event_number, data_stream_number, + channel_number, number_of_samples, + flags, data_format) + first_data_point = decoders[data_format](packet, data_format, unpacker) + return extended_header, first_data_point + + +@dataclasses.dataclass +class DTExtendedHeader: + event_number: int + data_stream_number: int + channel_number: int + number_of_samples: int + flags: int + data_format: str + + +@dataclasses.dataclass +class DTPacket: + header: PacketHeader + extended_header: DTExtendedHeader + data: int diff --git a/sohstationviewer/model/reftek/rt130_experiment/eh_et_packet.py b/sohstationviewer/model/reftek/rt130_experiment/eh_et_packet.py new file mode 100644 index 0000000000000000000000000000000000000000..74ae28a9f8db912e46eb904110bdc70256a20832 --- /dev/null +++ b/sohstationviewer/model/reftek/rt130_experiment/eh_et_packet.py @@ -0,0 +1,37 @@ +import dataclasses + +from sohstationviewer.model.mseed.read_mseed_experiment.mseed_helper import \ + Unpacker +from sohstationviewer.model.reftek.rt130_experiment.reftek_helper import \ + PacketHeader + + +def read_eh_et_packet(packet: bytes, unpacker: Unpacker): + event_number = int(packet[16:18].hex()) + data_stream_number = int(packet[18:19].hex()) + flags = unpacker.unpack('B', packet[22:23])[0] + data_format = packet[23:24].hex().upper() + + extended_header = EHETExtendedHeader(event_number, data_stream_number, + flags, data_format) + first_data_point = 9999 + return extended_header, first_data_point + + +@dataclasses.dataclass +class EHETExtendedHeader: + event_number: int + data_stream_number: int + flags: int + data_format: str + + def __post_init__(self): + self.channel_number = 0 + self.number_of_samples = 0 + + +@dataclasses.dataclass +class EHETPacket: + header: PacketHeader + extended_header: EHETExtendedHeader + data: int diff --git a/sohstationviewer/model/reftek/rt130_experiment/reftek.py b/sohstationviewer/model/reftek/rt130_experiment/reftek.py new file mode 100644 index 0000000000000000000000000000000000000000..4897a9f29c769813e444528d1202379d4c399b27 --- /dev/null +++ b/sohstationviewer/model/reftek/rt130_experiment/reftek.py @@ -0,0 +1,208 @@ +import os +import warnings +from typing import Callable, Dict, Union + +import numpy +import numpy as np +from obspy import UTCDateTime +from obspy.io.reftek.util import ( + bcd, bcd_hex, + bcd_julian_day_string_to_nanoseconds_of_year, bcd_16bit_int, bcd_8bit_hex, +) + +from sohstationviewer.model.mseed.read_mseed_experiment.mseed_helper import \ + Unpacker +from sohstationviewer.model.reftek.rt130_experiment.dt_packet import \ + ( + read_dt_packet, DTPacket, +) +from sohstationviewer.model.reftek.rt130_experiment.eh_et_packet import \ + ( + EHETPacket, read_eh_et_packet, +) +from sohstationviewer.model.reftek.rt130_experiment.reftek_helper import ( + print_error, packet_reader_placeholder, RT130ParseError, PacketHeader, +) + +from sohstationviewer.model.reftek.from_rt2ms import core + + +PACKET = [ + ("packet_type", "|S2", None, "S2"), + ("experiment_number", np.uint8, bcd, np.uint8), + ("year", np.uint8, bcd, np.uint8), + ("unit_id", (np.uint8, 2), bcd_hex, "S4"), + ("time", (np.uint8, 6), bcd_julian_day_string_to_nanoseconds_of_year, + np.int64), + ("byte_count", (np.uint8, 2), bcd_16bit_int, np.uint16), + ("packet_sequence", (np.uint8, 2), bcd_16bit_int, np.uint16), + ("event_number", (np.uint8, 2), bcd_16bit_int, np.uint16), + ("data_stream_number", np.uint8, bcd, np.uint8), + ("channel_number", np.uint8, bcd, np.uint8), + ("number_of_samples", (np.uint8, 2), bcd_16bit_int, np.uint32), + ("flags", np.uint8, None, np.uint8), + ("data_format", np.uint8, bcd_8bit_hex, "S2"), + # Temporarily store the payload here. + ("payload", (np.uint8, 4), None, (np.uint8, 4))] + +PACKET_FINAL_DTYPE = np.dtype([ + (name, dtype_final) + for name, dtype_initial, converter, dtype_final in PACKET]) + + +def parse_rt130_time(year: int, time_bytes: bytes) -> UTCDateTime: + time_string = time_bytes.hex() + # The time string has the format of DDDHHMMSSTTT, where + # D = day of year + # H = hour + # M = minute + # S = second + # T = microsecond + day_of_year, hour, minute, second, microsecond = ( + int(time_string[0:3]), + int(time_string[3:5]), + int(time_string[5:7]), + int(time_string[7:9]), + int(time_string[9:12]) + ) + # RT130 only stores the last two digits of the year. Because the + # documentation for RT130 does not define a way to retrieve the full year, + # we use Obspy's method. Accordingly, we convert 0-49 to 2000-2049 and + # 50-99 to 1950-1999. + if 0 <= year <= 49: + year += 2000 + elif 50 <= year <= 99: + year += 1900 + converted_time = UTCDateTime(year=year, julday=day_of_year, hour=hour, + minute=minute, second=second, + microsecond=microsecond) + return converted_time + + +def get_rt130_packet_header(rt130_packet: bytes, + unpacker: Unpacker) -> PacketHeader: + try: + # Because RT130 data is always big-endian, it is more convenient to + # use str.decode() than the unpacker. + packet_type = rt130_packet[:2].decode('ASCII') + except UnicodeError: + print_error(f'Cannot decode packet type.') + print_error('The given file does not appear to be a valid RT130 file.') + raise RT130ParseError + valid_packet_types = ['AD', 'CD', 'DS', 'DT', 'EH', 'ET', 'OM', 'SH', 'SC', + 'FD'] + if packet_type not in valid_packet_types: + print_error(f'Invalid packet type found: {packet_type}') + print_error('The given file does not appear to be a valid RT130 file.') + raise RT130ParseError + + experiment_number = unpacker.unpack('b', rt130_packet[2:3])[0] + year = unpacker.unpack('b', rt130_packet[3:4])[0] + # A call to str.upper() is needed because bytes.hex() makes any + # hexadecimal letter (i.e. ABCDEF) lowercase, while we want them to be + # uppercase for display purpose. + unit_id = rt130_packet[4:6].hex().upper() + time_bytes = rt130_packet[6:12] + packet_time = parse_rt130_time(year, time_bytes) + byte_count = int(rt130_packet[12:14].hex()) + packet_sequence = int(rt130_packet[14:16].hex()) + + return PacketHeader(packet_type, experiment_number, unit_id, packet_time, + byte_count, packet_sequence) + + +def read_rt130_file(file_name: str, unpacker: Unpacker): + # RT130 data looks to be all big-endian (logpeek assumes this, and it has + # been working pretty well), so we don't have to do any endianness check. + + if os.path.getsize(file_name) % 1024: + warnings.warn('The size of the data is not a multiple of 1024; it ' + 'might be truncated.') + + packets = [] + + with open(file_name, 'rb') as rt130_file: + for i in range(os.path.getsize(file_name) // 1024): + packet = rt130_file.read(1024) + packet_header = get_rt130_packet_header(packet, unpacker) + + packet_handlers: Dict[str, Callable] = { + 'SC': packet_reader_placeholder, + 'SH': packet_reader_placeholder, + 'EH': read_eh_et_packet, + 'ET': read_eh_et_packet, + 'DT': read_dt_packet, + } + + packet_handler = packet_handlers.get( + packet_header.packet_type, packet_reader_placeholder + ) + return_val = packet_handler(packet, unpacker) + if packet_header.packet_type == 'DT': + packet_type = DTPacket + elif packet_header.packet_type in ['EH', 'ET']: + packet_type = EHETPacket + + extended_header, data = return_val + current_packet = packet_type(packet_header, extended_header, data) + packets.append(current_packet) + + return packets + + +def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket], unpacker: Unpacker): + # We want to convert the packet to a tuple. In order to make it easier to + # maintain, we first convert the packet to a dictionary. Then, we grab the + # values of the dictionary as tuple to get the final result. + converted_packet = {} + converted_packet['packet_type'] = packet.header.packet_type + converted_packet['experiment_number'] = packet.header.experiment_number + # Obspy only stores the last two digits of the year. + converted_packet['year'] = packet.header.time.year % 100 + converted_packet['unit_id'] = packet.header.unit_id + # We need to convert the stored time from second to nanosecond + converted_packet['time'] = packet.header.time.timestamp * 10**9 + converted_packet['byte_count'] = packet.header.byte_count + converted_packet['packet_sequence'] = packet.header.packet_sequence + converted_packet['event_number'] = packet.extended_header.event_number + converted_packet['data_stream_number'] = packet.extended_header.data_stream_number + converted_packet['channel_number'] = packet.extended_header.channel_number + converted_packet['number_of_samples'] = packet.extended_header.number_of_samples + converted_packet['flags'] = packet.extended_header.flags + converted_packet['data_format'] = packet.extended_header.data_format + + # Obspy stores the data as list of 1-byte integers. We store the data as an + # arbitrary length integer, so we need to do some conversion. + # To make converting the resulting tuple to an element of a structured + # array of type PACKET_FINAL_DTYPE easier, we set the size of the payload + # to be 4. This only affect data with format 16, and as long as we are + # careful in self.to_stream, we don't even have to make a special case when + # decoding (note: this is possible because the affected data will be + # prepended with 0s when encoded. Then, during decoding, we will get the + # original data prepended with 0s, which is the original data.) + data_size = 4 + format_char = 'B' + converted_packet['payload'] = list(unpacker.unpack( + f'{data_size}{format_char}', + packet.data.to_bytes(data_size, 'big', signed=True) + )) + + return tuple(converted_packet.values()) + + +class Reftek130(core.Reftek130): + @staticmethod + def from_file(filename: str): + rt130_unpacker = Unpacker('>') + rt = Reftek130() + rt._filename = filename + packets_in_file = read_rt130_file(filename, rt130_unpacker) + converted_packets = [] + for packet in packets_in_file: + converted_packets.append(convert_packet_to_obspy_format(packet, rt130_unpacker)) + rt._data = numpy.array(converted_packets, PACKET_FINAL_DTYPE) + return rt + + + + diff --git a/sohstationviewer/model/reftek/rt130_experiment/reftek_helper.py b/sohstationviewer/model/reftek/rt130_experiment/reftek_helper.py new file mode 100644 index 0000000000000000000000000000000000000000..e90cd055f1918eb73728d2fef3f734c63c3da5ed --- /dev/null +++ b/sohstationviewer/model/reftek/rt130_experiment/reftek_helper.py @@ -0,0 +1,46 @@ +import dataclasses +from typing import Any + +from obspy import UTCDateTime + + +class bcolors: + HEADER = '\033[95m' + OKBLUE = '\033[94m' + OKCYAN = '\033[96m' + OKGREEN = '\033[92m' + WARNING = '\033[93m' + FAIL = '\033[91m' + ENDC = '\033[0m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + + +def print_warning(message, *args, **kwargs): + print(bcolors.WARNING + message + bcolors.ENDC, *args, **kwargs) + + +def print_error(message, *args, **kwargs): + print(bcolors.FAIL + message + bcolors.ENDC, *args, **kwargs) + + +def packet_reader_placeholder(*args: Any, **kwargs: Any) -> None: + """ + Placeholder function to be used in place of an RT130 packet reader + function. This function immediately returns None. + """ + return None + + +class RT130ParseError(Exception): + pass + + +@dataclasses.dataclass +class PacketHeader: + packet_type: str + experiment_number: int + unit_id: str + time: UTCDateTime + byte_count: int + packet_sequence: int