Commit 61c4340d authored by Kien Le

First commit

parent a41506c6
Merge request !132: Fast MSEED read
import struct
def decode_int16(buffer, unpacker):
requested_bytes = buffer[:2]
return unpacker.unpack('h', requested_bytes)[0]
def decode_int24(buffer, unpacker):
requested_bytes = buffer[:3]
byte_order = 'big' if unpacker.byte_order_char == '>' else 'little'
    # We delegate to int.from_bytes() because it takes a lot of work to make
    # struct.unpack() handle signed 24-bit integers.
    # See: https://stackoverflow.com/questions/3783677/how-to-read-integers-from-a-file-that-are-24bit-and-little-endian-using-python # noqa
    return int.from_bytes(requested_bytes, byte_order, signed=True)
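# Illustrative example (values assumed, not from the original code): the bytes
# b'\xff\xff\xfe' decode to -2 when read as a big-endian signed 24-bit
# integer, i.e. int.from_bytes(b'\xff\xff\xfe', 'big', signed=True) == -2,
# and to -65537 when read little-endian.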
def decode_int32(buffer, unpacker):
requested_bytes = buffer[:4]
return unpacker.unpack('i', requested_bytes)[0]
def decode_ieee_float(buffer, unpacker):
requested_bytes = buffer[:4]
return unpacker.unpack('f', requested_bytes)[0]
def decode_ieee_double(buffer, unpacker):
requested_bytes = buffer[:8]
return unpacker.unpack('d', requested_bytes)[0]
def decode_steim(buffer, unpacker):
    # The first 4 bytes in a Steim frame are metadata of the record. Since we
    # aren't decompressing the data, we skip them. The next 4 bytes contain
    # the first data point of the MSEED data record, which is what we need.
requested_bytes = buffer[4:8]
return unpacker.unpack('i', requested_bytes)[0]
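# Illustrative sketch (example values assumed): in a Steim-1/2 frame, word 0
# holds the frame's control codes and word 1 holds X0, the first sample of the
# record. A data section starting with
#     b'\x00\x00\x00\x00' + struct.pack('>i', 1234)
# would therefore make decode_steim() return 1234 when paired with a
# big-endian unpacker.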
from dataclasses import dataclass
import struct
from enum import Enum
from obspy import UTCDateTime
class Unpacker:
def __init__(self, byte_order_char=''):
self.byte_order_char = byte_order_char
def unpack(self, format: str, buffer):
        default_byte_order_chars = ('@', '=', '>', '<', '!')
        if format.startswith(default_byte_order_chars):
            # Replace the existing byte order character with our own.
            format = self.byte_order_char + format[1:]
        else:
            format = self.byte_order_char + format
return struct.unpack(format, buffer)
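# Example usage (illustrative only): with the byte-order override in unpack(),
# an Unpacker created with byte_order_char '<' decodes b'\x01\x00' as 1 and
# replaces any byte order character already present in the format string:
#     Unpacker('<').unpack('H', b'\x01\x00')   # (1,)
#     Unpacker('<').unpack('>H', b'\x01\x00')  # also (1,); '>' is overridden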
@dataclass
class FixedHeader:
sequence_number: bytes
data_header_quality_indicator: bytes
reserved: bytes
station: bytes
location: bytes
channel: bytes
net_code: bytes
record_start_time: bytes
sample_count: bytes
sample_rate_factor: bytes
sample_rate_multiplier: bytes
activity_flags: bytes
io_and_clock_flags: bytes
data_quality_flags: bytes
blockette_count: bytes
time_correction: bytes
data_offset: bytes
first_blockette_offset: bytes
@dataclass
class Blockette1000:
blockette_type: bytes
next_blockette_offset: bytes
encoding_format: bytes
byte_order: bytes
record_length: bytes
reserved_byte: bytes
@dataclass
class RecordMetadata:
station: str
location: str
channel: str
network: str
start_time: UTCDateTime
sample_count: int
sample_rate: float
class EncodingFormat(Enum):
ASCII = 0
INT_16_BIT = 1
INT_24_BIT = 2
INT_32_BIT = 3
IEEE_FLOAT_32_BIT = 4
IEEE_FLOAT_64_BIT = 5
STEIM_1 = 10
STEIM_2 = 11
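# These values follow the data encoding format codes stored in blockette 1000
# of a SEED data record (for example, code 11 identifies Steim-2 compression).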
def get_header_endianness(header: FixedHeader):
"""
Determine the endianness of the fixed header of a data record. Works by
checking if the decoded record start time has a sane value if the header
is assumed to be big-endian.
    WARNING: This check fails on three dates: 2056-001, 2056-256, and
    2056-257. The year 2056 is a byte-swap palindrome (0x0808), so endianness
    does not affect how it is read. The day values 1 (0x0001) and 256 (0x0100)
    are byte-swapped counterparts of each other, and both are valid days of
    the year, while 257 (0x0101) is itself a byte-swap palindrome. As a
    result, the start time alone cannot tell us the endianness of a header
    that starts on one of these dates. As written, this function reports such
    headers as big-endian. The same limitation is noted in libmseed.
:param header: the fixed header of the data record
:return: either of the string 'big' or 'little' depending on the extracted
endianness of header
"""
record_start_time_string = header.record_start_time
record_start_time_tuple = struct.unpack('>hhbbbbh',
record_start_time_string)
# libmseed uses 1900 to 2100 as the sane year range. We follow their
# example here.
year_is_good = (1900 <= record_start_time_tuple[0] <= 2100)
# The upper range is 366 to account for leap years.
day_is_good = (1 <= record_start_time_tuple[1] <= 366)
endianness = 'big' if year_is_good and day_is_good else 'little'
return endianness
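# Illustrative example (values assumed for demonstration): a big-endian header
# whose record starts in 2020 stores the year as b'\x07\xe4', which unpacks to
# 2020 and passes the sanity check, so the header is reported as big-endian.
# The same year written by a little-endian writer is stored as b'\xe4\x07',
# which unpacks big-endian to -7161 and fails the check, so the header is
# reported as little-endian.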
def get_data_endianness(blockette_1000: Blockette1000):
"""
    Get the endianness of a data record by examining its blockette 1000.
    :param blockette_1000: the blockette 1000 of the data record
    :return: either of the strings 'big' or 'little'
"""
# The byte order is only one byte so using big or little endian does not
# matter.
blockette_1000_endianness = int.from_bytes(
blockette_1000.byte_order, 'big'
)
if blockette_1000_endianness:
return 'big'
else:
return 'little'
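# For example, a byte_order field of b'\x01' (the SEED value for 68000/SPARC
# word order) yields 'big', while b'\x00' (VAX/8086 word order) yields
# 'little'.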
def calculate_sample_rate(factor: int, multiplier: int) -> float:
"""
Calculate the sample rate using the sample rate factor and multiplier. This
algorithm is described around the start of Chapter 8 in the SEED manual.
:param factor: the sample rate factor
:param multiplier: the sample rate multiplier
:return: the nominal sample rate
"""
sample_rate = 0
if factor == 0:
sample_rate = 0
elif factor > 0 and multiplier > 0:
sample_rate = factor * multiplier
elif factor > 0 and multiplier < 0:
sample_rate = -(factor / multiplier)
elif factor < 0 and multiplier > 0:
sample_rate = -(multiplier / factor)
elif factor < 0 and multiplier < 0:
sample_rate = 1 / (factor * multiplier)
return sample_rate
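# Worked examples (values assumed for illustration): factor 40 with multiplier
# 1 gives 40 Hz; factor -10 with multiplier 1 gives -(1 / -10) = 0.1 Hz (one
# sample every 10 seconds); factor -10 with multiplier -2 gives
# 1 / ((-10) * (-2)) = 0.05 Hz.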
def get_record_metadata(header: FixedHeader, header_unpacker: Unpacker):
station = header.station.decode('utf-8').rstrip()
location = header.location.decode('utf-8').rstrip()
channel = header.channel.decode('utf-8').rstrip()
network = header.net_code.decode('utf-8').rstrip()
record_start_time_string = header.record_start_time
record_start_time_tuple = header_unpacker.unpack('HHBBBBH',
record_start_time_string)
    record_start_time = UTCDateTime(
        year=record_start_time_tuple[0],
        julday=record_start_time_tuple[1],
        hour=record_start_time_tuple[2],
        minute=record_start_time_tuple[3],
        second=record_start_time_tuple[4],
        microsecond=record_start_time_tuple[6] * 100,
    )
sample_count = header_unpacker.unpack('H', header.sample_count)[0]
sample_rate_factor = header_unpacker.unpack('h', header.sample_rate_factor)[0]
sample_rate_multiplier = header_unpacker.unpack(
'h', header.sample_rate_multiplier
)[0]
sample_rate = calculate_sample_rate(sample_rate_factor,
sample_rate_multiplier)
return RecordMetadata(station, location, channel, network,
record_start_time, sample_count, sample_rate)
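# Illustrative example (values assumed): a record start time field decoding to
# the tuple (2020, 145, 6, 30, 15, 0, 1234) corresponds to
# UTCDateTime(2020, julday=145, hour=6, minute=30, second=15,
# microsecond=123400), i.e. 2020-05-24T06:30:15.1234Z; the last element is in
# units of 0.0001 seconds, hence the multiplication by 100 to get microseconds.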
import obspy
from decode_mseed import (
decode_ieee_float, decode_ieee_double, decode_steim, decode_int16,
decode_int24, decode_int32,
)
from help_do_stuff import (
FixedHeader, Blockette1000, get_data_endianness, Unpacker,
get_record_metadata, get_header_endianness, RecordMetadata,
EncodingFormat,
)
class RecordReader:
def __init__(self, file):
self.file = file
self.fixed_header: FixedHeader = None
self.blockette_1000: Blockette1000 = None
        # Utility object that helps unpack byte strings in the fixed header.
        # Separate from the one for data in case the header has a different
        # byte order.
        # TODO: change blockettes to use this unpacker as well.
self.fixed_header_unpacker: Unpacker = Unpacker()
self.data_unpacker: Unpacker = Unpacker()
self.record_metadata: RecordMetadata = None
def read_fixed_header(self):
byte_counts = [6, 1, 1, 5, 2, 3, 2, 10, 2, 2, 2, 1, 1, 1, 1, 4, 2, 2]
fixed_header_sections_values = []
for byte_count in byte_counts:
fixed_header_sections_values.append(self.file.read(byte_count))
self.fixed_header = FixedHeader(*fixed_header_sections_values)
def read_blockette_500(self):
"""
        Read blockette 500. We only read the more precise start time. Assumes
        that the file pointer is at the start of the blockette.
# TODO: find out what VCO correction is.
"""
self.file.seek(18, 1)
start_time_microsecond = self.file.read(1)
start_time_microsecond = self.data_unpacker.unpack(
'b', start_time_microsecond
)[0]
# Convert from microsecond to second so that UTCDateTime can handle it.
start_time_microsecond /= (10 ** 6)
self.record_metadata.start_time += start_time_microsecond
self.file.seek(181, 1)
def read_blockette_1000(self):
blockette_1000_section_lengths = [2, 2, 1, 1, 1, 1]
blockette_1000_values = []
for section_length in blockette_1000_section_lengths:
blockette_1000_values.append(self.file.read(section_length))
self.blockette_1000 = Blockette1000(*blockette_1000_values)
def read_blockette_1001(self):
"""
Read blockette 1001. The only valuable thing in this blockette is the
more precise start time. Assumes that the file pointer is at the start
of the blockette.
"""
self.file.read(5)
start_time_microsecond = self.file.read(1)
start_time_microsecond = self.data_unpacker.unpack(
'b', start_time_microsecond
)[0]
# Convert from microsecond to second so that UTCDateTime can handle it.
start_time_microsecond /= (10**6)
self.record_metadata.start_time += start_time_microsecond
self.file.read(2)
    def apply_time_correction(self):
        # format() is used here instead of bin() because we need to pad the
        # resulting bit string with 0 to the left.
        activity_flags = format(
            self.fixed_header_unpacker.unpack(
                'B', self.fixed_header.activity_flags)[0],
            '0>8b'
        )
        # The bit string is written with the most significant bit first, so
        # bit 1 of the activity flags (time correction applied) sits at
        # index 6.
        is_time_correction_applied = int(activity_flags[6])
        if is_time_correction_applied:
            return
        # The time correction is stored as a signed long, so we unpack it
        # with 'l'.
        time_correction = self.fixed_header_unpacker.unpack(
            'l', self.fixed_header.time_correction
        )[0]
        # We need to convert the unit from 0.0001 seconds to seconds.
        time_correction *= 0.0001
        self.record_metadata.start_time += time_correction
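    # Example (assumed value, for illustration only): a stored time correction
    # of 25 corresponds to 25 * 0.0001 = 0.0025 seconds, so the record start
    # time is shifted forward by 2.5 milliseconds; a negative correction
    # shifts it backward.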
def read_blockettes(self):
blockette_count = self.fixed_header_unpacker.unpack(
'B', self.fixed_header.blockette_count
)[0]
for i in range(1, blockette_count):
# All blockettes store their type in the first two bytes, so we
# read that to determine what to do
next_blockette_type = self.file.read(2)
# Move file pointer back to start of blockette
self.file.seek(-2, 1)
next_blockette_type = self.data_unpacker.unpack(
'H', next_blockette_type
)[0]
if next_blockette_type not in (1000, 1001, 500):
                print('We currently only handle blockettes 500, 1000, and '
                      '1001.')
continue
if next_blockette_type == 1001:
self.read_blockette_1001()
elif next_blockette_type == 500:
self.read_blockette_500()
def get_first_data_point(self):
record_start = self.file.tell()
self.read_fixed_header()
self.read_blockette_1000()
header_endianness = get_header_endianness(self.fixed_header)
if header_endianness == 'little':
self.fixed_header_unpacker.byte_order_char = '<'
else:
self.fixed_header_unpacker.byte_order_char = '>'
data_endianness = get_data_endianness(self.blockette_1000)
if data_endianness == 'little':
self.data_unpacker.byte_order_char = '<'
else:
self.data_unpacker.byte_order_char = '>'
self.record_metadata = get_record_metadata(self.fixed_header,
self.fixed_header_unpacker)
self.apply_time_correction()
self.read_blockettes()
data_start = self.fixed_header_unpacker.unpack('H', self.fixed_header.data_offset)[0]
# It is most convenient to seek to the start of the record, then seek
# to the start of the data. Any other way would involve some
# calculation to determine which point to seek to.
self.file.seek(record_start)
self.file.seek(data_start, 1)
encoding_format = self.blockette_1000.encoding_format
encoding_format = self.data_unpacker.unpack('b', encoding_format)[0]
encoding_format = EncodingFormat(encoding_format)
# Currently, we are extracting only the first data point in each
# record. The smallest possible amount of bytes we can extract while
# guaranteeing that we get the first data point in the record is 8,
# with Steim encodings and IEEE double precision float needing to use
# the whole buffer.
buffer = self.file.read(8)
encoding_to_decoder = {
EncodingFormat.INT_16_BIT: decode_int16,
EncodingFormat.INT_24_BIT: decode_int24,
EncodingFormat.INT_32_BIT: decode_int32,
EncodingFormat.IEEE_FLOAT_32_BIT: decode_ieee_float,
EncodingFormat.IEEE_FLOAT_64_BIT: decode_ieee_double,
EncodingFormat.STEIM_1: decode_steim,
EncodingFormat.STEIM_2: decode_steim,
}
first_data_point = encoding_to_decoder[encoding_format](buffer, self.data_unpacker)
return first_data_point
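# Minimal usage sketch (the file name below is hypothetical, not from this
# commit):
#     with open('example.mseed', 'rb') as data_file:
#         reader = RecordReader(data_file)
#         print(reader.get_first_data_point())
# After the call, reader.record_metadata holds the station, location, channel,
# network, start time, sample count, and sample rate of the record just read.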
if __name__ == '__main__':
# numpy.set_printoptions(threshold=sys.maxsize)
file_path = '/Users/kle/PycharmProjects/sohstationviewer/tests/test_data/potato.sdr/XX.KC01..VCO.D.2020.145'
file = open(file_path, 'rb')
stream = obspy.read(file_path)
trace = []
while 1:
# We know that end of file is reached when read() returns an empty
# string.
is_eof = (file.read(1) == b'')
if is_eof:
break
# We need to move the file pointer back to its position before we do
# the end of file check. Otherwise, we would be off by one byte for all
# the reads afterward.
file.seek(-1, 1)
# We save the start of the current record so that after we are done
# reading the record, we can move back. This makes moving to the next
# record a lot easier, seeing as we can simply move the file pointer
# a distance the size of the current record.
current_record_start = file.tell()
reader = RecordReader(file)
trace.append(reader.get_first_data_point())
sample_count = reader.record_metadata.sample_count
sample_rate = reader.record_metadata.sample_rate
record_time_taken = sample_count / sample_rate
record_end_time = reader.record_metadata.start_time + record_time_taken
# MSEED stores the size of a data record as an exponent of a power of
# two, so we have to convert that to actual size before doing anything
# else.
record_length_exp = reader.data_unpacker.unpack(
'B', reader.blockette_1000.record_length
)[0]
record_size = 2 ** record_length_exp
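        # For example, a stored exponent of 12 gives 2 ** 12 = 4096 bytes,
        # a common MSEED record length.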
file.seek(current_record_start)
file.seek(record_size, 1)
print(trace)