Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
Commits on Source (3)
...@@ -11,6 +11,8 @@ from obspy.core import Stream ...@@ -11,6 +11,8 @@ from obspy.core import Stream
from sohstationviewer.conf import constants from sohstationviewer.conf import constants
from sohstationviewer.conf.constants import SOFTWARE_VERSION from sohstationviewer.conf.constants import SOFTWARE_VERSION
from sohstationviewer.model.reftek_data.reftek_reader.core import \
DecimatedReftek130
from sohstationviewer.model.reftek_data.reftek_reader.log_file_reader import ( from sohstationviewer.model.reftek_data.reftek_reader.log_file_reader import (
LogFileReader, process_mass_poss_line, LogReadError, LogFileReader, process_mass_poss_line, LogReadError,
) )
...@@ -297,12 +299,6 @@ class RT130(GeneralData): ...@@ -297,12 +299,6 @@ class RT130(GeneralData):
raise ProcessingDataError(msg) raise ProcessingDataError(msg)
selected_data_set_id = data_set_ids[0] selected_data_set_id = data_set_ids[0]
if not self.on_unittest and len(data_set_ids) > 1:
msg = ("There are more than one data set IDs in the given data.\n"
"Please select one to display")
self.pause_signal.emit(msg, data_set_ids)
self.pause()
selected_data_set_id = data_set_ids[self.pause_response]
self.track_info(f'Select data set ID {selected_data_set_id}', self.track_info(f'Select data set ID {selected_data_set_id}',
LogType.INFO) LogType.INFO)
...@@ -318,7 +314,7 @@ class RT130(GeneralData): ...@@ -318,7 +314,7 @@ class RT130(GeneralData):
:param path2file: absolute path to file :param path2file: absolute path to file
""" """
try: try:
rt130 = core.Reftek130.from_file(path2file) rt130 = core.DecimatedReftek130.from_file(path2file)
except Exception: except Exception:
fmt = traceback.format_exc() fmt = traceback.format_exc()
self.track_info(f"Skip file {path2file} can't be read " self.track_info(f"Skip file {path2file} can't be read "
...@@ -329,22 +325,20 @@ class RT130(GeneralData): ...@@ -329,22 +325,20 @@ class RT130(GeneralData):
nbr_packet_type = dict(zip(unique, counts)) nbr_packet_type = dict(zip(unique, counts))
if b"SH" in nbr_packet_type: if b"SH" in nbr_packet_type:
self.read_sh_packet(path2file) self.read_sh_packet(rt130)
if b"EH" in nbr_packet_type or b"ET" in nbr_packet_type: if b"EH" in nbr_packet_type or b"ET" in nbr_packet_type:
self.read_eh_or_et_packet(rt130) self.read_eh_or_et_packet(rt130)
return True return True
def read_sh_packet(self, path2file: Path) -> None: def read_sh_packet(self, rt130: DecimatedReftek130) -> None:
""" """
Use soh_packet library to read file with SH packet for soh data Use soh_packet library to read file with SH packet for soh data
to append tuple (time, log string) to to append tuple (time, log string) to
log_data[self.cur_data_set_id][SOH] log_data[self.cur_data_set_id][SOH]
:param path2file: absolute path to file :param rt130: RT130 object of an SOH file.
""" """
with open(path2file, "rb") as fh: data = rt130._data
str = fh.read()
data = soh_packet._initial_unpack_packets_soh(str)
for ind, d in enumerate(data): for ind, d in enumerate(data):
cur_data_set_id = (d['unit_id'].decode(), cur_data_set_id = (d['unit_id'].decode(),
f"{d['experiment_number']}") f"{d['experiment_number']}")
...@@ -363,7 +357,7 @@ class RT130(GeneralData): ...@@ -363,7 +357,7 @@ class RT130(GeneralData):
self.log_data[cur_data_set_id]['SOH'] = [] self.log_data[cur_data_set_id]['SOH'] = []
self.log_data[cur_data_set_id]['SOH'].append((d['time'], logs)) self.log_data[cur_data_set_id]['SOH'].append((d['time'], logs))
def read_eh_or_et_packet(self, rt130: core.Reftek130) -> None: def read_eh_or_et_packet(self, rt130: core.DecimatedReftek130) -> None:
""" """
Files that contents EH or ET packets are data stream (DS and Files that contents EH or ET packets are data stream (DS and
mass position (DS 9) files. mass position (DS 9) files.
...@@ -394,7 +388,7 @@ class RT130(GeneralData): ...@@ -394,7 +388,7 @@ class RT130(GeneralData):
self.get_mass_pos_data_and_waveform_data( self.get_mass_pos_data_and_waveform_data(
rt130, data_stream, cur_data_set_id) rt130, data_stream, cur_data_set_id)
def get_ehet_in_log_data(self, rt130: core.Reftek130, def get_ehet_in_log_data(self, rt130: core.DecimatedReftek130,
cur_data_set_id: Tuple[str, str]) -> None: cur_data_set_id: Tuple[str, str]) -> None:
""" """
Read event header info to add to log_data['EHET'] Read event header info to add to log_data['EHET']
...@@ -419,7 +413,7 @@ class RT130(GeneralData): ...@@ -419,7 +413,7 @@ class RT130(GeneralData):
(d['time'], logs)) (d['time'], logs))
def get_mass_pos_data_and_waveform_data( def get_mass_pos_data_and_waveform_data(
self, rt130: core.Reftek130, data_stream: int, self, rt130: core.DecimatedReftek130, data_stream: int,
cur_data_set_id: Tuple[str, str]) -> None: cur_data_set_id: Tuple[str, str]) -> None:
""" """
Get mass_pos_data for the current data_set_id in DS 9. Get mass_pos_data for the current data_set_id in DS 9.
......
...@@ -6,12 +6,12 @@ from obspy.core import Stream ...@@ -6,12 +6,12 @@ from obspy.core import Stream
from obspy import UTCDateTime from obspy import UTCDateTime
from sohstationviewer.model.reftek_data.reftek_reader.core import ( from sohstationviewer.model.reftek_data.reftek_reader.core import (
DiscontinuousTrace, Reftek130) DiscontinuousTrace, DecimatedReftek130)
from sohstationviewer.model.general_data.general_data_helper import squash_gaps from sohstationviewer.model.general_data.general_data_helper import squash_gaps
def check_reftek_header( def check_reftek_header(
rt130: Reftek130, cur_data_set_id: Tuple[str, str], rt130: DecimatedReftek130, cur_data_set_id: Tuple[str, str],
starttime: UTCDateTime, endtime: UTCDateTime, starttime: UTCDateTime, endtime: UTCDateTime,
stream_header_by_data_set_id_chan: Dict[str, Dict[str, Stream]], stream_header_by_data_set_id_chan: Dict[str, Dict[str, Stream]],
cur_data_dict: Dict, cur_data_time: List[float], cur_data_dict: Dict, cur_data_time: List[float],
...@@ -36,7 +36,7 @@ def check_reftek_header( ...@@ -36,7 +36,7 @@ def check_reftek_header(
:param include_mp123zne: if mass position channels 1,2,3 are requested :param include_mp123zne: if mass position channels 1,2,3 are requested
:param include_mp456uvw: if mass position channels 4,5,6 are requested :param include_mp456uvw: if mass position channels 4,5,6 are requested
""" """
stream = Reftek130.to_stream( stream = DecimatedReftek130.to_stream(
rt130, rt130,
include_mp123=include_mp123zne, include_mp123=include_mp123zne,
include_mp456=include_mp456uvw, include_mp456=include_mp456uvw,
...@@ -76,7 +76,7 @@ def check_reftek_header( ...@@ -76,7 +76,7 @@ def check_reftek_header(
def read_reftek_stream( def read_reftek_stream(
rt130: Reftek130, rt130: DecimatedReftek130,
avail_trace_indexes: List[int], cur_data_dict: Dict, avail_trace_indexes: List[int], cur_data_dict: Dict,
include_mp123zne: bool, include_mp456uvw: bool): include_mp123zne: bool, include_mp456uvw: bool):
""" """
...@@ -91,7 +91,7 @@ def read_reftek_stream( ...@@ -91,7 +91,7 @@ def read_reftek_stream(
:param include_mp456uvw: if mass position channels 4,5,6 are requested :param include_mp456uvw: if mass position channels 4,5,6 are requested
""" """
# TODO: rewrite reftek to read stream with start and end time # TODO: rewrite reftek to read stream with start and end time
stream = Reftek130.to_stream( stream = DecimatedReftek130.to_stream(
rt130, rt130,
include_mp123=include_mp123zne, include_mp123=include_mp123zne,
include_mp456=include_mp456uvw, include_mp456=include_mp456uvw,
......
from __future__ import annotations from __future__ import annotations
import numpy
from sohstationviewer.model.reftek_data.reftek_reader import soh_packet from sohstationviewer.model.reftek_data.reftek_reader import soh_packet
""" """
...@@ -74,35 +76,38 @@ class Reftek130Exception(ObsPyException): ...@@ -74,35 +76,38 @@ class Reftek130Exception(ObsPyException):
pass pass
class Reftek130(obspy_rt130_core.Reftek130): class DecimatedReftek130(obspy_rt130_core.Reftek130):
""" """
Child class of obspy.Reftek that reads waveform data similar to logpeek for Child class of obspy.Reftek that reads waveform data similar to logpeek for
better performance. better performance.
""" """
@staticmethod @staticmethod
def from_file(file: Union[str, Path]) -> Reftek130: def from_file(file: Union[str, Path]) -> DecimatedReftek130:
""" """
Read data from an RT130 file and save it in a Reftek130 object. Read data from an RT130 file and save it in a Reftek130 object.
:param file: the RT130 file to read :param file: the RT130 file to read
:return: a Reftek130 object that stores the data in file :return: a Reftek130 object that stores the data in file
""" """
# RT130 data is all big-endian rt = DecimatedReftek130()
rt130_unpacker = Unpacker('>')
rt = Reftek130()
rt._filename = file rt._filename = file
packets_in_file = read_rt130_file(file, rt130_unpacker)
first_packet_type = packets_in_file[0].header.packet_type # SOH and waveform packets are read into different formats, so we have
# to handle them separately. The two formats are encoded in the PACKET
# constants in obspy.io.reftek.packet (waveform packets) and
# model.reftek_data.reftek_reader.soh_packet (SOH packets).
infile = open(file, 'rb')
# Because SOH and waveform data are stored in separate files, we
# only need to look at the first packet type
first_packet_type = infile.read(2)
infile.seek(0)
if first_packet_type in ['EH', 'ET', 'DT']: if first_packet_type in ['EH', 'ET', 'DT']:
packet_converter = convert_waveform_packet_to_obspy_format data = obspy_rt130_core.Reftek130.from_file(file)._data
final_dtype = PACKET_FINAL_DTYPE
else: else:
packet_converter = convert_soh_packet_to_obspy_format data = soh_packet._initial_unpack_packets_soh(infile.read())
final_dtype = soh_packet.PACKET_FINAL_DTYPE rt._data = data
converted_packets = []
for packet in packets_in_file:
converted_packets.append(packet_converter(packet, rt130_unpacker))
rt._data = np.array(converted_packets, dtype=final_dtype)
return rt return rt
def to_stream(self, network: str = "", location: str = "", def to_stream(self, network: str = "", location: str = "",
......
...@@ -5,7 +5,10 @@ from pathlib import Path ...@@ -5,7 +5,10 @@ from pathlib import Path
import numpy import numpy
import obspy.core import obspy.core
from numpy.testing import assert_array_equal from numpy.testing import assert_array_equal
from obspy.io.reftek import packet
from obspy.io.reftek.packet import PACKET_FINAL_DTYPE
from sohstationviewer.model.reftek_data.reftek_reader import soh_packet
from sohstationviewer.model.reftek_data.reftek_reader.core import ( from sohstationviewer.model.reftek_data.reftek_reader.core import (
DiscontinuousTrace, DiscontinuousTrace,
Reftek130, Reftek130,
...@@ -14,9 +17,9 @@ from sohstationviewer.model.reftek_data.reftek_reader.header import \ ...@@ -14,9 +17,9 @@ from sohstationviewer.model.reftek_data.reftek_reader.header import \
NotRT130FileError NotRT130FileError
from sohstationviewer.model.reftek_data.reftek_reader.reftek_reader_helper \ from sohstationviewer.model.reftek_data.reftek_reader.reftek_reader_helper \
import ( import (
convert_soh_packet_to_obspy_format, convert_soh_packet_to_obspy_format,
convert_waveform_packet_to_obspy_format, convert_waveform_packet_to_obspy_format,
) )
from tests.base_test_case import BaseTestCase from tests.base_test_case import BaseTestCase
...@@ -73,19 +76,6 @@ class TestReftek130FromFile(BaseTestCase): ...@@ -73,19 +76,6 @@ class TestReftek130FromFile(BaseTestCase):
self.rt130_dir = self.TEST_DATA_DIR.joinpath( self.rt130_dir = self.TEST_DATA_DIR.joinpath(
'RT130-sample/2017149.92EB/2017150/92EB' 'RT130-sample/2017149.92EB/2017150/92EB'
) )
base_path = 'sohstationviewer.model.reftek_data.reftek_reader.core'
waveform_patcher = patch(
f'{base_path}.convert_waveform_packet_to_obspy_format',
wraps=convert_waveform_packet_to_obspy_format
)
self.addCleanup(waveform_patcher.stop)
self.mock_waveform_converter = waveform_patcher.start()
soh_patcher = patch(
f'{base_path}.convert_soh_packet_to_obspy_format',
wraps=convert_soh_packet_to_obspy_format
)
self.addCleanup(soh_patcher.stop)
self.mock_soh_converter = soh_patcher.start()
def test_rt130_file(self): def test_rt130_file(self):
with self.subTest('test_soh_file'): with self.subTest('test_soh_file'):
...@@ -100,21 +90,228 @@ class TestReftek130FromFile(BaseTestCase): ...@@ -100,21 +90,228 @@ class TestReftek130FromFile(BaseTestCase):
def test_rt130_soh_file(self): def test_rt130_soh_file(self):
file = self.rt130_dir.joinpath('0/000000000_00000000') file = self.rt130_dir.joinpath('0/000000000_00000000')
rt130 = Reftek130.from_file(file) rt130 = Reftek130.from_file(file)
# The most common SOH packet type looks to be SH, so we use that as # Construct the data type of the SOH packet metadata, which is the SOH
# the default. # packet data type without the payload. Numpy does not have a
self.assertIn(b'SH', rt130._data['packet_type']) # straightforward way to create a data type based on another, so we
self.assertTrue(self.mock_soh_converter.called) # have to construct this from the raw data.
self.assertFalse(self.mock_waveform_converter.called) soh_packet_metadata_dtype = numpy.dtype(
[(name, dtype_final)
for name, dtype_initial, converter, dtype_final
in soh_packet.PACKET[:-1]]
)
expected_metadata = numpy.array([
(b'SH', 25, 17, b'92EB', 1496102400000000000, 1002, 0),
(b'SH', 25, 17, b'92EB', 1496102400000000000, 815, 1),
(b'SC', 25, 17, b'92EB', 1496102400000000000, 640, 2),
(b'OM', 25, 17, b'92EB', 1496102400000000000, 112, 3),
(b'DS', 25, 17, b'92EB', 1496102400000000000, 476, 4),
(b'AD', 25, 17, b'92EB', 1496102400000000000, 56, 5),
(b'CD', 25, 17, b'92EB', 1496102400000000000, 538, 6),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 404, 7),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 780, 8),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 964, 9),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 156, 10),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 76, 11),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 428, 12),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 68, 13),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 84, 14),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 232, 15),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 464, 16),
(b'SH', 25, 17, b'92EB', 1496102400000000000, 1018, 17),
(b'SH', 25, 17, b'92EB', 1496108400000000000, 1010, 18),
(b'SH', 25, 17, b'92EB', 1496113129000000000, 970, 19),
(b'SH', 25, 17, b'92EB', 1496116800000000000, 1022, 20),
(b'SH', 25, 17, b'92EB', 1496120400000000000, 988, 21),
(b'SH', 25, 17, b'92EB', 1496124000000000000, 991, 22),
(b'SH', 25, 17, b'92EB', 1496130005000000000, 1014, 23),
(b'SH', 25, 17, b'92EB', 1496134088000000000, 985, 24),
(b'SH', 25, 17, b'92EB', 1496138400000000000, 991, 25),
(b'SH', 25, 17, b'92EB', 1496142000000000000, 1022, 26),
(b'SH', 25, 17, b'92EB', 1496145600000000000, 988, 27),
(b'SH', 25, 17, b'92EB', 1496151600000000000, 1011, 28),
(b'SH', 25, 17, b'92EB', 1496155344000000000, 1005, 29),
(b'SH', 25, 17, b'92EB', 1496160000000000000, 999, 30),
(b'SH', 25, 17, b'92EB', 1496163600000000000, 1020, 31),
(b'SH', 25, 17, b'92EB', 1496167200000000000, 981, 32),
(b'SH', 25, 17, b'92EB', 1496170800000000000, 1000, 33),
(b'SH', 25, 17, b'92EB', 1496175661000000000, 1013, 34),
(b'SH', 25, 17, b'92EB', 1496180462000000000, 1008, 35),
(b'SH', 25, 17, b'92EB', 1496185200000000000, 868, 36),
], dtype=numpy.dtype(soh_packet_metadata_dtype))
actual_metadata = rt130._data[list(rt130._data.dtype.names)[:-1]]
self.assertTrue((expected_metadata == actual_metadata).all())
# We only look at the head and tail of the payloads because each one
# contains 1000 bytes.
expected_payload_head = numpy.array(
[[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [50, 53, 73, 66, 68],
[32, 32, 32, 32, 32], [49, 32, 83, 49, 115], [32, 32, 49, 50, 51],
[32, 32, 32, 32, 32], [0, 51, 2, 32, 95], [0, 52, 4, 32, 189],
[0, 53, 5, 32, 235], [0, 65, 8, 0, 33], [0, 66, 2, 0, 13],
[0, 67, 2, 0, 101], [0, 68, 2, 0, 11], [0, 69, 2, 0, 15],
[0, 70, 2, 0, 52], [0, 71, 2, 0, 110], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32]],
dtype=numpy.uint8)
actual_payload_head = rt130._data['payload'][:, :5]
self.assertTrue((expected_payload_head == actual_payload_head).all())
expected_payload_tail = numpy.array(
[[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [50, 48, 48, 48, 48],
[50, 48, 48, 48, 48], [50, 48, 48, 48, 48], [50, 48, 48, 48, 48],
[50, 48, 48, 48, 48], [50, 48, 48, 48, 48], [50, 48, 48, 48, 48],
[50, 48, 48, 48, 48], [50, 48, 48, 48, 48], [50, 48, 48, 48, 48],
[50, 48, 48, 48, 48], [50, 48, 48, 48, 48], [50, 48, 48, 48, 48],
[50, 48, 48, 48, 48], [50, 48, 48, 48, 48], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [82, 13, 10, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [84, 13, 10, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [10, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32]],
dtype=numpy.uint8)
actual_payload_tail = rt130._data['payload'][:, -5:]
self.assertTrue((expected_payload_tail == actual_payload_tail).all())
def test_rt130_uncompressed_raw_data_file(self):
file = self.rt130_dir.joinpath('9/054910000_013EE8A0')
rt130 = Reftek130.from_file(file)
# Construct the data type of the SOH packet metadata, which is the SOH
# packet data type without the payload. Numpy does not have a
# straightforward way to create a data type based on another, so we
# have to construct this from the raw data.
data_packet_metadata_dtype = numpy.dtype(
[(name, dtype_final)
for name, dtype_initial, converter, dtype_final
in packet.PACKET[:-1]]
)
expected_metadata = numpy.array([
(b'EH', 25, 17, b'92EB', 1496123350000000000, 416, 0, 17, 8, 0, 0,
0, b'16'),
(b'DT', 25, 17, b'92EB', 1496123350000000000, 1024, 1, 17, 8, 0,
500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496123350000000000, 1024, 2, 17, 8, 1,
500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496123350000000000, 1024, 3, 17, 8, 2,
500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496128350000000000, 1024, 4, 17, 8,
0, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496128350000000000, 1024, 5, 17, 8,
1, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496128350000000000, 1024, 6, 17, 8,
2, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496133350000000000, 1024, 7, 17,
8, 0, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496133350000000000, 1024, 8, 17,
8, 1, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496133350000000000, 1024, 9, 17,
8, 2, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496138350000000000, 1024, 10,
17, 8, 0, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496138350000000000, 1024, 11,
17, 8, 1, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496138350000000000, 1024, 12,
17, 8, 2, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496143350000000000, 204, 13,
17, 8, 0, 90, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496143350000000000, 204,
14, 17, 8, 1, 90, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496143350000000000, 204,
15, 17, 8, 2, 90, 0, b'16'),
(b'ET', 25, 17, b'92EB', 1496123350000000000, 416,
16, 17, 8, 0, 0, 0, b'16'),
], dtype=numpy.dtype(data_packet_metadata_dtype))
actual_metadata = rt130._data[list(rt130._data.dtype.names)[:-1]]
self.assertTrue((expected_metadata == actual_metadata).all())
expected_dt_payload = numpy.array(
[[255, 255, 250, 144], [0, 0, 11, 209], [0, 0, 7, 82],
[255, 255, 250, 160], [0, 0, 13, 97], [0, 0, 6, 34],
[255, 255, 251, 0], [0, 0, 14, 209], [0, 0, 5, 34],
[255, 255, 251, 144], [0, 0, 16, 33], [0, 0, 4, 34],
[255, 255, 252, 64], [0, 0, 17, 113], [0, 0, 3, 50]],
dtype=numpy.uint8)
def test_rt130_raw_data_file(self): dt_packets_idx = numpy.where(rt130._data['packet_type'] == b'DT')
# The payload of DT packets only include one data point which is padded
# to 4 bytes. The rest of the payload are all empty bytes.
actual_dt_payload = rt130._data['payload'][dt_packets_idx][:, :4]
self.assertTrue((expected_dt_payload == actual_dt_payload).all())
# The EH/ET packets payloads are truncated. This is liable to change in
# the future, so we don't check that the truncated part are all 0s.
expected_ehet_payload_head = numpy.array(
[[84, 114, 105, 103, 103, 101, 114, 32, 84, 105],
[84, 114, 105, 103, 103, 101, 114, 32, 84, 105]],
dtype=numpy.uint8)
ehet_packets_idx = numpy.where(rt130._data['packet_type'] != b'DT')
eh_et_payload = rt130._data['payload'][ehet_packets_idx]
actual_ehet_payload_head = eh_et_payload[:, :10]
self.assertTrue(
(expected_ehet_payload_head == actual_ehet_payload_head).all()
)
def test_rt130_compressed_raw_data_file(self):
file = self.rt130_dir.joinpath('1/000000015_0036EE80') file = self.rt130_dir.joinpath('1/000000015_0036EE80')
rt130 = Reftek130.from_file(file) rt130 = Reftek130.from_file(file)
assert_array_equal( # This file contains more than 900 records, so we have to truncate the
numpy.unique(numpy.sort(rt130._data['packet_type'])), # data to get a more manageable number.
numpy.sort([b'EH', b'DT', b'ET']) truncated_data = rt130._data[[0, 1, 2, 3, 4, -1]]
# Construct the data type of the SOH packet metadata, which is the SOH
# packet data type without the payload. Numpy does not have a
# straightforward way to create a data type based on another, so we
# have to construct this from the raw data.
data_packet_metadata_dtype = numpy.dtype(
[(name, dtype_final)
for name, dtype_initial, converter, dtype_final
in packet.PACKET[:-1]]
)
expected_metadata = numpy.array(
[
(b'EH', 25, 17, b'92EB', 1496102400000000000, 416, 0, 9, 0, 0,
0, 0, b'C0'),
(b'DT', 25, 17, b'92EB', 1496102400015000000, 1024, 1, 9, 0, 0,
446, 0, b'C0'),
(b'DT', 25, 17, b'92EB', 1496102400015000000, 1024, 2, 9, 0, 1,
464, 0, b'C0'),
(b'DT', 25, 17, b'92EB', 1496102400015000000, 1024, 3, 9, 0, 2,
446, 0, b'C0'),
(b'DT', 25, 17, b'92EB', 1496102411165000000, 1024, 4, 9, 0, 0,
446, 0, b'C0'),
(b'ET', 25, 17, b'92EB', 1496102400000000000, 416, 962, 9, 0,
0, 0, 0, b'C0')
],
dtype=numpy.dtype(data_packet_metadata_dtype))
actual_metadata = truncated_data[list(truncated_data.dtype.names)[:-1]]
self.assertTrue((expected_metadata == actual_metadata).all())
expected_dt_payload = numpy.array(
[[255, 255, 240, 226], [0, 0, 2, 215], [255, 255, 237, 88],
[255, 255, 239, 142]],
dtype=numpy.uint8)
dt_packets_idx = numpy.where(truncated_data['packet_type'] == b'DT')
# The payload of DT packets only include one data point which is padded
# to 4 bytes. The rest of the payload are all empty bytes.
actual_dt_payload = truncated_data['payload'][dt_packets_idx][:, :4]
self.assertTrue((expected_dt_payload == actual_dt_payload).all())
# The EH/ET packets payloads are truncated. This is liable to change in
# the future, so we don't check that the truncated part are all 0s.
expected_ehet_payload_head = numpy.array(
[[84, 114, 105, 103, 103, 101, 114, 32, 84, 105],
[84, 114, 105, 103, 103, 101, 114, 32, 84, 105]],
dtype=numpy.uint8)
ehet_packets_idx = numpy.where(truncated_data['packet_type'] != b'DT')
eh_et_payload = truncated_data['payload'][ehet_packets_idx]
actual_ehet_payload_head = eh_et_payload[:, :10]
self.assertTrue(
(expected_ehet_payload_head == actual_ehet_payload_head).all()
) )
self.assertTrue(self.mock_waveform_converter.called)
self.assertFalse(self.mock_soh_converter.called)
def test_non_rt130_file(self): def test_non_rt130_file(self):
with self.subTest('test_file_exist'): with self.subTest('test_file_exist'):
......