Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: software_public/passoft/sohstationviewer
Commits on Source (20)
Showing changes with 129 additions and 482 deletions
......@@ -53,4 +53,17 @@ python3.11:
- passoft
stage: Build Env and Test
script:
- python -m unittest
- python -m unittest
check-backup-db:
tags:
- passoft
stage: Build Env and Test
before_script:
- ''
script:
- |-
if ! cmp sohstationviewer/database/soh.db sohstationviewer/database/backup.db; then
echo "Backup database (sohstationviewer/database/backup.db) is different from working database (sohstationviewer/database/soh.db). Please copy the working database to the backup database."
exit 1
fi
\ No newline at end of file
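The new check-backup-db job can also be reproduced locally; the following is an editorial sketch, not part of the change, assuming it is run from the repository root (filecmp.cmp with shallow=False compares file contents, matching cmp in the pipeline).

import filecmp
import sys

working_db = 'sohstationviewer/database/soh.db'
backup_db = 'sohstationviewer/database/backup.db'
if not filecmp.cmp(working_db, backup_db, shallow=False):
    print(f'Backup database ({backup_db}) is different from working '
          f'database ({working_db}). Please copy the working database '
          'to the backup database.')
    sys.exit(1)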
include HISTORY.rst
include README.rst
include sohstationviewer/database/soh.db
include sohstationviewer/database/backup.db
graft sohstationviewer/documentation
graft sohstationviewer/images
......
......@@ -24,6 +24,7 @@ test:
source_files:
- tests
- sohstationviewer/database/soh.db
- sohstationviewer/database/backup.db
commands:
- python -m unittest
......
import os
from pathlib import Path
from typing import Literal
# The path to the package's root
ROOT_PATH = Path(os.path.abspath(__file__)).parent.parent
ROOT_PATH = Path(__file__).resolve().parent.parent
# The current version of SOHStationViewer
SOFTWARE_VERSION = '2024.2.1.0'
......@@ -42,7 +41,7 @@ SEC_DAY = 86400.0
SEC_5M = 300
# total of 5 minutes in a day
NO_5M_DAY = 288
NUMBER_OF_5M_IN_DAY = 288
# total of 5 minutes in an hour
NO_5M_1H = int(60 * 60 / 300)
......@@ -87,7 +86,8 @@ BASIC_HEIGHT_IN = 0.15
# timestamp.
DEFAULT_SIZE_FACTOR_TO_NORMALIZE = 0.02
# vertical margin
VSPACE_SIZE_FACTOR = 3
TOP_SPACE_SIZE_FACTOR = 3
BOT_SPACE_SIZE_FACTOR = 6  # to avoid partially hiding the time bar
# space from previous bottom to time bar bottom
TIME_BAR_SIZE_FACTOR = 2
# space from time bar bottom to the bottom of gap bar
......
......@@ -13,6 +13,7 @@ Third letter (Orientation Code): ZNE123
dbConf = {
'dbpath': 'database/soh.db',
'backup_db_path': 'database/backup.db',
'seisRE': re.compile(f'[{WF_1ST}][{WF_2ND}][{WF_3RD}]'),
# key is last char of chan
'seisLabel': {'1': 'NS', '2': 'EW', 'N': 'NS', 'E': 'EW', 'Z': 'V'},
......
......@@ -146,9 +146,11 @@ def get_day_ticks() -> Tuple[List[int], List[int], List[str]]:
major_time_labels: 2 digit numbers of every 4 hours in a day
"""
times = list(range(const.NO_5M_1H, const.NO_5M_DAY, const.NO_5M_1H))
times = list(range(const.NO_5M_1H,
const.NUMBER_OF_5M_IN_DAY,
const.NO_5M_1H))
major_times = list(range(4 * const.NO_5M_1H,
const.NO_5M_DAY,
const.NUMBER_OF_5M_IN_DAY,
4 * const.NO_5M_1H))
major_time_labels = ["%02d" % int(t / const.NO_5M_1H) for t in major_times]
return times, major_times, major_time_labels
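A quick sketch of what these ranges produce, assuming NO_5M_1H == 12 and NUMBER_OF_5M_IN_DAY == 288 as defined in conf/constants.py above (illustrative only):

times = list(range(12, 288, 12))        # minor tick every hour: 12, 24, ..., 276
major_times = list(range(48, 288, 48))  # major tick every 4 hours: 48, 96, ..., 240
major_time_labels = ["%02d" % (t // 12) for t in major_times]
print(major_time_labels)                # ['04', '08', '12', '16', '20']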
......
......@@ -236,11 +236,6 @@ def detect_data_type(list_of_dir: List[Union[str, Path]]) -> Optional[str]:
"detected.\n\nPlease have only data that related to"
" each other.")
raise Exception(msg)
elif data_type_list == ['Unknown']:
msg = ("There are no known data detected.\n\n"
"Do you want to cancel to select different folder(s)\n"
"Or continue to read any available mseed file?")
raise Exception(msg)
return data_type_list[0], is_multiplex_list[0]
......
......@@ -181,6 +181,7 @@ def get_val(text: str) -> float:
and remove '<' and '=' characters
"""
text = text.replace('=', '').replace('<', '')
# Retrieve a number (with sign if available) from a piece of text.
re_val = '^\+?\-?[0-9]+\.?[0-9]?' # noqa: W605
return float(re.search(re_val, text).group())
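A brief usage sketch (the input strings are made up, and get_val is assumed to be imported from the module shown in this hunk):

# '<' and '=' are stripped first, then the leading signed number is parsed.
assert get_val('<=2.3') == 2.3
assert get_val('-12.5V') == -12.5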
......
File added
from typing import Dict
from typing import Dict, List
from sohstationviewer.conf.constants import ColorMode
from sohstationviewer.database.process_db import execute_db_dict, execute_db
......@@ -193,3 +193,21 @@ def get_param_info(param: str):
sql = f"SELECT * FROM Parameters WHERE param='{param}'"
param_info = execute_db_dict(sql)[0]
return param_info
def get_data_types(include_default: bool = True, include_rt130: bool = True) \
-> List[str]:
"""
Get list of data types from DB.
:param include_default: flag to include Default type in the list or not
:param include_rt130: flag to include RT130 type in the list or not
:return: list of data types
"""
data_type_rows = execute_db(
'SELECT * FROM DataTypes ORDER BY dataType ASC')
data_type_list = [d[0] for d in data_type_rows]
if not include_default:
data_type_list = [d for d in data_type_list if d != 'Default']
if not include_rt130:
data_type_list = [d for d in data_type_list if d != 'RT130']
return data_type_list
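A usage sketch for the new helper (the returned values depend on the contents of the DataTypes table; 'Default', 'Q330', and 'RT130' are only illustrative):

from sohstationviewer.database.extract_data import get_data_types

all_types = get_data_types()                        # e.g. ['Default', 'Q330', 'RT130']
no_default = get_data_types(include_default=False)  # 'Default' filtered out
no_rt130 = get_data_types(include_rt130=False)      # 'RT130' filtered out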
......@@ -26,7 +26,7 @@ Note: log_data for RT130's dataset has only one channel: SOH
'chan_db_info' (dict): the plotting parameters got from database
for this channel - dict,
'ax': axes to draw the channel in PlottingWidget
'ax_wf' (matplotlib.axes.Axes): axes to draw the channel in WaveformWidget
'ax_wf': axes to draw the channel in WaveformWidget
'visible': flag to show or hide channel
}
}
......
......@@ -11,6 +11,8 @@ from obspy.core import Stream
from sohstationviewer.conf import constants
from sohstationviewer.conf.constants import SOFTWARE_VERSION
from sohstationviewer.model.reftek_data.reftek_reader.core import \
DecimatedReftek130
from sohstationviewer.model.reftek_data.reftek_reader.log_file_reader import (
LogFileReader, process_mass_poss_line, LogReadError,
)
......@@ -297,12 +299,6 @@ class RT130(GeneralData):
raise ProcessingDataError(msg)
selected_data_set_id = data_set_ids[0]
if not self.on_unittest and len(data_set_ids) > 1:
msg = ("There are more than one data set IDs in the given data.\n"
"Please select one to display")
self.pause_signal.emit(msg, data_set_ids)
self.pause()
selected_data_set_id = data_set_ids[self.pause_response]
self.track_info(f'Select data set ID {selected_data_set_id}',
LogType.INFO)
......@@ -318,7 +314,7 @@ class RT130(GeneralData):
:param path2file: absolute path to file
"""
try:
rt130 = core.Reftek130.from_file(path2file)
rt130 = DecimatedReftek130.from_file(path2file)
except Exception:
fmt = traceback.format_exc()
self.track_info(f"Skip file {path2file} can't be read "
......@@ -329,22 +325,20 @@ class RT130(GeneralData):
nbr_packet_type = dict(zip(unique, counts))
if b"SH" in nbr_packet_type:
self.read_sh_packet(path2file)
self.read_sh_packet(rt130)
if b"EH" in nbr_packet_type or b"ET" in nbr_packet_type:
self.read_eh_or_et_packet(rt130)
return True
def read_sh_packet(self, path2file: Path) -> None:
def read_sh_packet(self, rt130: DecimatedReftek130) -> None:
"""
Use soh_packet library to read file with SH packet for soh data
to append tuple (time, log string) to
log_data[self.cur_data_set_id][SOH]
:param path2file: absolute path to file
:param rt130: RT130 object of an SOH file.
"""
with open(path2file, "rb") as fh:
str = fh.read()
data = soh_packet._initial_unpack_packets_soh(str)
data = rt130._data
for ind, d in enumerate(data):
cur_data_set_id = (d['unit_id'].decode(),
f"{d['experiment_number']}")
......@@ -363,7 +357,7 @@ class RT130(GeneralData):
self.log_data[cur_data_set_id]['SOH'] = []
self.log_data[cur_data_set_id]['SOH'].append((d['time'], logs))
def read_eh_or_et_packet(self, rt130: core.Reftek130) -> None:
def read_eh_or_et_packet(self, rt130: DecimatedReftek130) -> None:
"""
Files that contain EH or ET packets are data stream (DS) and
mass position (DS 9) files.
......@@ -394,7 +388,7 @@ class RT130(GeneralData):
self.get_mass_pos_data_and_waveform_data(
rt130, data_stream, cur_data_set_id)
def get_ehet_in_log_data(self, rt130: core.Reftek130,
def get_ehet_in_log_data(self, rt130: DecimatedReftek130,
cur_data_set_id: Tuple[str, str]) -> None:
"""
Read event header info to add to log_data['EHET']
......@@ -404,7 +398,7 @@ class RT130(GeneralData):
"""
ind_ehet = [ind for ind, val in
enumerate(rt130._data["packet_type"])
if val in [b"EH"]] # only need event header
if val in [b"EH", b"ET"]]
nbr_dt_samples = sum(
[rt130._data[ind]["number_of_samples"]
for ind in range(0, len(rt130._data))
......@@ -419,7 +413,7 @@ class RT130(GeneralData):
(d['time'], logs))
def get_mass_pos_data_and_waveform_data(
self, rt130: core.Reftek130, data_stream: int,
self, rt130: DecimatedReftek130, data_stream: int,
cur_data_set_id: Tuple[str, str]) -> None:
"""
Get mass_pos_data for the current data_set_id in DS 9.
......
......@@ -6,12 +6,12 @@ from obspy.core import Stream
from obspy import UTCDateTime
from sohstationviewer.model.reftek_data.reftek_reader.core import (
DiscontinuousTrace, Reftek130)
DiscontinuousTrace, DecimatedReftek130)
from sohstationviewer.model.general_data.general_data_helper import squash_gaps
def check_reftek_header(
rt130: Reftek130, cur_data_set_id: Tuple[str, str],
rt130: DecimatedReftek130, cur_data_set_id: Tuple[str, str],
starttime: UTCDateTime, endtime: UTCDateTime,
stream_header_by_data_set_id_chan: Dict[str, Dict[str, Stream]],
cur_data_dict: Dict, cur_data_time: List[float],
......@@ -36,7 +36,7 @@ def check_reftek_header(
:param include_mp123zne: if mass position channels 1,2,3 are requested
:param include_mp456uvw: if mass position channels 4,5,6 are requested
"""
stream = Reftek130.to_stream(
stream = DecimatedReftek130.to_stream(
rt130,
include_mp123=include_mp123zne,
include_mp456=include_mp456uvw,
......@@ -76,7 +76,7 @@ def check_reftek_header(
def read_reftek_stream(
rt130: Reftek130,
rt130: DecimatedReftek130,
avail_trace_indexes: List[int], cur_data_dict: Dict,
include_mp123zne: bool, include_mp456uvw: bool):
"""
......@@ -91,7 +91,7 @@ def read_reftek_stream(
:param include_mp456uvw: if mass position channels 4,5,6 are requested
"""
# TODO: rewrite reftek to read stream with start and end time
stream = Reftek130.to_stream(
stream = DecimatedReftek130.to_stream(
rt130,
include_mp123=include_mp123zne,
include_mp456=include_mp456uvw,
......
from __future__ import annotations
import numpy
from numpy._typing import NDArray
from sohstationviewer.model.reftek_data.reftek_reader import soh_packet
"""
......@@ -22,17 +25,9 @@ import numpy as np
from obspy import Trace, Stream, UTCDateTime
from obspy.core.util.obspy_types import ObsPyException
from obspy.io.reftek.packet import PACKET_FINAL_DTYPE
from sohstationviewer.model.general_data.general_record_helper import Unpacker
from sohstationviewer.model.reftek_data.reftek_reader.packet import EHPacket
from sohstationviewer.model.reftek_data.reftek_reader.reftek_reader_helper \
import (
read_rt130_file, convert_waveform_packet_to_obspy_format,
convert_soh_packet_to_obspy_format,
)
class DiscontinuousTrace(Trace):
"""
......@@ -74,35 +69,80 @@ class Reftek130Exception(ObsPyException):
pass
class Reftek130(obspy_rt130_core.Reftek130):
class DecimatedReftek130(obspy_rt130_core.Reftek130):
"""
Child class of obspy's Reftek130 that reads waveform data in a manner
similar to logpeek for better performance.
"""
@staticmethod
def from_file(file: Union[str, Path]) -> Reftek130:
def _decimate_waveform_data(
data: NDArray[obspy_rt130_core.PACKET_FINAL_DTYPE]):
"""
Decimate the waveform data of an RT130 file in-place. Works by grabbing
only one data point from each packet.
:param data: the waveform data of an RT130 file
"""
first_packet_payload = data['payload'][0].copy()
last_packet_payload = data['payload'][-1].copy()
if data[0]['data_format'] == b'16':
data_points = data['payload'][:, :2]
# The data is stored in a big-endian order.
# Merge the two bytes in each data point into a two-byte number.
data_points = data_points.view(np.dtype('>i2'))
# Sign extend the two-byte numbers into four-byte numbers. This is
# done to match the byte length of other data formats.
data_points = data_points.astype(np.dtype('>i4'))
# Convert each four-byte number into four bytes to match Obspy's
# payload format.
data_points = data_points.view('>u1')
else:
if data[0]['data_format'] in [b'C0', b'C1', b'C2', b'C3']:
# The first 40 bytes in the payload are filler, so we skip past
# them. Then, we grab the last data point in the packet, which
# is stored in bytes 8 to 12 (exclusive) of the actual data. We
# could also have used the first data point in the packet,
# which is stored in bytes 4 to 8 (exclusive) of the actual
# data, but experiments have indicated that using the last data
# point leads to a better-looking plot.
data_points = data['payload'][:, 48:52].copy()
else:
data_points = data['payload'][:, :4].copy()
data['payload'][:, :4] = data_points
data['payload'][:, 4:] = 0
if data['packet_type'][0] == b'EH':
data['payload'][0] = first_packet_payload
if data['packet_type'][-1] == b'ET':
data['payload'][-1] = last_packet_payload
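A minimal numpy sketch of the format-16 branch above, using two made-up packets; it shows the big-endian int16 values being widened to int32 and viewed back as raw bytes:

import numpy as np

payload_heads = np.array([[0x01, 0x02],    # 0x0102 -> 258
                          [0xFF, 0xFE]],   # 0xFFFE -> -2 after sign extension
                         dtype=np.uint8)
points = payload_heads.view(np.dtype('>i2'))   # shape (2, 1): [[258], [-2]]
points = points.astype(np.dtype('>i4'))        # widen to 4-byte big-endian ints
as_bytes = points.view('>u1')                  # shape (2, 4): Obspy-style payload bytes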
@staticmethod
def from_file(file: Union[str, Path]) -> DecimatedReftek130:
"""
Read data from an RT130 file and save it in a Reftek130 object.
:param file: the RT130 file to read
:return: a Reftek130 object that stores the data in file
"""
# RT130 data is all big-endian
rt130_unpacker = Unpacker('>')
rt = Reftek130()
rt = DecimatedReftek130()
rt._filename = file
packets_in_file = read_rt130_file(file, rt130_unpacker)
first_packet_type = packets_in_file[0].header.packet_type
if first_packet_type in ['EH', 'ET', 'DT']:
packet_converter = convert_waveform_packet_to_obspy_format
final_dtype = PACKET_FINAL_DTYPE
# SOH and waveform packets are read into different formats, so we have
# to handle them separately. The two formats are encoded in the PACKET
# constants in obspy.io.reftek.packet (waveform packets) and
# model.reftek_data.reftek_reader.soh_packet (SOH packets).
infile = open(file, 'rb')
# Because SOH and waveform data are stored in separate files, we
# only need to look at the first packet type
first_packet_type = infile.read(2)
infile.seek(0)
if first_packet_type in [b'EH', b'ET', b'DT']:
data = obspy_rt130_core.Reftek130.from_file(file)._data
DecimatedReftek130._decimate_waveform_data(data)
else:
packet_converter = convert_soh_packet_to_obspy_format
final_dtype = soh_packet.PACKET_FINAL_DTYPE
converted_packets = []
for packet in packets_in_file:
converted_packets.append(packet_converter(packet, rt130_unpacker))
rt._data = np.array(converted_packets, dtype=final_dtype)
data = soh_packet._initial_unpack_packets_soh(infile.read())
rt._data = data
return rt
def to_stream(self, network: str = "", location: str = "",
......
import dataclasses
from obspy import UTCDateTime
class NotRT130FileError(Exception):
"""
Error to raise when there is a problem with parsing RT130 data.
"""
pass
@dataclasses.dataclass
class PacketHeader:
"""
The decoded header of an RT130 packet.
"""
packet_type: str
experiment_number: int
unit_id: str
time: UTCDateTime
byte_count: int
packet_sequence: int
def parse_rt130_time(year: int, time_bytes: bytes) -> UTCDateTime:
"""
Convert BCD-encoded RT130 time into UTCDateTime.
:param year: the year of the time. RT130 headers store the year separately
from the time, so we have to pass it as an argument.
:param time_bytes: the BCD-encoded time.
:return: an UTCDateTime object that stores the decoded time.
"""
time_string = time_bytes.hex()
# The time string has the format of DDDHHMMSSTTT, where
# D = day of year
# H = hour
# M = minute
# S = second
# T = millisecond
day_of_year, hour, minute, second, millisecond = (
int(time_string[0:3]),
int(time_string[3:5]),
int(time_string[5:7]),
int(time_string[7:9]),
int(time_string[9:12])
)
# RT130 only stores the last two digits of the year. Because the
# documentation for RT130 does not define a way to retrieve the full year,
# we use Obspy's method. Accordingly, we convert 0-49 to 2000-2049 and
# 50-99 to 1950-1999.
if 0 <= year <= 49:
year += 2000
elif 50 <= year <= 99:
year += 1900
converted_time = UTCDateTime(year=year, julday=day_of_year, hour=hour,
minute=minute, second=second,
microsecond=millisecond * 1000)
return converted_time
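A worked example for the conversion above (the bytes are made up): a BCD year of 23 with time bytes 03 61 23 04 56 78 decodes as day 036 of 2023, 12:30:45.678.

from obspy import UTCDateTime

from sohstationviewer.model.reftek_data.reftek_reader.header import \
    parse_rt130_time

time_bytes = bytes.fromhex('036123045678')  # DDDHHMMSSTTT in BCD
t = parse_rt130_time(year=23, time_bytes=time_bytes)
assert t == UTCDateTime(year=2023, julday=36, hour=12, minute=30,
                        second=45, microsecond=678000)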
def get_rt130_packet_header(rt130_packet: bytes) -> PacketHeader:
"""
Get the packet header stored in the first 16 bits of an RT130 packet.
:param rt130_packet: the RT130 packet to process
:return: a PacketHeader object containing the header of rt130_packet
"""
try:
# Because RT130 data is always big-endian, it is more convenient to
# use str.decode() than the unpacker.
packet_type = rt130_packet[:2].decode('ASCII')
except UnicodeError:
print('Cannot decode packet type.')
print('The given file does not appear to be a valid RT130 file.')
raise NotRT130FileError
valid_packet_types = ['AD', 'CD', 'DS', 'DT', 'EH', 'ET', 'OM', 'SH', 'SC',
'FD']
if packet_type not in valid_packet_types:
print(f'Invalid packet type found: {packet_type}')
print('The given file does not appear to be a valid RT130 file.')
raise NotRT130FileError
experiment_number = int(rt130_packet[2:3].hex())
year = int(rt130_packet[3:4].hex())
# A call to str.upper() is needed because bytes.hex() makes any
# hexadecimal letter (i.e. ABCDEF) lowercase, while we want them to be
# uppercase for display purposes.
unit_id = rt130_packet[4:6].hex().upper()
time_bytes = rt130_packet[6:12]
packet_time = parse_rt130_time(year, time_bytes)
byte_count = int(rt130_packet[12:14].hex())
packet_sequence = int(rt130_packet[14:16].hex())
return PacketHeader(packet_type, experiment_number, unit_id, packet_time,
byte_count, packet_sequence)
from typing import Tuple, Any
from sohstationviewer.model.general_data.general_record_helper import Unpacker
from sohstationviewer.model.reftek_data.reftek_reader.packet import \
eh_et_payload_end_in_packet
from sohstationviewer.model.reftek_data.reftek_reader.packets import (
DTExtendedHeader, EHETExtendedHeader,
)
def decode_uncompressed(packet: bytes, data_format: str, unpacker: Unpacker
) -> int:
"""
Grab the first data point in a packet that contains uncompressed RT130 data
(aka packets with data format 16, 32, or 33).
:param packet: the bytes that make up the given packet.
:param data_format: the data format of the given packet, can be one of 16,
32, or 33.
:param unpacker: the unpacker to use to decode the data.
:return: the first data point in the given packet
"""
data = packet[24:]
# For uncompressed RT130 data, the data format is also the size of a data
# point in bits (aside from data format 33, which uses the same size as data
# format 32).
point_size = int(data_format)
if point_size == 33:
point_size = 32
# Convert the size of a data point to byte because the data is stored
# as a byte string.
point_size = point_size // 8
# struct.unpack uses different format characters for different point sizes.
format_char = {2: 'h', 4: 'i'}[point_size]
first_data_point = data[:point_size]
return unpacker.unpack(f'{format_char}', first_data_point)[0]
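A tiny sketch of the size-to-format-character mapping described above, using struct with an explicit big-endian order in place of the project's Unpacker (the byte strings are made up):

import struct

assert struct.unpack('>h', b'\x01\x02')[0] == 258          # format '16': 2-byte points
assert struct.unpack('>i', b'\xff\xff\xff\xfe')[0] == -2   # formats '32'/'33': 4-byte points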
def decode_compressed(packet: bytes, data_format: str, unpacker: Unpacker
) -> int:
"""
Grab the stop point in a packet that contains compressed RT130 data (aka
packets with data format C0, C1, C2, or C3).
We get the stop point in this case because that is what logpeek did. It
also looks a lot better than using the start point, so that is a plus.
:param packet: the bytes that make up the given packet.
:param data_format: the data format of the given packet, can be one of C0,
C1, C2, or C3. Exists only to keep the same signature as
decode_uncompressed.
:param unpacker: the unpacker to use to decode the data.
:return: the stop (last) data point in the given packet
"""
# The data in a compressed data packet starts at byte 64, with bytes
# between byte 24 and 64 being fillers.
data = packet[64:]
first_data_point = data[8:12]
return unpacker.unpack('i', first_data_point)[0]
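A quick offset cross-check (illustrative; the 24-byte header size is inferred from the packet[24:] slices used elsewhere in this reader): data[8:12] here and payload[:, 48:52] in DecimatedReftek130._decimate_waveform_data both address bytes 72..76 of the 1024-byte packet.

HEADER_SIZE = 24  # bytes of packet header before the payload (assumed from packet[24:])
FILLER_SIZE = 40  # filler bytes at the start of a compressed payload
assert HEADER_SIZE + FILLER_SIZE + 8 == 72               # start of data[8:12] in the packet
assert (HEADER_SIZE + 48, HEADER_SIZE + 52) == (72, 76)  # payload[48:52] in the packet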
def read_dt_packet(packet: bytes, unpacker: Unpacker
) -> Tuple[DTExtendedHeader, Any]:
"""
Process a DT packet and get its extended header and first data point.
:param packet: the bytes that make up the given DT packet.
:param unpacker: the unpacker to use to decode the data.
:return: the extended header and first data point of the given DT packet.
"""
decoders = {
**dict.fromkeys(['16', '32', '33'], decode_uncompressed),
**dict.fromkeys(['C0', 'C1', 'C2', 'C3'], decode_compressed)
}
event_number = int(packet[16:18].hex())
data_stream_number = int(packet[18:19].hex())
channel_number = int(packet[19:20].hex())
number_of_samples = int(packet[20:22].hex())
flags = unpacker.unpack('B', packet[22:23])[0]
data_format = packet[23:24].hex().upper()
extended_header = DTExtendedHeader(event_number, data_stream_number,
channel_number, number_of_samples,
flags, data_format)
first_data_point = decoders[data_format](packet, data_format, unpacker)
return extended_header, first_data_point
def read_eh_et_packet(packet: bytes, unpacker: Unpacker
) -> Tuple[EHETExtendedHeader, bytes]:
"""
Process an EH/ET packet and get its extended header and required part of
the payload.
:param packet: the bytes that make up the given EH/ET packet.
:param unpacker: the unpacker to use to decode the data.
:return: the extended header and truncated payload of the given EH/ET
packet.
"""
event_number = int(packet[16:18].hex())
data_stream_number = int(packet[18:19].hex())
flags = unpacker.unpack('B', packet[22:23])[0]
data_format = packet[23:24].hex().upper()
extended_header = EHETExtendedHeader(event_number, data_stream_number,
flags, data_format)
# The largest possible data point has a size of 4 bytes, so we need to
# grab at least 4 bytes of data.
payload = packet[24:eh_et_payload_end_in_packet]
return extended_header, payload
def read_soh_packet(packet: bytes) -> bytes:
"""
Process an SOH packet and get its payload.
:param packet: the bytes that make up the given SOH packet.
:return: the payload of the given SOH packet.
"""
payload = packet[16:]
return payload
import dataclasses
from sohstationviewer.model.reftek_data.reftek_reader.header import (
PacketHeader)
@dataclasses.dataclass
class DTExtendedHeader:
"""
The extended header of a DT packet.
"""
event_number: int
data_stream_number: int
channel_number: int
number_of_samples: int
flags: int
data_format: str
@dataclasses.dataclass
class DTPacket:
"""
The decoded data of a DT packet.
"""
header: PacketHeader
extended_header: DTExtendedHeader
data: int
@dataclasses.dataclass
class EHETExtendedHeader:
"""
A collection of some useful information about an EH/ET packet. Technically,
EH/ET packets do not have extended headers. We name this class what it is
due to the way obspy.Reftek130 (and consequently, core.Reftek130) stores
the data of processed packets. For more information, refer to
Reftek130._data.
"""
event_number: int
data_stream_number: int
flags: int
data_format: str
def __post_init__(self):
self.channel_number = 0
self.number_of_samples = 0
@dataclasses.dataclass
class EHETPacket:
"""
The decoded data of an EH/ET packet. The extended_header field is to ensure
compatibility with dt_packet.DTPacket. EH/ET packets do not have an
extended header otherwise.
"""
header: PacketHeader
extended_header: EHETExtendedHeader
data: bytes
@dataclasses.dataclass
class SOHPacket:
"""
The decoded data of an SOH packet.
"""
header: PacketHeader
payload: bytes
import os
from typing import Any, Dict, Union, List, Tuple
import numpy
import numpy as np
from sohstationviewer.model.general_data.general_record_helper import Unpacker
from sohstationviewer.model.reftek_data.reftek_reader.packet import \
eh_et_payload_end_in_packet
from sohstationviewer.model.reftek_data.reftek_reader.packet_readers import (
read_dt_packet, read_eh_et_packet, read_soh_packet,
)
from sohstationviewer.model.reftek_data.reftek_reader.packets import (
DTPacket, EHETPacket, SOHPacket,
)
from sohstationviewer.model.reftek_data.reftek_reader.header import (
get_rt130_packet_header, PacketHeader,
)
def packet_reader_placeholder(*args: Any, **kwargs: Any) -> Tuple[Any, Any]:
"""
Placeholder function to be used in place of an RT130 packet reader
function. This function immediately returns (None, None).
"""
return None, None
def read_rt130_file(file_name: str, unpacker: Unpacker
) -> List[Union[EHETPacket, DTPacket, SOHPacket]]:
"""
Read an RT130 file and store the data in a list of RT130 packets.
:param file_name: the name of the file to read.
:param unpacker: the decoder used to decode the data.
:return: a list of processed RT130 packets.
"""
# RT130 data looks to be all big-endian (logpeek assumes this, and it has
# been working pretty well), so we don't have to do any endianness check.
packets = []
with open(file_name, 'rb') as rt130_file:
# Each packet is exactly 1024 bytes, so we can rely on that to know
# when we have finished reading.
for i in range(os.path.getsize(file_name) // 1024):
packet = rt130_file.read(1024)
packet_header = get_rt130_packet_header(packet)
if packet_header.packet_type in ['EH', 'ET', 'DT']:
waveform_handlers = {
'EH': (read_eh_et_packet, EHETPacket),
'ET': (read_eh_et_packet, EHETPacket),
'DT': (read_dt_packet, DTPacket),
}
packet_handlers = {
**waveform_handlers
}
packet_handler, packet_type = packet_handlers.get(
packet_header.packet_type, packet_reader_placeholder
)
return_val = packet_handler(packet, unpacker)
extended_header, data = return_val
current_packet = packet_type(packet_header, extended_header,
data)
else:
payload = read_soh_packet(packet)
current_packet = SOHPacket(packet_header, payload)
packets.append(current_packet)
return packets
def convert_packet_header_to_dict(packet_header: PacketHeader) -> Dict:
converted_header = {'packet_type': packet_header.packet_type,
'experiment_number': packet_header.experiment_number,
# Obspy only stores the last two digits of the year.
'year': packet_header.time.year % 100,
'unit_id': packet_header.unit_id,
'time': packet_header.time.ns,
'byte_count': packet_header.byte_count,
'packet_sequence': packet_header.packet_sequence}
return converted_header
def convert_waveform_packet_to_obspy_format(
packet: Union[EHETPacket, DTPacket], unpacker: Unpacker) -> Tuple:
"""
Convert an RT130 packet into a numpy array of type PACKET_FINAL_DTYPE
:param packet: an RT130 packet.
:param unpacker: the decoder used to decode the data.
:return: a tuple that can be converted into an object of type
PACKET_FINAL_DTYPE that contains the data stored in packet.
"""
# We want to convert the packet to a tuple. In order to make it easier to
# maintain, we first convert the packet to a dictionary. Then, we grab the
# values of the dictionary as a tuple to get the final result.
converted_packet = convert_packet_header_to_dict(packet.header)
converted_packet['event_number'] = packet.extended_header.event_number
converted_packet[
'data_stream_number'] = packet.extended_header.data_stream_number
converted_packet['channel_number'] = packet.extended_header.channel_number
converted_packet[
'number_of_samples'] = packet.extended_header.number_of_samples
converted_packet['flags'] = packet.extended_header.flags
converted_packet['data_format'] = packet.extended_header.data_format
if converted_packet['packet_type'] == 'DT':
# Obspy stores the data as a list of 1-byte integers. We store the
# data as an arbitrary length integer, so we need to do some
# conversion. To make encoding and decoding the data point easier, we
# store it in 4 bytes no matter what the data format is. This only
# has an effect on data with format 16. Thanks to a quirk of
# two's-complement binary encoding, however, this does not cause any
# problem.
data_size = 4
format_char = 'B'
converted_packet['payload'] = numpy.empty(1000, np.uint8)
packet_data = list(unpacker.unpack(
f'{data_size}{format_char}',
packet.data.to_bytes(data_size, 'big', signed=True)
))
converted_packet['payload'][:4] = packet_data
elif converted_packet['packet_type'] in ['EH', 'ET']:
eh_et_payload_size = eh_et_payload_end_in_packet - 24
converted_packet['payload'] = numpy.empty(1000, np.uint8)
packet_data = numpy.frombuffer(packet.data, np.uint8)
converted_packet['payload'][:eh_et_payload_size] = packet_data
else:
converted_packet['payload'] = numpy.frombuffer(packet.data, np.uint8)
return tuple(converted_packet.values())
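A small sketch of the two's-complement remark in the comment above (the value is made up): a 16-bit sample written into 4 big-endian signed bytes decodes back to the same integer.

value = -2
four_bytes = value.to_bytes(4, 'big', signed=True)              # b'\xff\xff\xff\xfe'
assert int.from_bytes(four_bytes, 'big', signed=True) == value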
def convert_soh_packet_to_obspy_format(packet: SOHPacket, unpacker: Unpacker
) -> Tuple:
converted_packet = convert_packet_header_to_dict(packet.header)
converted_packet['payload'] = numpy.frombuffer(packet.payload, np.uint8)
return tuple(converted_packet.values())
......@@ -7,6 +7,7 @@ from PySide6.QtWidgets import QDialogButtonBox, QDialog, QPlainTextEdit, \
from sohstationviewer.database.process_db import (
execute_db, trunc_add_db, execute_db_dict)
from sohstationviewer.database.extract_data import get_data_types
from sohstationviewer.controller.processing import read_mseed_channels
from sohstationviewer.controller.util import display_tracking_info
......@@ -243,6 +244,7 @@ class ChannelPreferDialog(OneWindowAtATimeDialog):
)
self.soh_list_table_widget.setRowCount(TOTAL_ROW)
self.avail_data_types = get_data_types()
for row_idx in range(TOTAL_ROW):
self.add_row(row_idx)
self.update_data_table_widget_items()
......
......@@ -242,8 +242,7 @@ class AddEditSingleChannelDialog(QDialog):
)
PlottingAxes.clean_axes(self.ax)
self.plotting.plot_channel(self.ax.c_data,
self.channel_name_lnedit.text(),
self.ax)
self.channel_name_lnedit.text())
self.close()
def set_buttons_enabled(self):
......