diff --git a/sohstationviewer/model/reftek_data/reftek_reader/core.py b/sohstationviewer/model/reftek_data/reftek_reader/core.py index 5406c51e26f5c56c31e9fbda25b0454b1d253000..3f8244b0bda3290c58cf11d51ec093d53d5b8c13 100644 --- a/sohstationviewer/model/reftek_data/reftek_reader/core.py +++ b/sohstationviewer/model/reftek_data/reftek_reader/core.py @@ -1,5 +1,7 @@ from __future__ import annotations +from sohstationviewer.model.reftek_data.reftek_reader import soh_packet + """ Suggested updates to obspy.io.reftek.core: - modify section of the code that deals with upacking data @@ -26,7 +28,10 @@ from sohstationviewer.model.general_data.general_record_helper import Unpacker from sohstationviewer.model.reftek_data.reftek_reader.packet import EHPacket from sohstationviewer.model.reftek_data.reftek_reader.reftek_reader_helper \ - import (read_rt130_file, convert_packet_to_obspy_format) + import ( + read_rt130_file, convert_waveform_packet_to_obspy_format, + convert_soh_packet_to_obspy_format, + ) class DiscontinuousTrace(Trace): @@ -34,6 +39,7 @@ class DiscontinuousTrace(Trace): Extension of obspy.Trace that changes the way time data is handled when reading data using the method from logpeek/qpeek. """ + def __init__(self, *args, times: np.ndarray, **kwargs): super().__init__(*args, **kwargs) self._times = times @@ -73,6 +79,7 @@ class Reftek130(obspy_rt130_core.Reftek130): Child class of obspy.Reftek that reads waveform data similar to logpeek for better performance. """ + @staticmethod def from_file(file: Union[str, Path]) -> Reftek130: """ @@ -85,11 +92,17 @@ class Reftek130(obspy_rt130_core.Reftek130): rt = Reftek130() rt._filename = file packets_in_file = read_rt130_file(file, rt130_unpacker) + first_packet_type = packets_in_file[0].header.packet_type + if first_packet_type in ['EH', 'ET', 'DT']: + packet_converter = convert_waveform_packet_to_obspy_format + final_dtype = PACKET_FINAL_DTYPE + else: + packet_converter = convert_soh_packet_to_obspy_format + final_dtype = soh_packet.PACKET_FINAL_DTYPE converted_packets = [] for packet in packets_in_file: - converted_packets.append( - convert_packet_to_obspy_format(packet, rt130_unpacker)) - rt._data = np.array(converted_packets, dtype=PACKET_FINAL_DTYPE) + converted_packets.append(packet_converter(packet, rt130_unpacker)) + rt._data = np.array(converted_packets, dtype=final_dtype) return rt def to_stream(self, network: str = "", location: str = "", @@ -167,9 +180,9 @@ class Reftek130(obspy_rt130_core.Reftek130): # split into contiguous blocks, i.e. find gaps. packet sequence # was sorted already.. endtimes = ( - packets[:-1]["time"] + - packets[:-1]["number_of_samples"].astype(np.int64) * - delta_nanoseconds) + packets[:-1]["time"] + + packets[:-1]["number_of_samples"].astype(np.int64) * + delta_nanoseconds) # check if next starttime matches seamless to last chunk # 1e-3 seconds == 1e6 nanoseconds is the smallest time # difference reftek130 format can represent, so anything larger @@ -204,7 +217,7 @@ class Reftek130(obspy_rt130_core.Reftek130): npts = sample_data.size tr = DiscontinuousTrace( data=sample_data, header=copy.deepcopy(header), - times=(packets_['time'] / 10**9).round(3) + times=(packets_['time'] / 10 ** 9).round(3) ) # The plotting process needs to know about the number of # points stored in the trace. However, tr.stats use the diff --git a/sohstationviewer/model/reftek_data/reftek_reader/packet_readers.py b/sohstationviewer/model/reftek_data/reftek_reader/packet_readers.py index 9e1b4f75b74b918dd67ad2f54fd3349d781572b7..7a31f7917d108dd21a41aff388d3f7f97c25070d 100644 --- a/sohstationviewer/model/reftek_data/reftek_reader/packet_readers.py +++ b/sohstationviewer/model/reftek_data/reftek_reader/packet_readers.py @@ -1,14 +1,10 @@ from typing import Tuple, Any -import numpy -from obspy.io.reftek.util import bcd - from sohstationviewer.model.general_data.general_record_helper import Unpacker from sohstationviewer.model.reftek_data.reftek_reader.packet import \ eh_et_payload_end_in_packet from sohstationviewer.model.reftek_data.reftek_reader.packets import ( - DTExtendedHeader, - EHETExtendedHeader, SOHExtendedHeader, + DTExtendedHeader, EHETExtendedHeader, ) @@ -113,37 +109,11 @@ def read_eh_et_packet(packet: bytes, unpacker: Unpacker return extended_header, payload -def bcd_16bit_int(_i) -> int: - """ - Reimplement a private function of the same name in obspy. Kept here in case - the private function is removed in a future obspy version. - :param _i: the byte string to convert into a 16-bite integer - :return: a 16-bit integer - """ - _i = bcd(_i) - return _i[0] * 100 + _i[1] - - -def read_soh_packet(packet: bytes, unpacker: Unpacker - ) -> Tuple[SOHExtendedHeader, bytes]: +def read_soh_packet(packet: bytes) -> bytes: """ - Process an SOH packet and get its extended header and poyload. + Process an SOH packet and get its payload. :param packet: the bytes that make up the given SOH packet. - :param unpacker: the unpacker to use to decode the data. - :return: the extended header and payload of the given SOH packet. + :return: the payload of the given SOH packet. """ - - event_number = bcd_16bit_int(numpy.frombuffer(packet[16:18], numpy.uint8)) - data_stream_number = bcd(numpy.frombuffer(packet[18:19], numpy.uint8)) - channel_number = bcd(numpy.frombuffer(packet[19:20], numpy.uint8)) - number_of_samples = bcd_16bit_int( - numpy.frombuffer(packet[20:22], numpy.uint8) - ) - flags = unpacker.unpack('B', packet[22:23])[0] - data_format = packet[23:24].hex().upper() - - extended_header = SOHExtendedHeader(event_number, data_stream_number, - channel_number, number_of_samples, - flags, data_format) - payload = packet[24:] - return extended_header, payload + payload = packet[16:] + return payload diff --git a/sohstationviewer/model/reftek_data/reftek_reader/packets.py b/sohstationviewer/model/reftek_data/reftek_reader/packets.py index dddd4e68a003ecdfdeac9eb844bd6bd6aa500362..db9b64eb863f6a5d4a71438c2e103e641d03ae20 100644 --- a/sohstationviewer/model/reftek_data/reftek_reader/packets.py +++ b/sohstationviewer/model/reftek_data/reftek_reader/packets.py @@ -58,32 +58,10 @@ class EHETPacket: data: bytes -@dataclasses.dataclass -class SOHExtendedHeader: - """ - A collection of dummy data for some information needed so that - core.Reftek130 can understand SOH packets. - - core.Reftek130 focuses on reading waveform data, so it wants information - available in the waveform packets (EH/ET/DT). However, core.Reftek130 also - supports SOH packets, which does not contain the required information. As - a result, we need to store dummy data in its place. - """ - event_number: int - data_stream_number: int - channel_number: int - number_of_samples: int - flags: int - data_format: str - - @dataclasses.dataclass class SOHPacket: """ - The decoded data of an SOH packet. The extended_header field is to ensure - compatibility with dt_packet.DTPacket. SOH packets do not have an - extended header otherwise. + The decoded data of an SOH packet. """ header: PacketHeader - extended_header: SOHExtendedHeader - data: bytes + payload: bytes diff --git a/sohstationviewer/model/reftek_data/reftek_reader/reftek_reader_helper.py b/sohstationviewer/model/reftek_data/reftek_reader/reftek_reader_helper.py index 32b53080b94152e85e0af8a4c46102200d8839ef..1e03dfe3fcf183c1767f626b65b2104c825c2d17 100644 --- a/sohstationviewer/model/reftek_data/reftek_reader/reftek_reader_helper.py +++ b/sohstationviewer/model/reftek_data/reftek_reader/reftek_reader_helper.py @@ -1,5 +1,5 @@ import os -from typing import Any, Dict, Callable, Union, List, Tuple +from typing import Any, Dict, Union, List, Tuple import numpy import numpy as np @@ -13,8 +13,9 @@ from sohstationviewer.model.reftek_data.reftek_reader.packet_readers import ( from sohstationviewer.model.reftek_data.reftek_reader.packets import ( DTPacket, EHETPacket, SOHPacket, ) -from sohstationviewer.model.reftek_data.reftek_reader.header import \ - get_rt130_packet_header +from sohstationviewer.model.reftek_data.reftek_reader.header import ( + get_rt130_packet_header, PacketHeader, +) def packet_reader_placeholder(*args: Any, **kwargs: Any) -> Tuple[Any, Any]: @@ -45,42 +46,45 @@ def read_rt130_file(file_name: str, unpacker: Unpacker packet = rt130_file.read(1024) packet_header = get_rt130_packet_header(packet) - waveform_handlers: Dict[str, Callable] = { - 'EH': read_eh_et_packet, - 'ET': read_eh_et_packet, - 'DT': read_dt_packet, - } - - soh_handlers: Dict[str, Callable] = dict.fromkeys( - ['AD', 'CD', 'DS', 'FD', 'OM', 'SC', 'SH'], - read_soh_packet - ) - - packet_handlers = { - **waveform_handlers, **soh_handlers - } - - packet_handler = packet_handlers.get( - packet_header.packet_type, packet_reader_placeholder - ) - return_val = packet_handler(packet, unpacker) - if packet_header.packet_type == 'DT': - packet_type = DTPacket - elif packet_header.packet_type in ['EH', 'ET']: - packet_type = EHETPacket + if packet_header.packet_type in ['EH', 'ET', 'DT']: + waveform_handlers = { + 'EH': (read_eh_et_packet, EHETPacket), + 'ET': (read_eh_et_packet, EHETPacket), + 'DT': (read_dt_packet, DTPacket), + } + packet_handlers = { + **waveform_handlers + } + packet_handler, packet_type = packet_handlers.get( + packet_header.packet_type, packet_reader_placeholder + ) + return_val = packet_handler(packet, unpacker) + extended_header, data = return_val + current_packet = packet_type(packet_header, extended_header, + data) else: - packet_type = SOHPacket + payload = read_soh_packet(packet) + current_packet = SOHPacket(packet_header, payload) - extended_header, data = return_val - current_packet = packet_type(packet_header, extended_header, data) packets.append(current_packet) return packets -def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket, - SOHPacket], - unpacker: Unpacker) -> Tuple: +def convert_packet_header_to_dict(packet_header: PacketHeader) -> Dict: + converted_header = {'packet_type': packet_header.packet_type, + 'experiment_number': packet_header.experiment_number, + # Obspy only stores the last two digits of the year. + 'year': packet_header.time.year % 100, + 'unit_id': packet_header.unit_id, + 'time': packet_header.time.ns, + 'byte_count': packet_header.byte_count, + 'packet_sequence': packet_header.packet_sequence} + return converted_header + + +def convert_waveform_packet_to_obspy_format( + packet: Union[EHETPacket, DTPacket], unpacker: Unpacker) -> Tuple: """ Convert an RT130 packet into a numpy array of type PACKET_FINAL_DTYPE :param packet: an RT130 packet. @@ -91,15 +95,7 @@ def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket, # We want to convert the packet to a tuple. In order to make it easier to # maintain, we first convert the packet to a dictionary. Then, we grab the # values of the dictionary as tuple to get the final result. - converted_packet = {} - converted_packet['packet_type'] = packet.header.packet_type - converted_packet['experiment_number'] = packet.header.experiment_number - # Obspy only stores the last two digits of the year. - converted_packet['year'] = packet.header.time.year % 100 - converted_packet['unit_id'] = packet.header.unit_id - converted_packet['time'] = packet.header.time.ns - converted_packet['byte_count'] = packet.header.byte_count - converted_packet['packet_sequence'] = packet.header.packet_sequence + converted_packet = convert_packet_header_to_dict(packet.header) converted_packet['event_number'] = packet.extended_header.event_number converted_packet[ 'data_stream_number'] = packet.extended_header.data_stream_number @@ -133,3 +129,10 @@ def convert_packet_to_obspy_format(packet: Union[EHETPacket, DTPacket, else: converted_packet['payload'] = numpy.frombuffer(packet.data, np.uint8) return tuple(converted_packet.values()) + + +def convert_soh_packet_to_obspy_format(packet: SOHPacket, unpacker: Unpacker + ) -> Tuple: + converted_packet = convert_packet_header_to_dict(packet.header) + converted_packet['payload'] = numpy.frombuffer(packet.payload, np.uint8) + return tuple(converted_packet.values()) diff --git a/sohstationviewer/view/plotting/plotting_widget/plotting.py b/sohstationviewer/view/plotting/plotting_widget/plotting.py index 88fb9db2bd595cee073d9b51da06c64961314bad..09d07bf016cd751a6adfcc4c36bd73961b62ccb0 100644 --- a/sohstationviewer/view/plotting/plotting_widget/plotting.py +++ b/sohstationviewer/view/plotting/plotting_widget/plotting.py @@ -7,6 +7,9 @@ from sohstationviewer.controller.util import get_val from sohstationviewer.controller.plotting_data import get_masspos_value_colors from sohstationviewer.view.util.color import clr +from sohstationviewer.view.plotting.plotting_widget.plotting_helper import ( + get_colors_sizes_for_abs_y_from_value_colors +) from sohstationviewer.conf import constants @@ -438,26 +441,14 @@ class Plotting: sample_no_pos=[None, 0.5, None], chan_db_info=chan_db_info, y_list=y_list) for x, y in zip(x_list, y_list): + # plot to have artist pl.Line2D to get pick ax.myPlot = ax.plot(x, y, linestyle='-', linewidth=0.7, color=self.parent.display_color['sub_basic'], picker=True, pickradius=3, zorder=constants.Z_ORDER['LINE'])[0] - colors = [None] * len(y) - sizes = [1.5] * len(y) - for i in range(len(y)): - count = 0 - prev_v = 0 - for v, c in value_colors: - if count < (len(value_colors) - 1): - if prev_v < abs(y[i]) <= v: - colors[i] = clr[c] - break - else: - colors[i] = clr[c] - break - prev_v = v - count += 1 + colors, sizes = get_colors_sizes_for_abs_y_from_value_colors( + y, value_colors) ax.scatter(x, y, marker='s', c=colors, s=sizes, zorder=constants.Z_ORDER['DOT']) ax.x_center = x_list[0] diff --git a/sohstationviewer/view/plotting/plotting_widget/plotting_helper.py b/sohstationviewer/view/plotting/plotting_widget/plotting_helper.py new file mode 100644 index 0000000000000000000000000000000000000000..ba30b9ad2229729dc4188792cb16b76c97656fc7 --- /dev/null +++ b/sohstationviewer/view/plotting/plotting_widget/plotting_helper.py @@ -0,0 +1,32 @@ +from typing import List, Tuple + +from sohstationviewer.view.util.color import clr + + +def get_colors_sizes_for_abs_y_from_value_colors( + y: List[float], + value_colors: List[Tuple[float, str]]) -> \ + Tuple[List[str], List[float]]: + """ + Map each abs(value) in y with the value in value_colors to get the colors + for the indexes corresponding to the y's items. + Sizes is currently similar for all items + :param y: list of data values + :param value_colors: list of color with indexes corresponding to y's items + :return colors: list of colors of markers corresponding to y + :return sizes: list of sizes of markers corresponding to y + """ + colors = [None] * len(y) + sizes = [1.5] * len(y) + for i in range(len(y)): + count = -1 + for v, c in value_colors: + count += 1 + if count <= len(value_colors) - 2: + if abs(y[i]) <= v: + colors[i] = clr[c] + break + else: + # The last value color + colors[i] = clr[c] + return colors, sizes diff --git a/tests/model/reftek_data/reftek_reader/test_core.py b/tests/model/reftek_data/reftek_reader/test_core.py index c485d1fb04bc005525288fc00c32feb7dd24b402..82665da90f7741e4bb35238826f6190569ad206c 100644 --- a/tests/model/reftek_data/reftek_reader/test_core.py +++ b/tests/model/reftek_data/reftek_reader/test_core.py @@ -1,5 +1,6 @@ import os import unittest +from unittest.mock import patch from pathlib import Path import numpy @@ -12,6 +13,11 @@ from sohstationviewer.model.reftek_data.reftek_reader.core import ( ) from sohstationviewer.model.reftek_data.reftek_reader.header import \ NotRT130FileError +from sohstationviewer.model.reftek_data.reftek_reader.reftek_reader_helper \ + import ( + convert_soh_packet_to_obspy_format, + convert_waveform_packet_to_obspy_format, + ) class TestDiscontinuousTrace(unittest.TestCase): @@ -67,11 +73,29 @@ class TestReftek130FromFile(unittest.TestCase): self.rt130_dir = self.TEST_DATA_DIR.joinpath( 'RT130-sample/2017149.92EB/2017150/92EB' ) + base_path = 'sohstationviewer.model.reftek_data.reftek_reader.core' + waveform_patcher = patch( + f'{base_path}.convert_waveform_packet_to_obspy_format', + wraps=convert_waveform_packet_to_obspy_format + ) + self.addCleanup(waveform_patcher.stop) + self.mock_waveform_converter = waveform_patcher.start() + soh_patcher = patch( + f'{base_path}.convert_soh_packet_to_obspy_format', + wraps=convert_soh_packet_to_obspy_format + ) + self.addCleanup(soh_patcher.stop) + self.mock_soh_converter = soh_patcher.start() def test_rt130_file(self): - file = self.rt130_dir.joinpath('0/000000000_00000000') - rt130 = Reftek130.from_file(file) - self.assertIsInstance(rt130, Reftek130) + with self.subTest('test_soh_file'): + file = self.rt130_dir.joinpath('0/000000000_00000000') + rt130 = Reftek130.from_file(file) + self.assertIsInstance(rt130, Reftek130) + with self.subTest('test_waveform_file'): + file = self.rt130_dir.joinpath('1/000000015_0036EE80') + rt130 = Reftek130.from_file(file) + self.assertIsInstance(rt130, Reftek130) def test_rt130_soh_file(self): file = self.rt130_dir.joinpath('0/000000000_00000000') @@ -79,6 +103,8 @@ class TestReftek130FromFile(unittest.TestCase): # The most common SOH packet type looks to be SH, so we use that as # the default. self.assertIn(b'SH', rt130._data['packet_type']) + self.assertTrue(self.mock_soh_converter.called) + self.assertFalse(self.mock_waveform_converter.called) def test_rt130_raw_data_file(self): file = self.rt130_dir.joinpath('1/000000015_0036EE80') @@ -87,6 +113,8 @@ class TestReftek130FromFile(unittest.TestCase): numpy.unique(numpy.sort(rt130._data['packet_type'])), numpy.sort([b'EH', b'DT', b'ET']) ) + self.assertTrue(self.mock_waveform_converter.called) + self.assertFalse(self.mock_soh_converter.called) def test_non_rt130_file(self): with self.subTest('test_file_exist'): diff --git a/tests/model/reftek_data/reftek_reader/test_packet_readers.py b/tests/model/reftek_data/reftek_reader/test_packet_readers.py index 6494b416bc37b5af2d82cbaf50c3b3a750abd23b..461b7da6eb9833f34b2f29c6879bf656cc4fe103 100644 --- a/tests/model/reftek_data/reftek_reader/test_packet_readers.py +++ b/tests/model/reftek_data/reftek_reader/test_packet_readers.py @@ -8,8 +8,6 @@ from sohstationviewer.model.reftek_data.reftek_reader.packet_readers import ( decode_uncompressed, decode_compressed, read_dt_packet, read_eh_et_packet, read_soh_packet, ) -from sohstationviewer.model.reftek_data.reftek_reader.packets import \ - SOHExtendedHeader unpacker = Unpacker('>') @@ -166,19 +164,7 @@ class TestReadEHETPacket(unittest.TestCase): class TestReadSOHPacket(unittest.TestCase): - """ - Test suite for packet_readers.read_soh_packet. We only test that the - function has the correct interface, seeing as the intended purpose of this - method is to be compatible with packet_readers.read_dt_packet and - packet_readers.read_eh_et_packet interface-wise. - """ - def test_correct_interface(self): - packet = b' ' * 1024 - extended_header, payload = read_soh_packet(packet, unpacker) - self.assertIsInstance(extended_header, SOHExtendedHeader) - self.assertIsInstance(payload, bytes) - def test_payload_has_correct_length(self): packet = b' ' * 1024 - extended_header, payload = read_soh_packet(packet, unpacker) - self.assertEqual(len(payload), 1000) + payload = read_soh_packet(packet) + self.assertEqual(len(payload), 1008) diff --git a/tests/model/reftek_data/reftek_reader/test_reftek_helper.py b/tests/model/reftek_data/reftek_reader/test_reftek_helper.py index 7fe0e02e523f935ac98a63b89bfcd5454960cfe6..108a28f2464362533e60c8fb3d449d10f4f05db9 100644 --- a/tests/model/reftek_data/reftek_reader/test_reftek_helper.py +++ b/tests/model/reftek_data/reftek_reader/test_reftek_helper.py @@ -3,20 +3,25 @@ import unittest from pathlib import Path from unittest.mock import patch +from obspy import UTCDateTime from obspy.io.reftek.packet import PACKET_FINAL_DTYPE from sohstationviewer.model.mseed_data.record_reader_helper import Unpacker -from sohstationviewer.model.reftek_data.reftek_reader.header import \ - NotRT130FileError +from sohstationviewer.model.reftek_data.reftek_reader import soh_packet +from sohstationviewer.model.reftek_data.reftek_reader.header import ( + NotRT130FileError, PacketHeader, +) from sohstationviewer.model.reftek_data.reftek_reader.packet_readers import ( read_eh_et_packet, read_dt_packet, read_soh_packet, ) from sohstationviewer.model.reftek_data.reftek_reader.packets import ( - SOHPacket, - EHETPacket, DTPacket, + SOHPacket, EHETPacket, DTPacket, ) from sohstationviewer.model.reftek_data.reftek_reader.reftek_reader_helper \ - import (read_rt130_file, convert_packet_to_obspy_format) + import ( + read_rt130_file, convert_waveform_packet_to_obspy_format, + convert_packet_header_to_dict, convert_soh_packet_to_obspy_format, + ) unpacker = Unpacker('>') @@ -61,9 +66,9 @@ class TestReadRT130File(unittest.TestCase): self.assertTrue( all(isinstance(packet, SOHPacket) for packet in packets) ) - self.assertTrue(self.mock_read_soh.called) self.assertFalse(self.mock_read_dt.called) self.assertFalse(self.mock_read_eh_et.called) + self.assertTrue(self.mock_read_soh.called) def test_rt130_raw_data_file(self): file = self.rt130_dir.joinpath('1/000000015_0036EE80') @@ -72,15 +77,15 @@ class TestReadRT130File(unittest.TestCase): isinstance(packet, EHETPacket) or isinstance(packet, DTPacket) for packet in packets) ) - self.assertFalse(self.mock_read_soh.called) self.assertTrue(self.mock_read_dt.called) self.assertTrue(self.mock_read_eh_et.called) + self.assertFalse(self.mock_read_soh.called) def test_non_rt130_file(self): with self.subTest('test_file_exist'): file = self.TEST_DATA_DIR.joinpath( 'Q330-sample/day_vols_AX08/AX08.XA..HHE.2021.186' - ) + ).as_posix() with self.assertRaises(NotRT130FileError): read_rt130_file(file, unpacker) @@ -90,18 +95,66 @@ class TestReadRT130File(unittest.TestCase): read_rt130_file(file, unpacker) -class TestConvertPacketToObspyFormat(unittest.TestCase): +class TestConvertPacketHeaderToDict(unittest.TestCase): + def setUp(self): + self.header = PacketHeader('EH', 1000, '98E1', + UTCDateTime(year=1951, month=1, day=1), + 1024, 1) + + def test_all_fields_are_in_the_converted_dict(self): + expected_fields = ['packet_type', 'experiment_number', 'year', + 'unit_id', 'time', 'byte_count', 'packet_sequence'] + actual_field = list(convert_packet_header_to_dict(self.header).keys()) + self.assertEqual(expected_fields, actual_field) + + def test_year_is_converted_correctly(self): + with self.subTest('test_year_has_fewer_than_two_digits'): + self.header.time = UTCDateTime(year=51, month=1, day=1) + expected_year = 51 + actual_year = convert_packet_header_to_dict(self.header)['year'] + self.assertEqual(actual_year, expected_year) + with self.subTest('test_year_has_more_than_two_digits'): + self.header.time = UTCDateTime(year=2352, month=1, day=1) + expected_year = 52 + actual_year = convert_packet_header_to_dict(self.header)['year'] + self.assertEqual(actual_year, expected_year) + with self.subTest('test_year_has_two_digits'): + self.header.time = UTCDateTime(year=9, month=1, day=1) + expected_year = 9 + actual_year = convert_packet_header_to_dict(self.header)['year'] + self.assertEqual(actual_year, expected_year) + + +class TestConvertWaveformPacketToObspyFormat(unittest.TestCase): def setUp(self) -> None: TEST_DATA_DIR = Path(os.getcwd()).joinpath('tests/test_data') rt130_dir = TEST_DATA_DIR.joinpath( 'RT130-sample/2017149.92EB/2017150/92EB' ) - file = rt130_dir.joinpath('1/000000015_0036EE80') + file = rt130_dir.joinpath('1/000000015_0036EE80').as_posix() self.packet = read_rt130_file(file, unpacker)[0] def test_all_needed_fields_are_available(self): - converted_packet = convert_packet_to_obspy_format( + converted_packet = convert_waveform_packet_to_obspy_format( self.packet, unpacker ) self.assertEqual(len(converted_packet), len(PACKET_FINAL_DTYPE)) + + +class TestConvertSOHPacketToObspyFormat(unittest.TestCase): + def setUp(self) -> None: + TEST_DATA_DIR = Path(os.getcwd()).joinpath('tests/test_data') + rt130_dir = TEST_DATA_DIR.joinpath( + 'RT130-sample/2017149.92EB/2017150/92EB' + ) + file = rt130_dir.joinpath('0/000000000_00000000').as_posix() + self.packet = read_rt130_file(file, unpacker)[0] + + def test_all_needed_fields_are_available(self): + converted_packet = convert_soh_packet_to_obspy_format( + self.packet, unpacker + ) + + self.assertEqual(len(converted_packet), + len(soh_packet.PACKET_FINAL_DTYPE)) diff --git a/tests/view/plotting/plotting_widget/test_plotting_helper.py b/tests/view/plotting/plotting_widget/test_plotting_helper.py new file mode 100644 index 0000000000000000000000000000000000000000..66570fd7c44172f02bfca4925ce3b0277e1dedaf --- /dev/null +++ b/tests/view/plotting/plotting_widget/test_plotting_helper.py @@ -0,0 +1,26 @@ +from unittest import TestCase + +from sohstationviewer.view.plotting.plotting_widget.plotting_helper import ( + get_colors_sizes_for_abs_y_from_value_colors +) +from sohstationviewer.view.util.color import clr + + +class TestGetColorsSizesForAbsYFromValueColors(TestCase): + def test_get_colors_sizes_for_abs_y_from_value_colors(self): + y = [0, 0.5, 1., 2., 3., 4., 5., 7., 7.1, + -0, -0.5, -1., -2., -3., -4., -5., -7., -7.1] + value_colors = [(0.5, 'C'), (2.0, 'G'), (4.0, 'Y'), + (7.0, 'R'), (7.0, 'M')] + colors, sizes = get_colors_sizes_for_abs_y_from_value_colors( + y, value_colors) + + self.assertEqual( + colors, + [clr['C'], clr['C'], clr['G'], clr['G'], clr['Y'], clr['Y'], + clr['R'], clr['R'], clr['M'], + clr['C'], clr['C'], clr['G'], clr['G'], clr['Y'], clr['Y'], + clr['R'], clr['R'], clr['M'] + ] + ) + self.assertEqual(sizes, [1.5] * len(y))