Compare revisions: software_public/passoft/sohstationviewer
Commits on Source (20): 989 additions and 248 deletions
@@ -72,4 +72,4 @@ channels in the data set.
 + "Save": Save any changes to database
 + "Save - Add to Main": Save any changes to database and add the selected
 Preferred SOH's name to Main Window so that its SOH list will be included when
-reading the data set.
\ No newline at end of file
+reading the data set and the SOH channels will be plotted in this order.
\ No newline at end of file
@@ -59,7 +59,7 @@ class DataLoaderWorker(QtCore.QObject):
     def run(self):
         try:
             if self.data_type == 'RT130':
-                from sohstationviewer.model.reftek.reftek import RT130
+                from sohstationviewer.model.reftek_data.reftek import RT130
                 object_type = RT130
             else:
                 from sohstationviewer.model.mseed_data.mseed import MSeed
...
@@ -309,7 +309,6 @@ class DataTypeModel():
         self.sort_all_data()
         self.track_info("Combine data.", LogType.INFO)
         self.combine_traces_in_all_data()
-        self.check_not_found_soh_channels()
         for key in self.data_time:
             if self.data_time[key] == [constants.HIGHEST_INT, 0]:
                 # this happens when there is text or ascii only in the data
@@ -456,19 +455,6 @@ class DataTypeModel():
         execute_db(f'UPDATE PersistentData SET FieldValue="{self.tmp_dir}" '
                    f'WHERE FieldName="tempDataDirectory"')

-    def check_not_found_soh_channels(self):
-        all_chans_meet_req = (
-            list(self.soh_data[self.selected_key].keys()) +
-            list(self.mass_pos_data[self.selected_key].keys()) +
-            list(self.log_data[self.selected_key].keys()))
-        not_found_chans = [c for c in self.req_soh_chans
-                           if c not in all_chans_meet_req]
-        if not_found_chans != []:
-            msg = (f"No data found for the following channels: "
-                   f"{', '.join(not_found_chans)}")
-            self.processing_log.append((msg, LogType.WARNING))

     def combine_times_data_of_traces_w_spr_less_or_equal_1(
             self, data: Dict[str, Dict], selected_key: Union[(str, str), str],
             data_name: str):
...
from typing import List, Dict, Optional, Union, Tuple
import numpy as np
import os
from pathlib import Path
from sohstationviewer.database.extract_data import get_convert_factor
@@ -207,3 +210,36 @@ def reset_data(selected_key: Union[str, Tuple[str, str]], data_dict: Dict):
del selected_data_dict[chan_id][k]
except KeyError:
pass
def read_text(path2file: Path) -> Optional[str]:
    """
    CHANGED FROM handling_data.read_text:
        + No need to check for a binary file first: a caught
          UnicodeDecodeError means the file is binary
        + Remove empty lines from the content
    Read a text file and return its content as a log string, or None if the
    file is not a text file.
    :param path2file: absolute path to the text file
    :return: the file content with empty lines removed, or None if the file
        could not be decoded as text
    """
    try:
        with open(path2file, 'r') as file:
            content = file.read().strip()
    except UnicodeDecodeError:
        # The file is binary, not text.
        return None
if content != '':
# skip empty lines
no_empty_line_list = [
line for line in content.splitlines() if line]
no_empty_line_content = os.linesep.join(no_empty_line_list)
log_text = "\n\n** STATE OF HEALTH: %s\n" % path2file.name
log_text += no_empty_line_content
else:
log_text = ''
return log_text
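For reference, a minimal usage sketch of the read_text() helper above; the log content and throwaway temporary file are invented:

import tempfile
from pathlib import Path

from sohstationviewer.model.general_data.general_data_helper import read_text

with tempfile.NamedTemporaryFile('w', suffix='.log', delete=False) as f:
    f.write('Board temp: 22C\n\n\nGPS: locked\n')
print(read_text(Path(f.name)))  # empty lines removed; None for binary files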
import struct
class Unpacker:
"""
A wrapper around struct.unpack() to unpack binary data without having to
explicitly define the byte order in the format string. Also restrict the
type of format to str and buffer to bytes.
"""
def __init__(self, byte_order_char: str = '') -> None:
self.byte_order_char = byte_order_char
def unpack(self, format: str, buffer: bytes):
"""
Unpack a string of bytes into a tuple of values based on the given
format
:param format: the format used to unpack the byte string
:param buffer: the byte string
:return: a tuple containing the unpacked values.
"""
        default_byte_order_chars = ('@', '=', '>', '<', '!')
        if format.startswith(default_byte_order_chars):
            # Replace the format's own byte-order character with ours.
            format = self.byte_order_char + format[1:]
        else:
            format = self.byte_order_char + format
return struct.unpack(format, buffer)
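A small, self-contained sketch of how this Unpacker is meant to be used; the byte values are invented:

from sohstationviewer.model.general_data.general_record_helper import Unpacker

big_endian = Unpacker('>')
# The byte-order character is prepended automatically...
print(big_endian.unpack('h', b'\x00\x01'))   # (1,)
# ...and an explicit byte-order character in the format is overridden.
print(big_endian.unpack('<h', b'\x00\x01'))  # (1,)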
@@ -3,7 +3,7 @@ from typing import Tuple, List, Dict
 from obspy.core import Stream
 from obspy import UTCDateTime

-from sohstationviewer.model.reftek.from_rt2ms import core
+from sohstationviewer.model.reftek.reftek_data import core
 from sohstationviewer.model.handling_data import read_mseed_trace
...
@@ -8,15 +8,17 @@ from pathlib import Path
 from typing import Dict, Tuple, List

 from sohstationviewer.controller.util import validate_file, validate_dir
-from sohstationviewer.model.mseed_data.mseed_reader import MSeedReader
-from sohstationviewer.view.util.enums import LogType
 from sohstationviewer.model.general_data.general_data import \
     GeneralData, ThreadStopped, ProcessingDataError
+from sohstationviewer.view.util.enums import LogType
+from sohstationviewer.model.general_data.general_data_helper import read_text
 from sohstationviewer.model.mseed_data.mseed_helper import \
-    retrieve_nets_from_data_dict, read_text
+    retrieve_nets_from_data_dict
 from sohstationviewer.model.mseed_data.record_reader_helper import \
     MSeedReadError
+from sohstationviewer.model.mseed_data.mseed_reader import MSeedReader

 class MSeed(GeneralData):
...
 # Functions that change from handling_data's functions
-import os
-from pathlib import Path
-from typing import Union, List, Dict
+from typing import List, Dict

 def retrieve_nets_from_data_dict(data_dict: Dict,
@@ -18,36 +15,3 @@ def retrieve_nets_from_data_dict(data_dict: Dict,
for c in data_dict[sta_id]:
nets_by_sta[sta_id].update(
data_dict[sta_id][c]['nets'])
-def read_text(path2file: Path) -> Union[bool, str]:
-    """
-    CHANGED FROM handling_data.read_text:
-        + Don't need to check binary because UnicodeDecodeError caught means
-          the file is binary
-    Read text file and add to log_data under channel TEXT.
-        + Raise exception if the file isn't a text file
-        + Remove empty lines in content
-    :param path2file: str - absolute path to text file
-    :param file_name: str - name of text file
-    :param text_logs: holder to keep log string, refer to
-        DataTypeModel.__init__.log_data['TEXT']
-    """
-    try:
-        with open(path2file, 'r') as file:
-            content = file.read().strip()
-    except UnicodeDecodeError:
-        return
-    if content != '':
-        # skip empty lines
-        no_empty_line_list = [
-            line for line in content.splitlines() if line]
-        no_empty_line_content = os.linesep.join(no_empty_line_list)
-        log_text = "\n\n** STATE OF HEALTH: %s\n" % path2file.name
-        log_text += no_empty_line_content
-    else:
-        log_text = ''
-    return log_text
@@ -4,37 +4,14 @@ from enum import Enum

 from obspy import UTCDateTime

+from sohstationviewer.model.general_data.general_record_helper import Unpacker

 class MSeedReadError(Exception):
     def __init__(self, msg):
         self.message = msg

-class Unpacker:
-    """
-    A wrapper around struct.unpack() to unpack binary data without having to
-    explicitly define the byte order in the format string. Also restrict the
-    type of format to str and buffer to bytes.
-    """
-    def __init__(self, byte_order_char: str = '') -> None:
-        self.byte_order_char = byte_order_char
-
-    def unpack(self, format: str, buffer: bytes):
-        """
-        Unpack a string of bytes into a tuple of values based on the given
-        format
-        :param format: the format used to unpack the byte string
-        :param buffer: the byte string
-        :return: a tuple containing the unpacked values.
-        """
-        default_byte_order_chars = ('@', '=', '>', '<', '!')
-        if format.startswith(default_byte_order_chars):
-            format = self.byte_order_char + format[:1]
-        else:
-            format = self.byte_order_char + format
-        return struct.unpack(format, buffer)

 @dataclass
 class FixedHeader:
     """
...
@@ -7,7 +7,7 @@ from sohstationviewer.controller.util import (
 from sohstationviewer.view.util.enums import LogType

 if TYPE_CHECKING:
-    from sohstationviewer.model.reftek.reftek import RT130
+    from sohstationviewer.model.reftek_data.reftek import RT130

 class LogInfo():
...
@@ -3,24 +3,27 @@ RT130 object to hold and process RefTek data
 """
 import os
 from pathlib import Path
-from typing import Tuple, List, Union
+from typing import Union, List, Tuple, Dict
 import traceback
 import numpy as np
 from obspy.core import Stream

-from sohstationviewer.model.reftek.from_rt2ms import (
-    core, soh_packet, packet)
-from sohstationviewer.model.reftek.log_info import LogInfo
-from sohstationviewer.model.data_type_model import (
-    DataTypeModel, ThreadStopped, ProcessingDataError)
-from sohstationviewer.model.handling_data import read_text
-from sohstationviewer.model.handling_data_reftek import (
-    check_reftek_header, read_reftek_stream)
 from sohstationviewer.conf import constants
 from sohstationviewer.controller.util import validate_file
 from sohstationviewer.view.util.enums import LogType
+from sohstationviewer.model.general_data.general_data import \
+    GeneralData, ThreadStopped, ProcessingDataError
+from sohstationviewer.model.general_data.general_data_helper import read_text
+from sohstationviewer.model.reftek_data.reftek_helper import (
+    check_reftek_header, read_reftek_stream,
+    retrieve_gaps_from_stream_header)
+from sohstationviewer.model.reftek_data.reftek_reader import core, soh_packet
+from sohstationviewer.model.reftek_data.log_info import LogInfo

-class RT130(DataTypeModel):
+class RT130(GeneralData):
"""
read and process reftek file into object with properties can be used to
plot SOH data, mass position data, waveform data and gaps
@@ -40,12 +43,39 @@ class RT130(DataTypeModel):
"""
self.rt130_waveform_data_req: bool = kwarg['rt130_waveform_data_req']
"""
stream_header_by_key_chan: stream header by key, chan_id to get key
list, gaps by sta_id, nets by sta_id, channels by sta_id
"""
self.stream_header_by_key_chan: Dict[str, Dict[str, Stream]] = {}
"""
gaps_by_key_chan: gap list for each key/chan_id to separate data at
gaps, overlaps
"""
self.gaps_by_key_chan: Dict[Union[str, Tuple[str, str]],
Dict[str, List[List[int]]]] = {}
"""
found_data_streams: list of data streams found to help inform user
why the selected data streams don't show up
"""
self.found_data_streams: List[int] = []
self.processing_data()
def processing_data(self):
if self.creator_thread.isInterruptionRequested():
raise ThreadStopped()
self.read_folder(self.dir)
if self.creator_thread.isInterruptionRequested():
raise ThreadStopped()
self.selected_key = self.select_key()
if self.selected_key is None:
raise ThreadStopped()
if self.creator_thread.isInterruptionRequested():
raise ThreadStopped()
self.finalize_data()
def finalize_data(self):
"""
This function should be called after all folders finish reading to
@@ -53,6 +83,7 @@ class RT130(DataTypeModel):
    + check for data streams that were not found, to warn the user if needed
+ other tasks in super().finalize_data()
"""
self.track_info("Finalizing...", LogType.INFO)
self.track_info(
"Prepare SOH data from log data", LogType.INFO)
self.prepare_soh_data_from_log_data()
@@ -65,7 +96,18 @@ class RT130(DataTypeModel):
f"{', '.join(map(str, not_found_data_streams))}")
self.processing_log.append((msg, LogType.WARNING))
super().finalize_data()
self.sort_all_data()
self.combine_all_data()
self.apply_convert_factor_to_data_dicts()
retrieve_gaps_from_stream_header(
self.stream_header_by_key_chan, self.gaps_by_key_chan,
self.gaps, self.gap_minimum, self.read_start, self.read_end)
for key in self.data_time:
if self.data_time[key] == [constants.HIGHEST_INT, 0]:
# this happens when there is text or ascii only in the data
self.data_time[key] = [self.read_start, self.read_end]
def read_folder(self, folder: str) -> None:
"""
@@ -87,6 +129,8 @@ class RT130(DataTypeModel):
total = sum([len(files) for _, _, files in os.walk(self.dir)])
for folder in folders:
            if not os.path.isdir(folder):
                raise ProcessingDataError(f"Path '{folder}' does not exist.")
for path, subdirs, files in os.walk(folder):
for file_name in files:
if self.creator_thread.isInterruptionRequested():
@@ -120,7 +164,11 @@ class RT130(DataTypeModel):
+ If there is more than one, show all keys, let user choose one to
return.
"""
-        keys = sorted(list(self.stream_header_by_key_chan.keys()))
+        self.keys = sorted(list(set(
+            list(self.soh_data.keys()) +
+            list(self.mass_pos_data.keys()) +
+            list(self.waveform_data.keys()))))
+        keys = self.keys
if len(keys) == 0:
msg = 'No required data found for the data set.'
raise ProcessingDataError(msg)
@@ -176,7 +224,7 @@ class RT130(DataTypeModel):
cur_key = (d['unit_id'].decode(),
f"{d['experiment_number']}")
self.populate_cur_key_for_all_data(cur_key)
-                logs = soh_packet.Packet.from_data(d).__str__()
+                logs = soh_packet.SOHPacket.from_data(d).__str__()
if 'SOH' not in self.log_data[cur_key]:
self.log_data[cur_key]['SOH'] = []
self.log_data[cur_key]['SOH'].append((d['time'], logs))
@@ -207,9 +255,8 @@ class RT130(DataTypeModel):
cur_key = (rt130._data[0]['unit_id'].decode(),
f"{rt130._data[0]['experiment_number']}")
self.populate_cur_key_for_all_data(cur_key)
-        if data_stream != 9:
-            # don't get event info for mass position
-            self.get_ehet_in_log_data(rt130, cur_key)
+        self.get_ehet_in_log_data(rt130, cur_key)
self.get_mass_pos_data_and_waveform_data(rt130, data_stream, cur_key)
def get_ehet_in_log_data(self, rt130: core.Reftek130,
@@ -230,7 +277,7 @@ class RT130(DataTypeModel):
for index in ind_ehet:
d = rt130._data[index]
-            logs = packet.EHPacket(d).eh_et_info(nbr_dt_samples)
+            logs = core.EHPacket(d).eh_et_info(nbr_dt_samples)
if 'EHET' not in self.log_data[cur_key]:
self.log_data[cur_key]['EHET'] = []
self.log_data[cur_key]['EHET'].append((d['time'], logs))
@@ -312,3 +359,18 @@ class RT130(DataTypeModel):
'endTmEpoch': self.data_time[k][1]
}
self.soh_data[k][c_name]['tracesInfo'] = [tr]
def populate_cur_key_for_all_data(self, cur_key: Tuple[str, str]) -> None:
"""
Set up new data set's key for all data
:param cur_key: current processing key: DAS SN, experiment number
"""
if cur_key not in self.log_data:
self.log_data[cur_key] = {}
self.soh_data[cur_key] = {}
self.mass_pos_data[cur_key] = {}
self.waveform_data[cur_key] = {}
self.gaps[cur_key] = []
self.data_time[cur_key] = [constants.HIGHEST_INT, 0]
self.stream_header_by_key_chan[cur_key] = {}
import numpy as np
from typing import Tuple, List, Dict, Optional, Union
from obspy.core import Trace
from obspy.core import Stream
from obspy import UTCDateTime
from sohstationviewer.model.reftek_data.reftek_reader.core import (
DiscontinuousTrace, Reftek130)
from sohstationviewer.model.general_data.general_data_helper import squash_gaps
def check_reftek_header(
rt130: Reftek130, cur_key: Tuple[str, str],
starttime: UTCDateTime, endtime: UTCDateTime,
stream_header_by_key_chan: Dict[str, Dict[str, Stream]],
cur_data_dict: Dict, cur_data_time: List[float],
include_mp123zne: bool, include_mp456uvw: bool):
"""
FROM handling_data_reftek.check_reftek_header()
Read mseed headers of a file from the given rt130 object
to check for time, create stream_header for retrieving gaps later.
Requested data stream has been checked before passing to this function.
:param rt130: RT130 object to get data stream from
:param cur_key: Tuple of DAS serial number, experiment number of the
current file.
:param starttime: start of read data to skip reading actual data if not
in range
:param endtime: end of read data to skip reading actual data if not
in range
:param stream_header_by_key_chan: dict of stream header by key, chan to get
gaps later
:param cur_data_dict: waveform_data/mass_pos_data of the current key
:param cur_data_time: data_time of the current key
:param include_mp123zne: if mass position channels 1,2,3 are requested
:param include_mp456uvw: if mass position channels 4,5,6 are requested
"""
stream = Reftek130.to_stream(
rt130,
include_mp123=include_mp123zne,
include_mp456=include_mp456uvw,
headonly=True,
verbose=False,
sort_permuted_package_sequence=True)
avail_trace_indexes = []
for index, trace in enumerate(stream):
chan_id = trace.stats['channel'].strip()
samplerate = trace.stats['sampling_rate']
if chan_id not in cur_data_dict:
cur_data_dict[chan_id] = {'tracesInfo': [],
'samplerate': samplerate}
        if trace.stats.npts == 0:
            # Skip this trace: creating a memmap from an empty array would
            # trigger a bug.
            continue
if (starttime <= trace.stats['starttime'] <= endtime or
starttime <= trace.stats['endtime'] <= endtime):
avail_trace_indexes.append(index)
if chan_id not in stream_header_by_key_chan[cur_key]:
stream_header_by_key_chan[cur_key][chan_id] = Stream()
stream_header_by_key_chan[cur_key][chan_id].append(trace)
cur_data_time[0] = min(
trace.stats['starttime'].timestamp, cur_data_time[0])
cur_data_time[1] = max(
trace.stats['endtime'].timestamp, cur_data_time[1])
return avail_trace_indexes
def read_reftek_stream(
rt130: Reftek130, tmp_dir: str, cur_key: Tuple[str, str],
avail_trace_indexes: List[int], cur_data_dict: Dict,
include_mp123zne: bool, include_mp456uvw: bool):
"""
FROM handling_data_reftek.read_reftek_stream
Read traces of a file from the given rt130 object for the index in
avail_trace_indexes.
:param rt130: RT130 object to get data stream from
:param tmp_dir: dir to keep memmap files
:param cur_key: Tuple of DAS serial number, experiment number of the
current file.
:param avail_trace_indexes: index of traces to get
:param cur_data_dict: waveform_data/mass_pos_data of the current key
:param include_mp123zne: if mass position channels 1,2,3 are requested
:param include_mp456uvw: if mass position channels 4,5,6 are requested
"""
# TODO: rewrite reftek to read stream with start and end time
stream = Reftek130.to_stream(
rt130,
include_mp123=include_mp123zne,
include_mp456=include_mp456uvw,
headonly=False,
verbose=False,
sort_permuted_package_sequence=True)
for index in avail_trace_indexes:
trace = stream[index]
chan_id = trace.stats['channel'].strip()
traces_info = cur_data_dict[chan_id]['tracesInfo']
tr = read_mseed_trace(trace)
traces_info.append(tr)
def read_mseed_trace(
        trace: Union[Trace, DiscontinuousTrace]) -> Dict:
    """
    FROM handling_data.read_mseed_trace_spr_less_than_or_equal_1()
    For an mseed trace with sample rate <= 1, read and keep all of the
    trace's info, including times and data, in memory.
    Traces with sample rate <= 1 can be SOH, mass position, or waveform.
    :param trace: mseed trace
    :return tr: dict of the trace's info in which data and times are kept
    """
tr = {}
tr['chanID'] = trace.stats.channel
tr['startTmEpoch'] = trace.stats.starttime.timestamp
tr['endTmEpoch'] = trace.stats.endtime.timestamp
tr['samplerate'] = trace.stats.sampling_rate
if hasattr(trace.stats, 'actual_npts'):
tr['size'] = trace.stats.actual_npts
else:
tr['size'] = trace.stats.npts
"""
trace time start with 0 => need to add with epoch starttime
times and data have type ndarray
"""
tr['times'] = trace.times() + trace.stats['starttime'].timestamp
if trace.stats.channel.startswith('MassPos'):
tr['data'] = _convert_reftek_masspos_data(trace.data)
else:
tr['data'] = trace.data
return tr
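A self-contained sketch of read_mseed_trace() on a synthetic one-sample-per-second trace; the import path follows the reftek_helper module this hunk belongs to, and the channel name and values are invented:

import numpy as np
from obspy import Trace, UTCDateTime

from sohstationviewer.model.reftek_data.reftek_helper import read_mseed_trace

trace = Trace(data=np.arange(5, dtype=np.int32),
              header={'channel': 'VM1', 'sampling_rate': 1.0,
                      'starttime': UTCDateTime(2023, 1, 1)})
tr = read_mseed_trace(trace)
print(tr['chanID'], tr['size'])              # VM1 5
print(tr['times'][0] == tr['startTmEpoch'])  # True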
def _convert_reftek_masspos_data(data: np.ndarray) -> np.ndarray:
    """
    FROM handling_data.convert_reftek_masspos_data()
    Calculate the real mass-position values from the raw trace data.
    :param data: mseed data
    :return: data converted from 16-bit signed integers, where
        32767 = 2**16 / 2 - 1 is the largest 16-bit two's-complement value;
        the result is multiplied by 10 for readable display.
        (According to 130_theory.pdf: each channel connects to a 12-bit A/D
        converter with an input range of +/- 10V. These channels are read
        once per second as left-justified, two's-complement, 16-bit values.)
    """
return np.round_(data / 32767.0 * 10.0, 1)
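A quick numeric check of the conversion above; the raw counts are invented, and note the function is module-private:

import numpy as np

from sohstationviewer.model.reftek_data.reftek_helper import (
    _convert_reftek_masspos_data)

raw = np.array([0, 16384, 32767], dtype=np.int16)
# 16384 / 32767 * 10 ~= 5.0; 32767 maps to the full +10 V range.
print(_convert_reftek_masspos_data(raw))  # [ 0.  5. 10.]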
def retrieve_gaps_from_stream_header(
        streams: Dict[str, Dict[str, Stream]],
        gaps_by_key_chan: Dict[Union[str, Tuple[str, str]],
                               Dict[str, List[List[int]]]],
        gaps: Dict[str, List[List[float]]],
        gap_minimum: Optional[float],
        read_start: Optional[float],
        read_end: Optional[float]) -> None:
"""
CHANGED FROM handling_data.retrieve_gaps_from_stream_header()
Retrieve gaps by sta_id from stream_header_by_key_chan
:param streams: dict of stream header by sta, chan
:param gaps_by_key_chan: gaps list by key and channel id
:param gaps: gaps list by key
:param gap_minimum: minimum length of gaps to be detected
:param read_start: start time of data to be read
:param read_end: end time of data to be read
"""
for sta_id in streams:
sta_gaps = []
gaps_by_key_chan[sta_id] = {}
if gap_minimum is None:
continue
for chan_id in streams[sta_id]:
stream = streams[sta_id][chan_id]
gaps_in_stream = stream.get_gaps()
gaps_by_key_chan[sta_id][chan_id] = stream_gaps = [
[g[4].timestamp, g[5].timestamp] for g in gaps_in_stream
if _check_gap(g[4], g[5], read_start, read_end, gap_minimum)]
sta_gaps += stream_gaps
gaps[sta_id] = squash_gaps(sta_gaps)
def _check_gap(t1: UTCDateTime, t2: UTCDateTime, start: float, end: float,
               gap_minimum: float) -> bool:
    """
    Check that at least part of the given gap lies inside the given time
    range and that the gap is longer than gap_minimum.
    :param t1: start of the given gap
    :param t2: end of the given gap
    :param start: start of the time range
    :param end: end of the time range
    :param gap_minimum: minimum length of gaps to be detected
    :return: True if both checks are satisfied, False otherwise
    """
t1 = t1.timestamp
t2 = t2.timestamp
return (abs(t2 - t1) > gap_minimum and
(start <= min(t1, t2) <= end or start <= max(t1, t2) <= end))
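A self-contained sketch of _check_gap() with invented times: a ten-minute gap inside a one-day window, against a one-minute minimum:

from obspy import UTCDateTime

from sohstationviewer.model.reftek_data.reftek_helper import _check_gap

gap_start = UTCDateTime(2023, 1, 1, 0, 0, 0)
gap_end = UTCDateTime(2023, 1, 1, 0, 10, 0)
print(_check_gap(gap_start, gap_end,
                 UTCDateTime(2023, 1, 1).timestamp,
                 UTCDateTime(2023, 1, 2).timestamp,
                 gap_minimum=60))  # True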
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
"""
Suggested updates to obspy.io.reftek.core:
@@ -10,8 +9,10 @@ Suggested updates to obspy.io.reftek.core:
Maeva Pourpoint IRIS/PASSCAL
"""
import copy
from pathlib import Path
from typing import Optional, Union
import obspy.io.reftek.core as obspy_rt130_core
import warnings
@@ -19,8 +20,48 @@ import numpy as np
from obspy import Trace, Stream, UTCDateTime
from obspy.core.util.obspy_types import ObsPyException
from obspy.io.reftek.packet import _unpack_C0_C2_data
from sohstationviewer.model.reftek.from_rt2ms.packet import EHPacket
from obspy.io.reftek.packet import PACKET_FINAL_DTYPE
from sohstationviewer.model.general_data.general_record_helper import Unpacker
from sohstationviewer.model.reftek_data.reftek_reader.packet import EHPacket
from sohstationviewer.model.reftek_data.reftek_reader.reftek_reader_helper \
import (read_rt130_file, convert_packet_to_obspy_format)
class DiscontinuousTrace(Trace):
"""
Extension of obspy.Trace that changes the way time data is handled when
reading data using the method from logpeek/qpeek.
"""
def __init__(self, *args, times: np.ndarray, **kwargs):
super().__init__(*args, **kwargs)
self._times = times
def times(self, type: str = "relative",
reftime: Optional[UTCDateTime] = None) -> np.ndarray:
"""
Override Trace.times(). Returns a numpy array of stored times data,
modified based on the argument "type".
:param type: the type of times data to return. For more information,
refer to Trace.times(). Note: this method does not implement
types 'utcdatetime' and 'matplotlib' because they are not going
to be useful.
:param reftime: the time used as a reference point when getting
relative time. If None, the start time of the trace is used as
the reference point.
:return: the requested array of time data, modified based on the type
requested.
"""
if type == 'utcdatetime' or type == 'matplotlib':
raise NotImplementedError
elif type == 'relative':
if reftime is None:
return self._times - self.stats.starttime.timestamp
else:
return self._times - reftime.timestamp
elif type == 'timestamp':
return self._times
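A self-contained sketch of the overridden times() method; the sample values and timestamps are invented:

import numpy as np
from obspy import UTCDateTime

from sohstationviewer.model.reftek_data.reftek_reader.core import (
    DiscontinuousTrace)

tr = DiscontinuousTrace(
    data=np.array([1, 2, 3], dtype=np.int32),
    header={'starttime': UTCDateTime(100.0)},
    times=np.array([100.0, 160.0, 220.0]))
print(tr.times('timestamp'))  # [100. 160. 220.]
print(tr.times('relative'))   # [  0.  60. 120.]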
class Reftek130Exception(ObsPyException):
@@ -28,18 +69,41 @@ class Reftek130Exception(ObsPyException):
class Reftek130(obspy_rt130_core.Reftek130):
"""
    Child class of obspy's Reftek130 that reads waveform data similarly to
    logpeek, for better performance.
"""
@staticmethod
def from_file(file: Union[str, Path]) -> Reftek130:
"""
Read data from an RT130 file and save it in a Reftek130 object.
:param file: the RT130 file to read
:return: a Reftek130 object that stores the data in file
"""
# RT130 data is all big-endian
rt130_unpacker = Unpacker('>')
rt = Reftek130()
rt._filename = file
packets_in_file = read_rt130_file(file, rt130_unpacker)
converted_packets = []
for packet in packets_in_file:
converted_packets.append(
convert_packet_to_obspy_format(packet, rt130_unpacker))
rt._data = np.array(converted_packets, dtype=PACKET_FINAL_DTYPE)
return rt
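A hypothetical usage sketch of from_file() together with the to_stream() method below; the file path is an assumption:

from sohstationviewer.model.reftek_data.reftek_reader.core import Reftek130

rt = Reftek130.from_file('path/to/a/raw/rt130/file')  # hypothetical path
# Read headers only, including mass-position channels 1-3:
stream = rt.to_stream(include_mp123=True, headonly=True)
print(stream)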
-    def to_stream(self, network="", location="", component_codes=None,
-                  include_mp123=False, include_mp456=False,
-                  headonly=False, verbose=False,
-                  sort_permuted_package_sequence=False):
+    def to_stream(self, network: str = "", location: str = "",
+                  include_mp123: bool = False, include_mp456: bool = False,
+                  headonly: bool = False, verbose: bool = False,
+                  sort_permuted_package_sequence: bool = False) -> Stream:
"""
Create an obspy.Stream object that holds the data stored in this
Reftek130 object.
:type headonly: bool
:param headonly: Determines whether or not to unpack the data or just
read the headers.
"""
if verbose:
print(self)
if not len(self._data):
msg = "No packet data in Reftek130 object (file: {})"
raise Reftek130Exception(msg.format(self._filename))
@@ -81,20 +145,6 @@ class Reftek130(obspy_rt130_core.Reftek130):
eh = EHPacket(eh_packets[0])
else:
eh = EHPacket(et_packets[0])
# only C0, C2, 16, 32 encodings supported right now
if eh.data_format == b"C0":
encoding = 'C0'
elif eh.data_format == b"C2":
encoding = 'C2'
elif eh.data_format == b"16":
encoding = '16'
elif eh.data_format == b"32":
encoding = '32'
else:
msg = ("Reftek data encoding '{}' not implemented yet. Please "
"open an issue on GitHub and provide a small (< 50kb) "
"test file.").format(eh.data_format)
raise NotImplementedError(msg)
header = {
"unit_id": self._data['unit_id'][0],
"experiment_number": self._data['experiment_number'][0],
@@ -140,74 +190,34 @@ class Reftek130(obspy_rt130_core.Reftek130):
sample_data = np.array([], dtype=np.int32)
npts = packets_["number_of_samples"].sum()
else:
if encoding in ('C0', 'C2'):
sample_data = _unpack_C0_C2_data(packets_,
encoding)
elif encoding in ('16', '32'):
# rt130 stores in big endian
dtype = {'16': '>i2', '32': '>i4'}[encoding]
# just fix endianness and use correct dtype
sample_data = np.require(
packets_['payload'],
requirements=['C_CONTIGUOUS'])
# either int16 or int32
sample_data = sample_data.view(dtype)
# account for number of samples, i.e. some packets
# might not use the full payload size but have
# empty parts at the end that need to be cut away
number_of_samples_max = sample_data.shape[1]
sample_data = sample_data.flatten()
# go through packets starting at the back,
# otherwise indices of later packets would change
# while looping
for ind, num_samps in reversed([
(ind, num_samps) for ind, num_samps in
enumerate(packets_["number_of_samples"])
if num_samps != number_of_samples_max]):
# looping backwards we can easily find the
# start of each packet, since the earlier
# packets are still untouched and at maximum
# sample length in our big array with all
# packets
start_of_packet = ind * number_of_samples_max
start_empty_part = start_of_packet + num_samps
end_empty_part = (start_of_packet +
number_of_samples_max)
sample_data = np.delete(
sample_data,
slice(start_empty_part, end_empty_part))
npts = len(sample_data)
tr = Trace(data=sample_data, header=copy.deepcopy(header))
# The payload stores the first data point of each
# packet, encoded as a numpy array of 4 1-byte numbers.
                    # Due to the way the payload is encoded during the
                    # reading process and a quirk of two's-complement binary
                    # numbers (namely, extending a negative number with
                    # leading 1s does not change its value), we do not have
                    # to care about the actual encoding type of the stored
                    # packets.
sample_data = np.ascontiguousarray(
packets_['payload'][:, :4])
sample_data = sample_data.view(np.dtype('>i4'))
sample_data = sample_data.squeeze(axis=-1)
npts = sample_data.size
tr = DiscontinuousTrace(
data=sample_data, header=copy.deepcopy(header),
times=(packets_['time'] / 10**9).round(3)
)
# The plotting process needs to know about the number of
# points stored in the trace. However, tr.stats use the
# stored npts to calculate some other metadata, so we can't
# store that information there. As a compromise, we keep
# tr.stats.npts the same, while storing the actual number
# of data points in the trace in another part of tr.stats.
tr.stats.npts = packets_['number_of_samples'].sum()
tr.stats.actual_npts = npts
# channel number is not included in the EH/ET packet
# payload, so add it to stats as well..
tr.stats.reftek130['channel_number'] = channel_number
if headonly:
tr.stats.npts = npts
tr.stats.starttime = UTCDateTime(ns=starttime)
"""
if component codes were explicitly provided, use them
together with the stream label
if component_codes is not None:
tr.stats.channel = (eh.stream_name.strip() +
component_codes[channel_number])
# otherwise check if channel code is set for the given
# channel (seems to be not the case usually)
elif eh.channel_code[channel_number] is not None:
tr.stats.channel = eh.channel_code[channel_number]
# otherwise fall back to using the stream label together
# with the number of the channel in the file (starting with
# 0, as Z-1-2 is common use for data streams not oriented
# against North)
else:
msg = ("No channel code specified in the data file "
"and no component codes specified. Using "
"stream label and number of channel in file as "
"channel codes.")
warnings.warn(msg)
tr.stats.channel = (
eh.stream_name.strip() + str(channel_number))
"""
DS = self._data['data_stream_number'][0] + 1
if DS != 9:
tr.stats.channel = "DS%s-%s" % (DS, channel_number + 1)
@@ -218,22 +228,5 @@ class Reftek130(obspy_rt130_core.Reftek130):
continue
tr.stats.channel = "MassPos%s" % (channel_number + 1)
# check if endtime of trace is consistent
t_last = packets_[-1]['time']
npts_last = packets_[-1]['number_of_samples']
try:
if not headonly:
assert npts == len(sample_data)
if npts_last:
assert tr.stats.endtime == UTCDateTime(
ns=t_last) + (npts_last - 1) * delta
if npts:
assert tr.stats.endtime == (
tr.stats.starttime + (npts - 1) * delta)
except AssertionError:
msg = ("Reftek file has a trace with an inconsistent "
"endtime or number of samples. Please open an "
"issue on GitHub and provide your file for"
"testing.")
raise Reftek130Exception(msg)
st += tr
return st
import dataclasses
from obspy import UTCDateTime
class NotRT130FileError(Exception):
"""
Error to raise when there is a problem with parsing RT130 data.
"""
pass
@dataclasses.dataclass
class PacketHeader:
"""
The decoded header of an RT130 packet.
"""
packet_type: str
experiment_number: int
unit_id: str
time: UTCDateTime
byte_count: int
packet_sequence: int
def parse_rt130_time(year: int, time_bytes: bytes) -> UTCDateTime:
"""
Convert BCD-encoded RT130 time into UTCDateTime.
    :param year: the year of the time. The RT130 header stores the year
        separately from the time, so we have to pass it as an argument.
:param time_bytes: the BCD-encoded time.
:return: an UTCDateTime object that stores the decoded time.
"""
time_string = time_bytes.hex()
# The time string has the format of DDDHHMMSSTTT, where
# D = day of year
# H = hour
# M = minute
# S = second
# T = millisecond
day_of_year, hour, minute, second, millisecond = (
int(time_string[0:3]),
int(time_string[3:5]),
int(time_string[5:7]),
int(time_string[7:9]),
int(time_string[9:12])
)
# RT130 only stores the last two digits of the year. Because the
# documentation for RT130 does not define a way to retrieve the full year,
# we use Obspy's method. Accordingly, we convert 0-49 to 2000-2049 and
# 50-99 to 1950-1999.
if 0 <= year <= 49:
year += 2000
elif 50 <= year <= 99:
year += 1900
converted_time = UTCDateTime(year=year, julday=day_of_year, hour=hour,
minute=minute, second=second,
microsecond=millisecond * 1000)
return converted_time
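A self-contained check of the BCD decoding described above; year 23 resolves to 2023, and day 100 of 2023 is April 10:

from sohstationviewer.model.reftek_data.reftek_reader.header import (
    parse_rt130_time)

# DDDHHMMSSTTT = day 100, 12:34:56.789, BCD-encoded
t = parse_rt130_time(23, bytes.fromhex('100123456789'))
print(t)  # 2023-04-10T12:34:56.789000Z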
def get_rt130_packet_header(rt130_packet: bytes) -> PacketHeader:
"""
Get the packet header stored in the first 16 bits of an RT130 packet.
:param rt130_packet: the RT130 packet to process
:return: a PacketHeader object containing the header of rt130_packet
"""
try:
# Because RT130 data is always big-endian, it is more convenient to
# use str.decode() than the unpacker.
packet_type = rt130_packet[:2].decode('ASCII')
except UnicodeError:
print('Cannot decode packet type.')
print('The given file does not appear to be a valid RT130 file.')
raise NotRT130FileError
valid_packet_types = ['AD', 'CD', 'DS', 'DT', 'EH', 'ET', 'OM', 'SH', 'SC',
'FD']
if packet_type not in valid_packet_types:
print(f'Invalid packet type found: {packet_type}')
print('The given file does not appear to be a valid RT130 file.')
raise NotRT130FileError
experiment_number = int(rt130_packet[2:3].hex())
year = int(rt130_packet[3:4].hex())
    # A call to str.upper() is needed because bytes.hex() makes hexadecimal
    # letters (i.e. ABCDEF) lowercase, while we want them uppercase for
    # display purposes.
unit_id = rt130_packet[4:6].hex().upper()
time_bytes = rt130_packet[6:12]
packet_time = parse_rt130_time(year, time_bytes)
byte_count = int(rt130_packet[12:14].hex())
packet_sequence = int(rt130_packet[14:16].hex())
return PacketHeader(packet_type, experiment_number, unit_id, packet_time,
byte_count, packet_sequence)
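A self-contained sketch with a fabricated 16-byte header; all field values are invented:

from sohstationviewer.model.reftek_data.reftek_reader.header import (
    get_rt130_packet_header)

fake_header = (b'DT'                            # packet type
               + bytes.fromhex('00')            # experiment number 0
               + bytes.fromhex('23')            # year -> 2023
               + bytes.fromhex('92eb')          # unit id '92EB'
               + bytes.fromhex('100123456789')  # day 100, 12:34:56.789
               + bytes.fromhex('1024')          # byte count 1024
               + bytes.fromhex('0001'))         # packet sequence 1
header = get_rt130_packet_header(fake_header)
print(header.packet_type, header.unit_id)  # DT 92EB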
@@ -9,24 +9,47 @@ Suggested updates to obspy.io.reftek.packet:
Maeva Pourpoint IRIS/PASSCAL
"""
from typing import List
import numpy
import obspy.io.reftek.packet as obspy_rt130_packet
from obspy import UTCDateTime
from obspy.io.reftek.util import (_decode_ascii,
_parse_long_time,
_16_tuple_ascii,
_16_tuple_int,
_16_tuple_float)
from sohstationviewer.model.reftek.from_rt2ms.soh_packet import Packet
from obspy.io.reftek.util import (
_decode_ascii, _parse_long_time, _16_tuple_ascii, _16_tuple_float,
_16_tuple_int,
)
from sohstationviewer.model.reftek_data.reftek_reader.soh_packet import (
SOHPacket)
class Reftek130UnpackPacketError(ValueError):
pass
eh_et_payload_last_field_start = 88
eh_et_payload_last_field_size = 16
# The field offsets are relative to the start of the payload, so we add 24
# bytes for the header and extended header: 88 + 16 + 24 = 128.
eh_et_payload_end_in_packet = (
    eh_et_payload_last_field_start + eh_et_payload_last_field_size + 24
)
# name, offset, length (bytes) and converter routine for EH/ET packet payload
# Trimmed to only include the parts used elsewhere for the sake of better
# performance.
EH_PAYLOAD = {
"station_name_extension": (35, 1, _decode_ascii),
"station_name": (36, 4, _decode_ascii),
"sampling_rate": (64, 4, float),
"trigger_time": (72, 16, _parse_long_time),
"first_sample_time": (
eh_et_payload_last_field_start, eh_et_payload_last_field_size,
_parse_long_time),
}
obspy_rt130_packet.EH_PAYLOAD = {
"trigger_time_message": (0, 33, _decode_ascii),
"time_source": (33, 1, _decode_ascii),
"time_quality": (34, 1, _decode_ascii),
@@ -57,21 +80,37 @@ EH_PAYLOAD = {
"position": (894, 26, _decode_ascii),
"reftek_120": (920, 80, None)}
obspy_rt130_packet.EH_PAYLOAD = EH_PAYLOAD
class EHPacket(obspy_rt130_packet.EHPacket):
def __str__(self, compact=False):
    def __init__(self, data: numpy.ndarray) -> None:
        """
        Reimplement __init__ to use a different value for EH_PAYLOAD.
        This seems to be the cleanest way to do it, seeing as every other way
        I can think of modifies EH_PAYLOAD in the original module, which can
        have consequences that are not readily apparent.
        :param data: the data of an EH packet. For more information, refer to
            obspy.io.reftek.packet.PACKET_FINAL_DTYPE.
        """
self._data = data
payload = self._data["payload"].tobytes()
for name, (start, length, converter) in EH_PAYLOAD.items():
data = payload[start:start + length]
if converter is not None:
data = converter(data)
setattr(self, name, data)
def __str__(self, compact: bool = False) -> str:
if compact:
sta = (self.station_name.strip() +
self.station_name_extension.strip())
info = ("{:04d} {:2s} {:4s} {:2d} {:4d} {:4d} {:2d} {:2s} "
"{:5s} {:4s} {!s}").format(
self.packet_sequence, self.type.decode(),
self.unit_id.decode(), self.experiment_number,
self.byte_count, self.event_number,
self.data_stream_number, self.data_format.decode(),
sta, str(self.sampling_rate)[:4], self.time)
self.packet_sequence, self.type.decode(),
self.unit_id.decode(), self.experiment_number,
self.byte_count, self.event_number,
self.data_stream_number, self.data_format.decode(),
sta, str(self.sampling_rate)[:4], self.time)
else:
info = []
for key in self._headers:
@@ -91,40 +130,16 @@ class EHPacket(obspy_rt130_packet.EHPacket):
"\n\t".join(info))
return info
-    def eh_et_info(self, nbr_DT_samples):
+    def eh_et_info(self, nbr_DT_samples: int) -> List[str]:
"""
Compile EH and ET info to write to log file.
Returns list of strings.
Formatting of strings is based on earlier version of rt2ms.
"""
         info = []
-        # packet_tagline1 = ("\n\n{:s} exp {:02d} bytes {:04d} {:s} ID: {:s} "
-        #                    "seq {:04d}".format(self.type.decode(),
-        #                                        self.experiment_number,
-        #                                        self.byte_count,
-        #                                        Packet.time_tag(self.time),
-        #                                        self.unit_id.decode(),
-        #                                        self.packet_sequence))
-        # info.append(packet_tagline1)
-        # if self.type.decode('ASCII') == 'EH':
-        #     nbr_DT_samples = 0
-        #     info.append("\nEvent Header")
-        # else:
-        #     info.append("\nEvent Trailer")
-        # info.append("\n event = " + str(self.event_number))
-        # info.append("\n stream = " + str(self.data_stream_number + 1))
-        # info.append("\n format = " + self.data_format.decode('ASCII'))
-        # info.append("\n stream name = " + self.stream_name)
-        # info.append("\n sample rate = " + str(self.sampling_rate))
-        # info.append("\n trigger type = " + self.trigger_type)
-        trigger_time = Packet.time_tag(UTCDateTime(ns=self.trigger_time))
-        # info.append("\n trigger time = " + trigger_time)
-        first_sample_time = Packet.time_tag(UTCDateTime(ns=self.first_sample_time))  # noqa: E501
-        # info.append("\n first sample = " + first_sample_time)
-        # if self.last_sample_time:
-        #     info.append("\n last sample = " + Packet.time_tag(UTCDateTime(ns=self.last_sample_time)))  # noqa: E501
-        # info.append("\n bit weights = " + " ".join([val for val in self.channel_adjusted_nominal_bit_weights if val]))  # noqa: E501
-        # info.append("\n true weights = " + " ".join([val for val in self.channel_true_bit_weights if val]))  # noqa: E501
+        trigger_time = SOHPacket.time_tag(UTCDateTime(ns=self.trigger_time))
+        first_sample_time = SOHPacket.time_tag(
+            UTCDateTime(ns=self.first_sample_time))  # noqa: E501
         packet_tagline2 = ("\nDAS: {:s} EV: {:04d} DS: {:d} FST = {:s} TT = "
                            "{:s} NS: {:d} SPS: {:.1f} ETO: 0"
                            .format(self.unit_id.decode(),
...
from typing import Tuple, Any
import numpy
from obspy.io.reftek.util import bcd
from sohstationviewer.model.general_data.general_record_helper import Unpacker
from sohstationviewer.model.reftek_data.reftek_reader.packet import \
eh_et_payload_end_in_packet
from sohstationviewer.model.reftek_data.reftek_reader.packets import (
DTExtendedHeader,
EHETExtendedHeader, SOHExtendedHeader,
)
def decode_uncompressed(packet: bytes, data_format: str, unpacker: Unpacker
) -> int:
"""
    Grab the first data point in a packet that contains uncompressed RT130
    data (aka packets with data format 16, 32, or 33).
:param packet: the bytes that make up the given packet.
:param data_format: the data format of the given packet, can be one of 16,
32, or 33.
:param unpacker: the unpacker to use to decode the data.
:return: the first data point in the given packet
"""
data = packet[24:]
    # For uncompressed RT130 data, the data format is also the size of a data
    # point in bits (aside from data format 33, which uses the same size as
    # data format 32).
point_size = int(data_format)
if point_size == 33:
point_size = 32
# Convert the size of a data point to byte because the data is stored
# as a byte string.
point_size = point_size // 8
# struct.unpack uses different format characters for different point sizes.
format_char = {2: 'h', 4: 'i'}[point_size]
first_data_point = data[:point_size]
return unpacker.unpack(f'{format_char}', first_data_point)[0]
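A self-contained sketch of decode_uncompressed() on a fabricated packet: 24 zeroed header bytes followed by one big-endian 16-bit sample:

from sohstationviewer.model.general_data.general_record_helper import Unpacker
from sohstationviewer.model.reftek_data.reftek_reader.packet_readers import (
    decode_uncompressed)

fake_packet = bytes(24) + b'\x01\x02'  # the sample 0x0102 == 258
print(decode_uncompressed(fake_packet, '16', Unpacker('>')))  # 258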
def decode_compressed(packet: bytes, data_format: str, unpacker: Unpacker
) -> int:
"""
Grab the stop point in a packet that contains compressed RT130 data (aka
packets with data format C0, C1, C2, or C3).
We get the stop point in this case because that is what logpeek did. It
also looks a lot better than using the start point, so that is a plus.
:param packet: the bytes that make up the given packet.
    :param data_format: the data format of the given packet, can be one of
        C0, C1, C2, or C3. Exists only so that this function has the same
        signature as decode_uncompressed.
    :param unpacker: the unpacker to use to decode the data.
    :return: the stop point in the given packet
"""
# The data in a compressed data packet starts at byte 64, with bytes
# between byte 24 and 64 being fillers.
data = packet[64:]
first_data_point = data[8:12]
return unpacker.unpack('i', first_data_point)[0]
def read_dt_packet(packet: bytes, unpacker: Unpacker
) -> Tuple[DTExtendedHeader, Any]:
"""
Process a DT packet and get its extended header and first data point.
:param packet: the bytes that make up the given DT packet.
:param unpacker: the unpacker to use to decode the data.
:return: the extended header and first data point of the given DT packet.
"""
decoders = {
**dict.fromkeys(['16', '32', '33'], decode_uncompressed),
**dict.fromkeys(['C0', 'C1', 'C2', 'C3'], decode_compressed)
}
event_number = int(packet[16:18].hex())
data_stream_number = int(packet[18:19].hex())
channel_number = int(packet[19:20].hex())
number_of_samples = int(packet[20:22].hex())
flags = unpacker.unpack('B', packet[22:23])[0]
data_format = packet[23:24].hex().upper()
extended_header = DTExtendedHeader(event_number, data_stream_number,
channel_number, number_of_samples,
flags, data_format)
first_data_point = decoders[data_format](packet, data_format, unpacker)
return extended_header, first_data_point
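A self-contained sketch of read_dt_packet() on a fabricated DT packet; all field values are invented:

from sohstationviewer.model.general_data.general_record_helper import Unpacker
from sohstationviewer.model.reftek_data.reftek_reader.packet_readers import (
    read_dt_packet)

fake_dt = (bytes(16)       # 16-byte packet header, parsed separately
           + b'\x00\x01'   # event number 1
           + b'\x00'       # data stream 0
           + b'\x01'       # channel 1
           + b'\x00\x01'   # 1 sample
           + b'\x00'       # flags
           + b'\x16'       # data format '16' (uncompressed 16-bit)
           + b'\x01\x02')  # the sample: 0x0102 == 258
extended_header, first_point = read_dt_packet(fake_dt, Unpacker('>'))
print(extended_header.data_format, first_point)  # 16 258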
def read_eh_et_packet(packet: bytes, unpacker: Unpacker
) -> Tuple[EHETExtendedHeader, bytes]:
"""
Process an EH/ET packet and get its extended header and required part of
the payload.
:param packet: the bytes that make up the given EH/ET packet.
:param unpacker: the unpacker to use to decode the data.
:return: the extended header and truncated payload of the given EH/ET
packet.
"""
event_number = int(packet[16:18].hex())
data_stream_number = int(packet[18:19].hex())
flags = unpacker.unpack('B', packet[22:23])[0]
data_format = packet[23:24].hex().upper()
extended_header = EHETExtendedHeader(event_number, data_stream_number,
flags, data_format)
    # The largest possible data point has a size of 4 bytes, so we need to
    # grab at least that much data.
payload = packet[24:eh_et_payload_end_in_packet]
return extended_header, payload
def bcd_16bit_int(_i) -> int:
"""
Reimplement a private function of the same name in obspy. Kept here in case
the private function is removed in a future obspy version.
    :param _i: the byte string to convert into a 16-bit integer
:return: a 16-bit integer
"""
_i = bcd(_i)
return _i[0] * 100 + _i[1]
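A quick numeric check of bcd_16bit_int():

import numpy

from sohstationviewer.model.reftek_data.reftek_reader.packet_readers import (
    bcd_16bit_int)

# BCD bytes 0x12 0x34 decode to the digit pairs 12 and 34: 12 * 100 + 34
print(bcd_16bit_int(numpy.frombuffer(b'\x12\x34', numpy.uint8)))  # 1234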
def read_soh_packet(packet: bytes, unpacker: Unpacker
) -> Tuple[SOHExtendedHeader, bytes]:
"""
    Process an SOH packet and get its extended header and payload.
:param packet: the bytes that make up the given SOH packet.
:param unpacker: the unpacker to use to decode the data.
:return: the extended header and payload of the given SOH packet.
"""
event_number = bcd_16bit_int(numpy.frombuffer(packet[16:18], numpy.uint8))
data_stream_number = bcd(numpy.frombuffer(packet[18:19], numpy.uint8))
channel_number = bcd(numpy.frombuffer(packet[19:20], numpy.uint8))
number_of_samples = bcd_16bit_int(
numpy.frombuffer(packet[20:22], numpy.uint8)
)
flags = unpacker.unpack('B', packet[22:23])[0]
data_format = packet[23:24].hex().upper()
extended_header = SOHExtendedHeader(event_number, data_stream_number,
channel_number, number_of_samples,
flags, data_format)
payload = packet[24:]
return extended_header, payload
import dataclasses
from sohstationviewer.model.reftek_data.reftek_reader.header import (
PacketHeader)
@dataclasses.dataclass
class DTExtendedHeader:
"""
The extended header of a DT packet.
"""
event_number: int
data_stream_number: int
channel_number: int
number_of_samples: int
flags: int
data_format: str
@dataclasses.dataclass
class DTPacket:
"""
The decoded data of a DT packet.
"""
header: PacketHeader
extended_header: DTExtendedHeader
data: int
@dataclasses.dataclass
class EHETExtendedHeader:
"""
A collection of some useful information about an EH/ET packet. Technically,
EH/ET packets do not have extended headers. We name this class what it is
due to the way obspy.Reftek130 (and consequently, core.Reftek130) stores
the data of processed packets. For more information, refer to
Reftek130._data.
"""
event_number: int
data_stream_number: int
flags: int
data_format: str
def __post_init__(self):
self.channel_number = 0
self.number_of_samples = 0
@dataclasses.dataclass
class EHETPacket:
"""
The decoded data of an EH/ET packet. The extended_header field is to ensure
compatibility with dt_packet.DTPacket. EH/ET packets do not have an
extended header otherwise.
"""
header: PacketHeader
extended_header: EHETExtendedHeader
data: bytes
@dataclasses.dataclass
class SOHExtendedHeader:
"""
A collection of dummy data for some information needed so that
core.Reftek130 can understand SOH packets.
    core.Reftek130 focuses on reading waveform data, so it wants information
    available in the waveform packets (EH/ET/DT). However, core.Reftek130 also
    supports SOH packets, which do not contain the required information. As
    a result, we need to store dummy data in its place.
"""
event_number: int
data_stream_number: int
channel_number: int
number_of_samples: int
flags: int
data_format: str
@dataclasses.dataclass
class SOHPacket:
"""
The decoded data of an SOH packet. The extended_header field is to ensure
compatibility with dt_packet.DTPacket. SOH packets do not have an
extended header otherwise.
"""
header: PacketHeader
extended_header: SOHExtendedHeader
data: bytes
import os
from typing import Any, Dict, Callable, Union, List, Tuple
import numpy
import numpy as np
from sohstationviewer.model.general_data.general_record_helper import Unpacker
from sohstationviewer.model.reftek_data.reftek_reader.packet import \
eh_et_payload_end_in_packet
from sohstationviewer.model.reftek_data.reftek_reader.packet_readers import (
read_dt_packet, read_eh_et_packet, read_soh_packet,
)
from sohstationviewer.model.reftek_data.reftek_reader.packets import (
DTPacket, EHETPacket, SOHPacket,
)
from sohstationviewer.model.reftek_data.reftek_reader.header import \
get_rt130_packet_header
def packet_reader_placeholder(*args: Any, **kwargs: Any) -> Tuple[Any, Any]:
"""
Placeholder function to be used in place of an RT130 packet reader
    function. This function immediately returns a (None, None) tuple.
"""
return None, None
def read_rt130_file(file_name: str, unpacker: Unpacker
) -> List[Union[EHETPacket, DTPacket, SOHPacket]]:
"""
    Read an RT130 file and store the data in a list of RT130 packets.
:param file_name: the name of the file to read.
:param unpacker: the decoder used to decode the data.
:return: a list of processed RT130 packets.
"""
# RT130 data looks to be all big-endian (logpeek assumes this, and it has
# been working pretty well), so we don't have to do any endianness check.
packets = []
with open(file_name, 'rb') as rt130_file:
# Each packet is exactly 1024 bytes, so we can rely on that to know
# when we have finished reading.
for i in range(os.path.getsize(file_name) // 1024):
packet = rt130_file.read(1024)
packet_header = get_rt130_packet_header(packet)
waveform_handlers: Dict[str, Callable] = {
'EH': read_eh_et_packet,
'ET': read_eh_et_packet,
'DT': read_dt_packet,
}
soh_handlers: Dict[str, Callable] = dict.fromkeys(
['AD', 'CD', 'DS', 'FD', 'OM', 'SC', 'SH'],
read_soh_packet
)
packet_handlers = {
**waveform_handlers, **soh_handlers
}
packet_handler = packet_handlers.get(
packet_header.packet_type, packet_reader_placeholder
)
return_val = packet_handler(packet, unpacker)
if packet_header.packet_type == 'DT':
packet_type = DTPacket
elif packet_header.packet_type in ['EH', 'ET']:
packet_type = EHETPacket
else:
packet_type = SOHPacket
extended_header, data = return_val
current_packet = packet_type(packet_header, extended_header, data)
packets.append(current_packet)
return packets
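A hypothetical usage sketch of read_rt130_file(); the file path is an assumption:

from sohstationviewer.model.general_data.general_record_helper import Unpacker
from sohstationviewer.model.reftek_data.reftek_reader.reftek_reader_helper \
    import read_rt130_file

packets = read_rt130_file('path/to/a/raw/rt130/file', Unpacker('>'))
print(len(packets), packets[0].header.packet_type)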
def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket,
SOHPacket],
unpacker: Unpacker) -> Tuple:
"""
    Convert an RT130 packet into a tuple compatible with numpy dtype
    PACKET_FINAL_DTYPE.
:param packet: an RT130 packet.
:param unpacker: the decoder used to decode the data.
:return: a tuple that can be converted into an object of type
PACKET_FINAL_DTYPE that contains the data stored in packet.
"""
# We want to convert the packet to a tuple. In order to make it easier to
# maintain, we first convert the packet to a dictionary. Then, we grab the
# values of the dictionary as tuple to get the final result.
converted_packet = {}
converted_packet['packet_type'] = packet.header.packet_type
converted_packet['experiment_number'] = packet.header.experiment_number
# Obspy only stores the last two digits of the year.
converted_packet['year'] = packet.header.time.year % 100
converted_packet['unit_id'] = packet.header.unit_id
converted_packet['time'] = packet.header.time.ns
converted_packet['byte_count'] = packet.header.byte_count
converted_packet['packet_sequence'] = packet.header.packet_sequence
converted_packet['event_number'] = packet.extended_header.event_number
converted_packet[
'data_stream_number'] = packet.extended_header.data_stream_number
converted_packet['channel_number'] = packet.extended_header.channel_number
converted_packet[
'number_of_samples'] = packet.extended_header.number_of_samples
converted_packet['flags'] = packet.extended_header.flags
converted_packet['data_format'] = packet.extended_header.data_format
if converted_packet['packet_type'] == 'DT':
# Obspy stores the data as list of 1-byte integers. We store the
# data as an arbitrary length integer, so we need to do some
# conversion. To make encoding and decoding the data point easier, we
# store it in 4 bytes no matter what the data format is. This only
        # has an effect on data with format 16. Thanks to a quirk of
        # two's-complement binary encoding, however, this does not cause any
# problem.
data_size = 4
format_char = 'B'
converted_packet['payload'] = numpy.empty(1000, np.uint8)
packet_data = list(unpacker.unpack(
f'{data_size}{format_char}',
packet.data.to_bytes(data_size, 'big', signed=True)
))
converted_packet['payload'][:4] = packet_data
elif converted_packet['packet_type'] in ['EH', 'ET']:
eh_et_payload_size = eh_et_payload_end_in_packet - 24
converted_packet['payload'] = numpy.empty(1000, np.uint8)
packet_data = numpy.frombuffer(packet.data, np.uint8)
converted_packet['payload'][:eh_et_payload_size] = packet_data
else:
converted_packet['payload'] = numpy.frombuffer(packet.data, np.uint8)
return tuple(converted_packet.values())