Skip to content
Snippets Groups Projects
Commit 8f752d00 authored by Kien Le's avatar Kien Le
Browse files

Revert "Revert "Read all data points in SOH channels""

This reverts commit e5ce8e4b.
parent 29a5e414
No related branches found
No related tags found
No related merge requests found
Pipeline #4311 passed with stage
in 4 minutes and 19 seconds
from numbers import Real from numbers import Real
from typing import BinaryIO, Optional, Dict, Union, List, Tuple from typing import BinaryIO, Optional, Dict, Union, List, Tuple
from pathlib import Path from pathlib import Path
import numpy
from obspy import UTCDateTime from obspy import UTCDateTime
from sohstationviewer.model.mseed_data.record_reader import RecordReader from sohstationviewer.model.mseed_data.record_reader import RecordReader
...@@ -30,6 +32,9 @@ def move_to_next_record(file, current_record_start: int, ...@@ -30,6 +32,9 @@ def move_to_next_record(file, current_record_start: int,
file.seek(record_size, 1) file.seek(record_size, 1)
LogData = Dict[str, Union[List[str], Dict[str, List[str]]]]
class MSeedReader: class MSeedReader:
def __init__(self, file_path: Path, def __init__(self, file_path: Path,
read_start: float = UTCDateTime(0).timestamp, read_start: float = UTCDateTime(0).timestamp,
...@@ -42,8 +47,7 @@ class MSeedReader: ...@@ -42,8 +47,7 @@ class MSeedReader:
soh_data: Dict = {}, soh_data: Dict = {},
mass_pos_data: Dict = {}, mass_pos_data: Dict = {},
waveform_data: Dict = {}, waveform_data: Dict = {},
log_data: Dict[str, Union[List[str], log_data: LogData = {},
Dict[str, List[str]]]] = {},
gap_minimum: Optional[float] = None gap_minimum: Optional[float] = None
) -> None: ) -> None:
""" """
...@@ -147,8 +151,8 @@ class MSeedReader: ...@@ -147,8 +151,8 @@ class MSeedReader:
def append_data(self, data_dict: dict, def append_data(self, data_dict: dict,
record: RecordReader, record: RecordReader,
data_points: Tuple[Real, Real], data_points: List[Real],
times: Tuple[Real, Real]) -> None: times: List[Real]) -> None:
""" """
Append some data points to the given data_dict Append some data points to the given data_dict
...@@ -167,8 +171,8 @@ class MSeedReader: ...@@ -167,8 +171,8 @@ class MSeedReader:
self.add_chan_data(station, meta, data_points, times) self.add_chan_data(station, meta, data_points, times)
def _add_new_trace(self, channel: Dict, metadata: RecordMetadata, def _add_new_trace(self, channel: Dict, metadata: RecordMetadata,
data_points: Tuple[Real, Real], data_points: List[Real],
times: Tuple[Real, Real]) -> None: times: List[Real]) -> None:
""" """
Start a new trace to channel['tracesInfo'] with data_point as Start a new trace to channel['tracesInfo'] with data_point as
the first data value and metadata's start_time as first time value the first data value and metadata's start_time as first time value
...@@ -208,14 +212,14 @@ class MSeedReader: ...@@ -208,14 +212,14 @@ class MSeedReader:
channel['tracesInfo'][-1]['data'] = channel['tracesInfo'][-1][ channel['tracesInfo'][-1]['data'] = channel['tracesInfo'][-1][
'data'][:-1] 'data'][:-1]
channel['tracesInfo'][-1]['times'] = channel['tracesInfo'][-1][ channel['tracesInfo'][-1]['times'] = channel['tracesInfo'][-1][
'times'][:-1] 'times'][:-1]
channel['tracesInfo'][-1]['data'].extend(data_points) channel['tracesInfo'][-1]['data'].extend(data_points)
channel['tracesInfo'][-1]['times'].extend(times) channel['tracesInfo'][-1]['times'].extend(times)
def add_chan_data(self, station: dict, metadata: RecordMetadata, def add_chan_data(self, station: dict, metadata: RecordMetadata,
data_points: Tuple[Real, Real], data_points: List[Real],
times: Tuple[Real, Real]) -> None: times: List[Real]) -> None:
""" """
Add new channel to the passed station and append data_point to the Add new channel to the passed station and append data_point to the
channel if there's no gap/overlap or start a new trace of data channel if there's no gap/overlap or start a new trace of data
...@@ -315,10 +319,28 @@ class MSeedReader: ...@@ -315,10 +319,28 @@ class MSeedReader:
continue continue
else: else:
break break
first_data_point = record.get_two_data_points() if data_dict is self.waveform_data:
times = (record.record_metadata.start_time, data_points = list(record.get_two_data_points())
record.record_metadata.end_time) times = [record.record_metadata.start_time,
self.append_data(data_dict, record, first_data_point, times) record.record_metadata.end_time]
else:
try:
data_points = record.get_data_points()
sample_interval = 1 / record.record_metadata.sample_rate
times = numpy.arange(
record.record_metadata.start_time,
record.record_metadata.end_time,
sample_interval).tolist()
# The previous calculation will always miss one time point
# at the end. We can adjust the stop argument to capture
# that point in one calculation, but the code for that is a
# lot harder to parse. It is a lot more straightforward to
# just add the final time point at the end.
times.append(times[-1] + sample_interval) # noqa
except ZeroDivisionError:
data_points = None
times = None
self.append_data(data_dict, record, data_points, times)
self.append_log(record) self.append_log(record)
move_to_next_record(self.file, current_record_start, record) move_to_next_record(self.file, current_record_start, record)
......
import sys
import warnings
from numbers import Real from numbers import Real
from typing import BinaryIO, Optional, List, Tuple from typing import BinaryIO, Optional, List, Tuple
import numpy
from obspy import UTCDateTime from obspy import UTCDateTime
from obspy.io.mseed.util import _unpack_steim_1, _unpack_steim_2
from sohstationviewer.model.mseed_data.decode_mseed import ( from sohstationviewer.model.mseed_data.decode_mseed import (
decode_ieee_float, decode_ieee_double, decode_steim, decode_int16, decode_ieee_float, decode_ieee_double, decode_steim, decode_int16,
...@@ -40,6 +44,37 @@ class RecordReader: ...@@ -40,6 +44,37 @@ class RecordReader:
self.ascii_text: str = '' self.ascii_text: str = ''
self.read_header() self.read_header()
def get_libmseed_swapflag(self):
"""
Get the appropriate byte-order swap flag to pass to obspy's wrappers
of libmseed's functions. A swap flag of 0 means big-endian, while a
swap flag of 1 means little-endian.
In C, the language libmseed was written in, the swap flag is pretty
straightforward and depends entirely on the endianness of the record
being read. However, the abstractions Python builds on top of C make it
so that we have to take into consider the running's system endianness
when determining the appropriate swap flag. In particular, a
little-endian system means we have to invert the endianness of the
current record to get the final swap flag.
:return: 0 or 1, depending on the data set and the running system.
"""
if sys.byteorder == 'little':
if self.data_unpacker.byte_order_char in ['!', '>']:
swapflag = 1
else:
swapflag = 0
elif sys.byteorder == 'big':
warnings.warn('SOHViewer has not been tested on big-endian '
'hardware. Please be advised that the plots shown '
'might not reflect the actual data.')
if self.data_unpacker.byte_order_char in ['!', '>', '@', '=']:
swapflag = 0
else:
swapflag = 1
return swapflag
def read_header(self) -> None: def read_header(self) -> None:
""" """
Read the header of the current data record. The header includes the Read the header of the current data record. The header includes the
...@@ -242,18 +277,21 @@ class RecordReader: ...@@ -242,18 +277,21 @@ class RecordReader:
except UnicodeDecodeError: except UnicodeDecodeError:
pass pass
def get_two_data_points(self) -> Optional[Tuple[Real, Real]]: def get_encoding_format(self):
""" """
Get two data points from the current data record. Get the encoding format of the record.
:return: two data point from the current data record; the chosen points :return: the encoding format of the record
depend on the encoding type stored in blockette 1000.
""" """
record_start = self.file.tell()
encoding_format = self.blockette_1000.encoding_format encoding_format = self.blockette_1000.encoding_format
encoding_format = self.header_unpacker.unpack('b', encoding_format)[0] encoding_format = self.header_unpacker.unpack('b', encoding_format)[0]
encoding_format = EncodingFormat(encoding_format) encoding_format = EncodingFormat(encoding_format)
return encoding_format
def get_raw_record_data(self):
"""
Get the data stored in the record as a string of raw bytes.
:return: the data bytes stored in the record
"""
data_start = self.header_unpacker.unpack( data_start = self.header_unpacker.unpack(
'H', self.fixed_header.data_offset 'H', self.fixed_header.data_offset
)[0] )[0]
...@@ -261,8 +299,20 @@ class RecordReader: ...@@ -261,8 +299,20 @@ class RecordReader:
'B', self.blockette_1000.record_length 'B', self.blockette_1000.record_length
)[0] )[0]
record_size = 2 ** record_size_exponent record_size = 2 ** record_size_exponent
record_data = self.file.read(record_size)[data_start:] record_data = self.file.read(record_size)[data_start:]
return record_data
def get_two_data_points(self) -> Optional[Tuple[Real, Real]]:
"""
Get two data points from the current data record.
:return: two data points from the current data record; the chosen
points depend on the encoding type stored in blockette 1000.
"""
record_start = self.file.tell()
encoding_format = self.get_encoding_format()
record_data = self.get_raw_record_data()
if encoding_format == EncodingFormat.ASCII: if encoding_format == EncodingFormat.ASCII:
self.decode_ascii_data(record_data) self.decode_ascii_data(record_data)
data_points = None data_points = None
...@@ -291,3 +341,53 @@ class RecordReader: ...@@ -291,3 +341,53 @@ class RecordReader:
# if needed. # if needed.
self.file.seek(record_start) self.file.seek(record_start)
return data_points return data_points
def get_data_points(self) -> numpy.ndarray:
"""
Get all the data points in the current record by decoding the raw data.
:return: a numpy array containing all the data points stored in the
current record
"""
try:
record_start = self.file.tell()
encoding_format = self.get_encoding_format()
record_data = self.get_raw_record_data()
record_sample_count = self.header_unpacker.unpack(
'H', self.fixed_header.sample_count
)[0]
if encoding_format == EncodingFormat.ASCII:
self.decode_ascii_data(record_data)
data_points = None
elif encoding_format in [EncodingFormat.STEIM_1,
EncodingFormat.STEIM_2]:
if encoding_format == EncodingFormat.STEIM_1:
unpack_steim = _unpack_steim_1
else:
unpack_steim = _unpack_steim_2
compressed_dtype = numpy.dtype(numpy.uint8)
data = numpy.frombuffer(record_data, compressed_dtype)
swapflag = self.get_libmseed_swapflag()
data_points = unpack_steim(data, record_sample_count, swapflag,
0)
else:
# We currently support only a subset of the data types
# supported by obspy.
encoding_to_dtype = {
EncodingFormat.INT_16_BIT: 'i2',
EncodingFormat.INT_32_BIT: 'i4',
EncodingFormat.IEEE_FLOAT_32_BIT: 'f4',
EncodingFormat.IEEE_FLOAT_64_BIT: 'f8',
}
dtype = numpy.dtype(self.data_unpacker.byte_order_char +
encoding_to_dtype[encoding_format])
data_points = numpy.frombuffer(record_data, dtype,
record_sample_count)
return data_points
finally:
# Ensure that we seek back to the start of the record. This needs
# to happen so that we can call this method again if needed.
self.file.seek(record_start)
import os
import tempfile
from unittest import TestCase

import numpy
import obspy
from numpy.testing import assert_array_equal
from obspy import Trace, Stream

from sohstationviewer.model.mseed_data.record_reader import RecordReader
class TestRecordReader(TestCase):
    """Tests for RecordReader's data-point extraction methods."""

    # delete=False is required because the files are reopened by name in
    # the tests below; they are removed explicitly in tearDownClass.
    temp_data_file = tempfile.NamedTemporaryFile(delete=False)
    temp_ascii_file = tempfile.NamedTemporaryFile(delete=False)

    @classmethod
    def setUpClass(cls) -> None:
        """
        Rewrite the sample data and log files into temporary files with
        simplified, predictable contents so the tests can assert exact
        values.
        """
        data_file = ('tests/test_data/Q330-sample/day_vols_AX08/'
                     'AX08.XA..VKI.2021.186')
        ascii_file = ('tests/test_data/Q330-sample/day_vols_AX08/'
                      'AX08.XA..LOG.2021.186')
        # Rewrite the data to make testing easier.
        data_stream: Stream = obspy.read(data_file)
        data_trace: Trace = data_stream[0]
        # We only want enough data that the record has exactly one hour of
        # data for easier testing. The number of points hardcoded below is
        # calculated based on the sample rate of the data.
        data_trace.data = numpy.arange(0, 361, dtype=data_trace.data.dtype)
        data_trace.stats['starttime'] -= data_trace.stats['starttime']
        data_trace.write(cls.temp_data_file.name, format='MSEED')
        ascii_stream: Stream = obspy.read(ascii_file)
        ascii_trace: Trace = ascii_stream[0]
        ascii_trace.data = numpy.array([b'\n'] * len(ascii_trace))
        ascii_trace.stats['starttime'] -= ascii_trace.stats['starttime']
        ascii_trace.write(cls.temp_ascii_file.name, format='MSEED')

    @classmethod
    def tearDownClass(cls) -> None:
        """
        Close and delete the temporary files. Because they were created
        with delete=False, closing alone would leak them on disk.
        """
        cls.temp_data_file.close()
        cls.temp_ascii_file.close()
        os.unlink(cls.temp_data_file.name)
        os.unlink(cls.temp_ascii_file.name)

    def test_get_two_data_points_data_record(self):
        with open(self.temp_data_file.name, 'rb') as data_file:
            expected_data_point = (0, 360)
            record = RecordReader(data_file)
            actual_data_points = record.get_two_data_points()
            self.assertEqual(expected_data_point, actual_data_points)
            # A numeric record must not populate the ASCII log buffer.
            self.assertEqual('', record.ascii_text)

    def test_get_two_data_points_ascii_record(self):
        with open(self.temp_ascii_file.name, 'rb') as data_file:
            record = RecordReader(data_file)
            actual_data_points = record.get_two_data_points()
            # ASCII records carry log text, not data points.
            self.assertIsNone(actual_data_points)
            self.assertEqual('\n' * 3950, record.ascii_text)

    def test_get_data_points_data_record(self):
        with open(self.temp_data_file.name, 'rb') as data_file:
            expected_data_point = numpy.arange(0, 361, dtype=numpy.int32)
            record = RecordReader(data_file)
            actual_data_points = record.get_data_points()
            assert_array_equal(expected_data_point, actual_data_points)
            # A numeric record must not populate the ASCII log buffer.
            self.assertEqual('', record.ascii_text)

    def test_get_data_points_ascii_record(self):
        with open(self.temp_ascii_file.name, 'rb') as data_file:
            record = RecordReader(data_file)
            actual_data_points = record.get_data_points()
            # ASCII records carry log text, not data points.
            self.assertIsNone(actual_data_points)
            self.assertEqual('\n' * 3950, record.ascii_text)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment