Skip to content
Snippets Groups Projects
Commit a5dfb4c4 authored by Kien Le's avatar Kien Le
Browse files

Update tests to prepare for refactor

parent 79e65ea2
No related branches found
No related tags found
1 merge request!280Optimize reading RT130 data
......@@ -5,7 +5,10 @@ from pathlib import Path
import numpy
import obspy.core
from numpy.testing import assert_array_equal
from obspy.io.reftek import packet
from obspy.io.reftek.packet import PACKET_FINAL_DTYPE
from sohstationviewer.model.reftek_data.reftek_reader import soh_packet
from sohstationviewer.model.reftek_data.reftek_reader.core import (
DiscontinuousTrace,
Reftek130,
......@@ -14,9 +17,9 @@ from sohstationviewer.model.reftek_data.reftek_reader.header import \
NotRT130FileError
from sohstationviewer.model.reftek_data.reftek_reader.reftek_reader_helper \
import (
convert_soh_packet_to_obspy_format,
convert_waveform_packet_to_obspy_format,
)
convert_soh_packet_to_obspy_format,
convert_waveform_packet_to_obspy_format,
)
from tests.base_test_case import BaseTestCase
......@@ -73,19 +76,6 @@ class TestReftek130FromFile(BaseTestCase):
self.rt130_dir = self.TEST_DATA_DIR.joinpath(
'RT130-sample/2017149.92EB/2017150/92EB'
)
base_path = 'sohstationviewer.model.reftek_data.reftek_reader.core'
waveform_patcher = patch(
f'{base_path}.convert_waveform_packet_to_obspy_format',
wraps=convert_waveform_packet_to_obspy_format
)
self.addCleanup(waveform_patcher.stop)
self.mock_waveform_converter = waveform_patcher.start()
soh_patcher = patch(
f'{base_path}.convert_soh_packet_to_obspy_format',
wraps=convert_soh_packet_to_obspy_format
)
self.addCleanup(soh_patcher.stop)
self.mock_soh_converter = soh_patcher.start()
def test_rt130_file(self):
with self.subTest('test_soh_file'):
......@@ -100,21 +90,228 @@ class TestReftek130FromFile(BaseTestCase):
def test_rt130_soh_file(self):
file = self.rt130_dir.joinpath('0/000000000_00000000')
rt130 = Reftek130.from_file(file)
# The most common SOH packet type looks to be SH, so we use that as
# the default.
self.assertIn(b'SH', rt130._data['packet_type'])
self.assertTrue(self.mock_soh_converter.called)
self.assertFalse(self.mock_waveform_converter.called)
# Construct the data type of the SOH packet metadata, which is the SOH
# packet data type without the payload. Numpy does not have a
# straightforward way to create a data type based on another, so we
# have to construct this from the raw data.
soh_packet_metadata_dtype = numpy.dtype(
[(name, dtype_final)
for name, dtype_initial, converter, dtype_final
in soh_packet.PACKET[:-1]]
)
expected_metadata = numpy.array([
(b'SH', 25, 17, b'92EB', 1496102400000000000, 1002, 0),
(b'SH', 25, 17, b'92EB', 1496102400000000000, 815, 1),
(b'SC', 25, 17, b'92EB', 1496102400000000000, 640, 2),
(b'OM', 25, 17, b'92EB', 1496102400000000000, 112, 3),
(b'DS', 25, 17, b'92EB', 1496102400000000000, 476, 4),
(b'AD', 25, 17, b'92EB', 1496102400000000000, 56, 5),
(b'CD', 25, 17, b'92EB', 1496102400000000000, 538, 6),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 404, 7),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 780, 8),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 964, 9),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 156, 10),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 76, 11),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 428, 12),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 68, 13),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 84, 14),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 232, 15),
(b'FD', 25, 17, b'92EB', 1496102400000000000, 464, 16),
(b'SH', 25, 17, b'92EB', 1496102400000000000, 1018, 17),
(b'SH', 25, 17, b'92EB', 1496108400000000000, 1010, 18),
(b'SH', 25, 17, b'92EB', 1496113129000000000, 970, 19),
(b'SH', 25, 17, b'92EB', 1496116800000000000, 1022, 20),
(b'SH', 25, 17, b'92EB', 1496120400000000000, 988, 21),
(b'SH', 25, 17, b'92EB', 1496124000000000000, 991, 22),
(b'SH', 25, 17, b'92EB', 1496130005000000000, 1014, 23),
(b'SH', 25, 17, b'92EB', 1496134088000000000, 985, 24),
(b'SH', 25, 17, b'92EB', 1496138400000000000, 991, 25),
(b'SH', 25, 17, b'92EB', 1496142000000000000, 1022, 26),
(b'SH', 25, 17, b'92EB', 1496145600000000000, 988, 27),
(b'SH', 25, 17, b'92EB', 1496151600000000000, 1011, 28),
(b'SH', 25, 17, b'92EB', 1496155344000000000, 1005, 29),
(b'SH', 25, 17, b'92EB', 1496160000000000000, 999, 30),
(b'SH', 25, 17, b'92EB', 1496163600000000000, 1020, 31),
(b'SH', 25, 17, b'92EB', 1496167200000000000, 981, 32),
(b'SH', 25, 17, b'92EB', 1496170800000000000, 1000, 33),
(b'SH', 25, 17, b'92EB', 1496175661000000000, 1013, 34),
(b'SH', 25, 17, b'92EB', 1496180462000000000, 1008, 35),
(b'SH', 25, 17, b'92EB', 1496185200000000000, 868, 36),
], dtype=numpy.dtype(soh_packet_metadata_dtype))
actual_metadata = rt130._data[list(rt130._data.dtype.names)[:-1]]
self.assertTrue((expected_metadata == actual_metadata).all())
# We only look at the head and tail of the payloads because each one
# contains 1000 bytes.
expected_payload_head = numpy.array(
[[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [50, 53, 73, 66, 68],
[32, 32, 32, 32, 32], [49, 32, 83, 49, 115], [32, 32, 49, 50, 51],
[32, 32, 32, 32, 32], [0, 51, 2, 32, 95], [0, 52, 4, 32, 189],
[0, 53, 5, 32, 235], [0, 65, 8, 0, 33], [0, 66, 2, 0, 13],
[0, 67, 2, 0, 101], [0, 68, 2, 0, 11], [0, 69, 2, 0, 15],
[0, 70, 2, 0, 52], [0, 71, 2, 0, 110], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32]],
dtype=numpy.uint8)
actual_payload_head = rt130._data['payload'][:, :5]
self.assertTrue((expected_payload_head == actual_payload_head).all())
expected_payload_tail = numpy.array(
[[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [50, 48, 48, 48, 48],
[50, 48, 48, 48, 48], [50, 48, 48, 48, 48], [50, 48, 48, 48, 48],
[50, 48, 48, 48, 48], [50, 48, 48, 48, 48], [50, 48, 48, 48, 48],
[50, 48, 48, 48, 48], [50, 48, 48, 48, 48], [50, 48, 48, 48, 48],
[50, 48, 48, 48, 48], [50, 48, 48, 48, 48], [50, 48, 48, 48, 48],
[50, 48, 48, 48, 48], [50, 48, 48, 48, 48], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [82, 13, 10, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [84, 13, 10, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [10, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32], [32, 32, 32, 32, 32], [32, 32, 32, 32, 32],
[32, 32, 32, 32, 32]],
dtype=numpy.uint8)
actual_payload_tail = rt130._data['payload'][:, -5:]
self.assertTrue((expected_payload_tail == actual_payload_tail).all())
def test_rt130_uncompressed_raw_data_file(self):
file = self.rt130_dir.joinpath('9/054910000_013EE8A0')
rt130 = Reftek130.from_file(file)
# Construct the data type of the SOH packet metadata, which is the SOH
# packet data type without the payload. Numpy does not have a
# straightforward way to create a data type based on another, so we
# have to construct this from the raw data.
data_packet_metadata_dtype = numpy.dtype(
[(name, dtype_final)
for name, dtype_initial, converter, dtype_final
in packet.PACKET[:-1]]
)
expected_metadata = numpy.array([
(b'EH', 25, 17, b'92EB', 1496123350000000000, 416, 0, 17, 8, 0, 0,
0, b'16'),
(b'DT', 25, 17, b'92EB', 1496123350000000000, 1024, 1, 17, 8, 0,
500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496123350000000000, 1024, 2, 17, 8, 1,
500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496123350000000000, 1024, 3, 17, 8, 2,
500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496128350000000000, 1024, 4, 17, 8,
0, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496128350000000000, 1024, 5, 17, 8,
1, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496128350000000000, 1024, 6, 17, 8,
2, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496133350000000000, 1024, 7, 17,
8, 0, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496133350000000000, 1024, 8, 17,
8, 1, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496133350000000000, 1024, 9, 17,
8, 2, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496138350000000000, 1024, 10,
17, 8, 0, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496138350000000000, 1024, 11,
17, 8, 1, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496138350000000000, 1024, 12,
17, 8, 2, 500, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496143350000000000, 204, 13,
17, 8, 0, 90, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496143350000000000, 204,
14, 17, 8, 1, 90, 0, b'16'),
(b'DT', 25, 17, b'92EB', 1496143350000000000, 204,
15, 17, 8, 2, 90, 0, b'16'),
(b'ET', 25, 17, b'92EB', 1496123350000000000, 416,
16, 17, 8, 0, 0, 0, b'16'),
], dtype=numpy.dtype(data_packet_metadata_dtype))
actual_metadata = rt130._data[list(rt130._data.dtype.names)[:-1]]
self.assertTrue((expected_metadata == actual_metadata).all())
expected_dt_payload = numpy.array(
[[255, 255, 250, 144], [0, 0, 11, 209], [0, 0, 7, 82],
[255, 255, 250, 160], [0, 0, 13, 97], [0, 0, 6, 34],
[255, 255, 251, 0], [0, 0, 14, 209], [0, 0, 5, 34],
[255, 255, 251, 144], [0, 0, 16, 33], [0, 0, 4, 34],
[255, 255, 252, 64], [0, 0, 17, 113], [0, 0, 3, 50]],
dtype=numpy.uint8)
def test_rt130_raw_data_file(self):
dt_packets_idx = numpy.where(rt130._data['packet_type'] == b'DT')
# The payload of DT packets only include one data point which is padded
# to 4 bytes. The rest of the payload are all empty bytes.
actual_dt_payload = rt130._data['payload'][dt_packets_idx][:, :4]
self.assertTrue((expected_dt_payload == actual_dt_payload).all())
# The EH/ET packets payloads are truncated. This is liable to change in
# the future, so we don't check that the truncated part are all 0s.
expected_ehet_payload_head = numpy.array(
[[84, 114, 105, 103, 103, 101, 114, 32, 84, 105],
[84, 114, 105, 103, 103, 101, 114, 32, 84, 105]],
dtype=numpy.uint8)
ehet_packets_idx = numpy.where(rt130._data['packet_type'] != b'DT')
eh_et_payload = rt130._data['payload'][ehet_packets_idx]
actual_ehet_payload_head = eh_et_payload[:, :10]
self.assertTrue(
(expected_ehet_payload_head == actual_ehet_payload_head).all()
)
def test_rt130_compressed_raw_data_file(self):
file = self.rt130_dir.joinpath('1/000000015_0036EE80')
rt130 = Reftek130.from_file(file)
assert_array_equal(
numpy.unique(numpy.sort(rt130._data['packet_type'])),
numpy.sort([b'EH', b'DT', b'ET'])
# This file contains more than 900 records, so we have to truncate the
# data to get a more manageable number.
truncated_data = rt130._data[[0, 1, 2, 3, 4, -1]]
# Construct the data type of the SOH packet metadata, which is the SOH
# packet data type without the payload. Numpy does not have a
# straightforward way to create a data type based on another, so we
# have to construct this from the raw data.
data_packet_metadata_dtype = numpy.dtype(
[(name, dtype_final)
for name, dtype_initial, converter, dtype_final
in packet.PACKET[:-1]]
)
expected_metadata = numpy.array(
[
(b'EH', 25, 17, b'92EB', 1496102400000000000, 416, 0, 9, 0, 0,
0, 0, b'C0'),
(b'DT', 25, 17, b'92EB', 1496102400015000000, 1024, 1, 9, 0, 0,
446, 0, b'C0'),
(b'DT', 25, 17, b'92EB', 1496102400015000000, 1024, 2, 9, 0, 1,
464, 0, b'C0'),
(b'DT', 25, 17, b'92EB', 1496102400015000000, 1024, 3, 9, 0, 2,
446, 0, b'C0'),
(b'DT', 25, 17, b'92EB', 1496102411165000000, 1024, 4, 9, 0, 0,
446, 0, b'C0'),
(b'ET', 25, 17, b'92EB', 1496102400000000000, 416, 962, 9, 0,
0, 0, 0, b'C0')
],
dtype=numpy.dtype(data_packet_metadata_dtype))
actual_metadata = truncated_data[list(truncated_data.dtype.names)[:-1]]
self.assertTrue((expected_metadata == actual_metadata).all())
expected_dt_payload = numpy.array(
[[255, 255, 240, 226], [0, 0, 2, 215], [255, 255, 237, 88],
[255, 255, 239, 142]],
dtype=numpy.uint8)
dt_packets_idx = numpy.where(truncated_data['packet_type'] == b'DT')
# The payload of DT packets only include one data point which is padded
# to 4 bytes. The rest of the payload are all empty bytes.
actual_dt_payload = truncated_data['payload'][dt_packets_idx][:, :4]
self.assertTrue((expected_dt_payload == actual_dt_payload).all())
# The EH/ET packets payloads are truncated. This is liable to change in
# the future, so we don't check that the truncated part are all 0s.
expected_ehet_payload_head = numpy.array(
[[84, 114, 105, 103, 103, 101, 114, 32, 84, 105],
[84, 114, 105, 103, 103, 101, 114, 32, 84, 105]],
dtype=numpy.uint8)
ehet_packets_idx = numpy.where(truncated_data['packet_type'] != b'DT')
eh_et_payload = truncated_data['payload'][ehet_packets_idx]
actual_ehet_payload_head = eh_et_payload[:, :10]
self.assertTrue(
(expected_ehet_payload_head == actual_ehet_payload_head).all()
)
self.assertTrue(self.mock_waveform_converter.called)
self.assertFalse(self.mock_soh_converter.called)
def test_non_rt130_file(self):
with self.subTest('test_file_exist'):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment