diff --git a/sohstationviewer/model/mseed/read_mseed_experiment/decode_mseed.py b/sohstationviewer/model/mseed/read_mseed_experiment/decode_mseed.py index 166cf6f007c4bc742aba1425a2f66f9031221830..dc4d85396b81ef2962d365cad33d8f0a8acb2b0e 100644 --- a/sohstationviewer/model/mseed/read_mseed_experiment/decode_mseed.py +++ b/sohstationviewer/model/mseed/read_mseed_experiment/decode_mseed.py @@ -1,6 +1,3 @@ -import struct - - def decode_int16(buffer, unpacker): requested_bytes = buffer[:2] return unpacker.unpack('h', requested_bytes)[0] diff --git a/sohstationviewer/model/mseed/read_mseed_experiment/mseed_helper.py b/sohstationviewer/model/mseed/read_mseed_experiment/mseed_helper.py index f6ac85d1d3d4a54e70c65fb4467867fd1d786d96..59f41c5fe95b5fa42c165e6a64c2123d5c17eec4 100644 --- a/sohstationviewer/model/mseed/read_mseed_experiment/mseed_helper.py +++ b/sohstationviewer/model/mseed/read_mseed_experiment/mseed_helper.py @@ -6,10 +6,21 @@ from obspy import UTCDateTime class Unpacker: - def __init__(self, byte_order_char=''): + """ + A wrapper around struct.unpack() to unpack binary data without having to + explicitly define the byte order in the format string. Also restrict the + type of format to str and buffer to bytes. + """ + def __init__(self, byte_order_char: str = '') -> None: self.byte_order_char = byte_order_char - def unpack(self, format: str, buffer): + def unpack(self, format: str, buffer: bytes): + """ + + :param format: + :param buffer: the byte string + :return: a tuple containing the unpacked values. + """ default_byte_order_chars = ('@', '=', '>', '<', '!') if format.startswith(default_byte_order_chars): format = self.byte_order_char + format[:1] @@ -167,7 +178,9 @@ def get_record_metadata(header: FixedHeader, header_unpacker: Unpacker): sample_count = header_unpacker.unpack('H', header.sample_count)[0] - sample_rate_factor = header_unpacker.unpack('h', header.sample_rate_factor)[0] + sample_rate_factor = header_unpacker.unpack( + 'h', header.sample_rate_factor + )[0] sample_rate_multiplier = header_unpacker.unpack( 'h', header.sample_rate_multiplier )[0] @@ -176,5 +189,3 @@ def get_record_metadata(header: FixedHeader, header_unpacker: Unpacker): return RecordMetadata(station, location, channel, network, record_start_time, sample_count, sample_rate) - - diff --git a/sohstationviewer/model/mseed/read_mseed_experiment/mseed_reader.py b/sohstationviewer/model/mseed/read_mseed_experiment/mseed_reader.py index 3cfd423dc5f336765f9349c4e969821bbfdc4938..441dad773f046362ba5bc27827b2ac74c47c096d 100644 --- a/sohstationviewer/model/mseed/read_mseed_experiment/mseed_reader.py +++ b/sohstationviewer/model/mseed/read_mseed_experiment/mseed_reader.py @@ -1,4 +1,4 @@ -from numbers import Number, Real +from numbers import Real from typing import BinaryIO, Optional, List import obspy @@ -143,7 +143,9 @@ class RecordReader: '128s', clock_status )[0].decode('utf-8').strip() - formatted_blockette = '\n'.join([f'{key}: {value}' for key, value in blockette_content.items()]) + formatted_blockette = '\n'.join([f'{key}: {value}' + for key, value + in blockette_content.items()]) self.other_blockettes.append(formatted_blockette) def read_blockette_1000(self) -> None: @@ -264,8 +266,9 @@ class RecordReader: EncodingFormat.STEIM_1: decode_steim, EncodingFormat.STEIM_2: decode_steim, } - first_data_point = encoding_to_decoder[encoding_format](buffer, - self.data_unpacker) + first_data_point = encoding_to_decoder[encoding_format]( + buffer, self.data_unpacker + ) # Seek back to the start of the record so we can call this method again # if needed. self.file.seek(record_start) @@ -284,28 +287,28 @@ class MSeedReader: is_eof = (self.file.read(1) == b'') if is_eof: break - # We need to move the file pointer back to its position after we do - # the end of file check. Otherwise, we would be off by one byte for all - # the reads afterward. + # We need to move the file pointer back to its position after we + # do the end of file check. Otherwise, we would be off by one + # byte for all the reads afterward. self.file.seek(-1, 1) - # We save the start of the current record so that after we are done - # reading the record, we can move back. This makes moving to the next - # record a lot easier, seeing as we can simply move the file pointer - # a distance the size of the current record. + # We save the start of the current record so that after we are + # done reading the record, we can move back. This makes moving + # to the next record a lot easier, seeing as we can simply move + # the file pointer a distance the size of the current record. current_record_start = self.file.tell() reader = RecordReader(self.file) trace.append(reader.get_first_data_point()) - print(reader.other_blockettes[0]) # sample_count = reader.record_metadata.sample_count # sample_rate = reader.record_metadata.sample_rate # record_time_taken = sample_count / sample_rate - # record_end_time = reader.record_metadata.start_time + record_time_taken + # record_end_time = (reader.record_metadata.start_time + + # record_time_taken) - # MSEED stores the size of a data record as an exponent of a power of - # two, so we have to convert that to actual size before doing anything - # else. + # MSEED stores the size of a data record as an exponent of a + # power of two, so we have to convert that to actual size before + # doing anything else. record_length_exp = reader.header_unpacker.unpack( 'B', reader.blockette_1000.record_length )[0] @@ -313,12 +316,12 @@ class MSeedReader: self.file.seek(current_record_start) self.file.seek(record_size, 1) - print(trace) if __name__ == '__main__': # numpy.set_printoptions(threshold=sys.maxsize) - file_path = '/Users/kle/PycharmProjects/sohstationviewer/tests/test_data/DT0001__.ACE' + file_path = '/Users/kle/PycharmProjects/sohstationviewer/tests/test_data' \ + '/DT0001__.ACE ' file = open(file_path, 'rb') stream = obspy.read(file_path) MSeedReader(file).read()