diff --git a/sohstationviewer/model/reftek_data/reftek.py b/sohstationviewer/model/reftek_data/reftek.py
index 8bf7eee12796adcd4be6774c0b1823fc30fc27e5..02c2cd5d9e934509f9e032a543599259b9e0270a 100755
--- a/sohstationviewer/model/reftek_data/reftek.py
+++ b/sohstationviewer/model/reftek_data/reftek.py
@@ -3,25 +3,29 @@ RT130 object to hold and process RefTek data
 """
 import os
 from pathlib import Path
-from typing import Tuple, List, Union
+from typing import Union, List, Tuple, Dict
 import traceback
 import numpy as np
+from obspy.core import Stream
 
-from sohstationviewer.model.reftek_data.reftek_data import core, soh_packet
-from sohstationviewer.model.reftek_data.log_info import LogInfo
-from sohstationviewer.model.data_type_model import (
-    DataTypeModel, ThreadStopped, ProcessingDataError)
-from sohstationviewer.model.handling_data import read_text
-from sohstationviewer.model.handling_data_reftek import (
-    check_reftek_header, read_reftek_stream)
 from sohstationviewer.conf import constants
 from sohstationviewer.controller.util import validate_file
 from sohstationviewer.view.util.enums import LogType
+from sohstationviewer.model.general_data.general_data import \
+    GeneralData, ThreadStopped, ProcessingDataError
+from sohstationviewer.model.general_data.general_data_helper import read_text
+
+from sohstationviewer.model.reftek_data.reftek_helper import (
+    check_reftek_header, read_reftek_stream,
+    retrieve_gaps_from_stream_header)
+from sohstationviewer.model.reftek_data.reftek_reader import core, soh_packet
+from sohstationviewer.model.reftek_data.log_info import LogInfo
 
 
-class RT130(DataTypeModel):
+class RT130(GeneralData):
     """
-    read and process reftek file into object with properties can be used to
+    read and process RefTek data files into an object with properties to
     plot SOH data, mass position data, waveform data and gaps
     """
     def __init__(self, *args, **kwarg):
@@ -39,12 +43,45 @@ class RT130(DataTypeModel):
         """
         self.rt130_waveform_data_req: bool = kwarg['rt130_waveform_data_req']
         """
+        stream_header_by_key_chan: stream headers by key and chan_id, used
+        to get the key list and the gaps, nets, and channels by sta_id
+        """
+        self.stream_header_by_key_chan: Dict[str, Dict[str, Stream]] = {}
+        """
+        gaps_by_key_chan: gap list for each key/chan_id, used to separate
+        data at gaps and overlaps
+        """
+        self.gaps_by_key_chan: Dict[Union[str, Tuple[str, str]],
+                                    Dict[str, List[List[int]]]] = {}
+        """
         found_data_streams: list of data streams found to help inform user
         why the selected data streams don't show up
         """
         self.found_data_streams: List[int] = []
 
+        self.processing_data()
+
+    def processing_data(self):
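+        """
+        Process the data set: read the given folder, have the user select
+        a key when more than one is found, then finalize the read data.
+        Before each step, raise ThreadStopped if the creator thread has
+        requested an interruption; also stop if no key was selected.
+        """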
+        if self.creator_thread.isInterruptionRequested():
+            raise ThreadStopped()
+        self.read_folder(self.dir)
+
+        if self.creator_thread.isInterruptionRequested():
+            raise ThreadStopped()
+        self.selected_key = self.select_key()
+        if self.selected_key is None:
+            raise ThreadStopped()
+
+        if self.creator_thread.isInterruptionRequested():
+            raise ThreadStopped()
+        self.finalize_data()
+
 
     def finalize_data(self):
         """
         This function should be called after all folders finish reading to
@@ -52,6 +89,8 @@ class RT130(DataTypeModel):
         + check not found data stream to give user a warning if needed
-        + other tasks in super().finalize_data()
+        + sort, combine, and apply convert factor to the data dicts, and
+        retrieve gaps from the stream headers
         """
+        self.track_info("Finalizing...", LogType.INFO)
         self.track_info(
             "Prepare SOH data from log data", LogType.INFO)
         self.prepare_soh_data_from_log_data()
@@ -64,7 +103,19 @@ class RT130(DataTypeModel):
                    f"{', '.join(map(str, not_found_data_streams))}")
             self.processing_log.append((msg, LogType.WARNING))
 
-        super().finalize_data()
+        self.sort_all_data()
+        self.combine_all_data()
+        self.apply_convert_factor_to_data_dicts()
+
+        retrieve_gaps_from_stream_header(
+            self.stream_header_by_key_chan, self.gaps_by_key_chan,
+            self.gaps, self.gap_minimum, self.read_start, self.read_end)
+
+        for key in self.data_time:
+            if self.data_time[key] == [constants.HIGHEST_INT, 0]:
+                # this happens when there is only text or ascii in the data
+                self.data_time[key] = [self.read_start, self.read_end]
+
 
     def read_folder(self, folder: str) -> None:
         """
@@ -119,7 +170,13 @@ class RT130(DataTypeModel):
         + If there is more than one, show all keys, let user choose one to
         return.
         """
-        keys = sorted(list(self.stream_header_by_key_chan.keys()))
+        self.keys = sorted(list(set(
+            list(self.soh_data.keys()) +
+            list(self.mass_pos_data.keys()) +
+            list(self.waveform_data.keys()))))
+        keys = self.keys
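+        # Illustrative note: each key is a (DAS serial number, experiment
+        # number) pair, e.g. ('92EB', '25') - hypothetical values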
         if len(keys) == 0:
             msg = 'No required data found for the data set.'
             raise ProcessingDataError(msg)
@@ -135,6 +192,7 @@ class RT130(DataTypeModel):
         self.track_info(f'Select Key {selected_key}', LogType.INFO)
         return selected_key
 
+
     def read_reftek_130(self, path2file: Path) -> bool:
         """
         From the given file:
@@ -311,3 +369,18 @@ class RT130(DataTypeModel):
                 'endTmEpoch': self.data_time[k][1]
             }
             self.soh_data[k][c_name]['tracesInfo'] = [tr]
+
+    def populate_cur_key_for_all_data(self, cur_key: Tuple[str, str]) -> None:
+        """
+        Set up the new data set's key in all data dicts.
+
+        :param cur_key: the current processing key: (DAS serial number,
+            experiment number)
+        """
+        if cur_key not in self.log_data:
+            self.log_data[cur_key] = {}
+            self.soh_data[cur_key] = {}
+            self.mass_pos_data[cur_key] = {}
+            self.waveform_data[cur_key] = {}
+            self.gaps[cur_key] = []
+            self.data_time[cur_key] = [constants.HIGHEST_INT, 0]
+            self.stream_header_by_key_chan[cur_key] = {}
diff --git a/sohstationviewer/model/reftek_data/reftek_helper.py b/sohstationviewer/model/reftek_data/reftek_helper.py
new file mode 100644
index 0000000000000000000000000000000000000000..fc467b112196552c3fb1e4d42091c3bf6c2a1659
--- /dev/null
+++ b/sohstationviewer/model/reftek_data/reftek_helper.py
@@ -0,0 +1,239 @@
+import numpy as np
+from typing import Tuple, List, Dict, Optional, Union
+
+from obspy.core import Trace, Stream
+from obspy import UTCDateTime
+
+from sohstationviewer.model.reftek_data.reftek_reader.core import (
+    DiscontinuousTrace, Reftek130)
+from sohstationviewer.model.general_data.general_data_helper import squash_gaps
+
+
+def check_reftek_header(
+        rt130: Reftek130, cur_key: Tuple[str, str],
+        starttime: UTCDateTime, endtime: UTCDateTime,
+        stream_header_by_key_chan: Dict[str, Dict[str, Stream]],
+        cur_data_dict: Dict, cur_data_time: List[float],
+        include_mp123zne: bool, include_mp456uvw: bool) -> List[int]:
+    """
+    FROM handling_data_reftek.check_reftek_header()
+
+    Read the mseed headers of a file from the given rt130 object to check
+    times and to create stream headers for retrieving gaps later. The
+    requested data stream has already been checked before being passed to
+    this function.
+
+    :param rt130: RT130 object to get data stream from
+    :param cur_key: tuple of DAS serial number and experiment number of
+        the current file
+    :param starttime: start of read data, to skip reading actual data if
+        not in range
+    :param endtime: end of read data, to skip reading actual data if not
+        in range
+    :param stream_header_by_key_chan: dict of stream headers by key and
+        chan, used to get gaps later
+    :param cur_data_dict: waveform_data/mass_pos_data of the current key
+    :param cur_data_time: data_time of the current key
+    :param include_mp123zne: True if mass position channels 1,2,3 are
+        requested
+    :param include_mp456uvw: True if mass position channels 4,5,6 are
+        requested
+    :return avail_trace_indexes: indexes of the traces that fall in the
+        requested time range
+    """
+    stream = Reftek130.to_stream(
+        rt130,
+        include_mp123=include_mp123zne,
+        include_mp456=include_mp456uvw,
+        headonly=True,
+        verbose=False,
+        sort_permuted_package_sequence=True)
+
+    avail_trace_indexes = []
+    for index, trace in enumerate(stream):
+        chan_id = trace.stats['channel'].strip()
+        samplerate = trace.stats['sampling_rate']
+        if chan_id not in cur_data_dict:
+            cur_data_dict[chan_id] = {'tracesInfo': [],
+                                      'samplerate': samplerate}
+        if trace.stats.npts == 0:
+            # skip empty traces to prevent a bug when creating a memmap
+            # with no data
+            continue
+        if (starttime <= trace.stats['starttime'] <= endtime or
+                starttime <= trace.stats['endtime'] <= endtime):
+            avail_trace_indexes.append(index)
+
+            if chan_id not in stream_header_by_key_chan[cur_key]:
+                stream_header_by_key_chan[cur_key][chan_id] = Stream()
+            stream_header_by_key_chan[cur_key][chan_id].append(trace)
+
+            cur_data_time[0] = min(
+                trace.stats['starttime'].timestamp, cur_data_time[0])
+            cur_data_time[1] = max(
+                trace.stats['endtime'].timestamp, cur_data_time[1])
+
+    return avail_trace_indexes
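+
+
+# Editor's note - an illustrative sketch of the intended call pattern,
+# with hypothetical variable names (the real call site is in reftek.py):
+# the headers are scanned first, then only the traces in range are read.
+#
+#     avail_trace_indexes = check_reftek_header(
+#         rt130, cur_key, starttime, endtime,
+#         stream_header_by_key_chan, cur_data_dict, cur_data_time,
+#         include_mp123zne=True, include_mp456uvw=True)
+#     read_reftek_stream(
+#         rt130, tmp_dir, cur_key, avail_trace_indexes, cur_data_dict,
+#         include_mp123zne=True, include_mp456uvw=True)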
+
+
+def read_reftek_stream(
+        rt130: Reftek130, tmp_dir: str, cur_key: Tuple[str, str],
+        avail_trace_indexes: List[int], cur_data_dict: Dict,
+        include_mp123zne: bool, include_mp456uvw: bool) -> None:
+    """
+    FROM handling_data_reftek.read_reftek_stream()
+
+    Read the traces of a file from the given rt130 object at the indexes
+    listed in avail_trace_indexes.
+
+    :param rt130: RT130 object to get data stream from
+    :param tmp_dir: dir to keep memmap files
+    :param cur_key: tuple of DAS serial number and experiment number of
+        the current file
+    :param avail_trace_indexes: indexes of the traces to get
+    :param cur_data_dict: waveform_data/mass_pos_data of the current key
+    :param include_mp123zne: True if mass position channels 1,2,3 are
+        requested
+    :param include_mp456uvw: True if mass position channels 4,5,6 are
+        requested
+    """
+    # TODO: rewrite reftek to read stream with start and end time
+    stream = Reftek130.to_stream(
+        rt130,
+        include_mp123=include_mp123zne,
+        include_mp456=include_mp456uvw,
+        headonly=False,
+        verbose=False,
+        sort_permuted_package_sequence=True)
+    for index in avail_trace_indexes:
+        trace = stream[index]
+        chan_id = trace.stats['channel'].strip()
+        traces_info = cur_data_dict[chan_id]['tracesInfo']
+        tr = read_mseed_trace(trace)
+        traces_info.append(tr)
+
+
+def read_mseed_trace(
+        trace: Union[Trace, DiscontinuousTrace]) -> Dict:
+    """
+    FROM handling_data.read_mseed_trace_spr_less_than_or_equal_1()
+
+    Read and keep all of a trace's info, including times and data, in
+    memory. Traces with a sample rate <= 1 can be SOH, mass position, or
+    waveform data.
+
+    :param trace: mseed trace
+    :return tr: dict of the trace's info in which data and times are kept
+    """
+    tr = {}
+    tr['chanID'] = trace.stats.channel
+    tr['startTmEpoch'] = trace.stats.starttime.timestamp
+    tr['endTmEpoch'] = trace.stats.endtime.timestamp
+    tr['samplerate'] = trace.stats.sampling_rate
+    if hasattr(trace.stats, 'actual_npts'):
+        tr['size'] = trace.stats.actual_npts
+    else:
+        tr['size'] = trace.stats.npts
+    # trace.times() starts at 0, so add the epoch starttime to get
+    # absolute times; times and data both have type ndarray
+    tr['times'] = trace.times() + trace.stats['starttime'].timestamp
+    if trace.stats.channel.startswith('MassPos'):
+        tr['data'] = _convert_reftek_masspos_data(trace.data)
+    else:
+        tr['data'] = trace.data
+    return tr
+
+
+def _convert_reftek_masspos_data(data: np.ndarray) -> np.ndarray:
+    """
+    FROM handling_data.convert_reftek_masspos_data()
+
+    Calculate the real mass position values from the raw data.
+
+    :param data: mseed data as 16-bit signed integers, of which
+        32767 = 2**16 / 2 - 1 is the highest two's-complement value
+    :return: the converted data, multiplied by 10 for readable display.
+        (According to 130_theory.pdf: each channel connects to a 12-bit
+        A/D converter with an input range of +/-10V. These channels are
+        read once per second as left-justified, two's-complement, 16-bit
+        values.)
+    """
+    return np.round_(data / 32767.0 * 10.0, 1)
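+
+
+# Editor's note - worked example of the conversion above (illustrative):
+# np.round_(np.array([32767, -32767, 16384]) / 32767.0 * 10.0, 1)
+# -> array([ 10., -10.,   5.]), i.e. full scale maps to +/-10.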
+
+
+def retrieve_gaps_from_stream_header(
+        streams: Dict[str, Dict[str, Stream]],
+        gaps_by_key_chan: Dict[Union[str, Tuple[str, str]],
+                               Dict[str, List[List[int]]]],
+        gaps: Dict[str, List[List[float]]],
+        gap_minimum: Optional[float],
+        read_start: Optional[float],
+        read_end: Optional[float]) -> None:
+    """
+    CHANGED FROM handling_data.retrieve_gaps_from_stream_header()
+
+    Retrieve gaps by sta_id from the stream headers and store them in
+    gaps_by_key_chan and gaps.
+
+    :param streams: dict of stream headers by sta_id and chan_id
+    :param gaps_by_key_chan: gaps list by key and channel id
+    :param gaps: gaps list by key
+    :param gap_minimum: minimum length for a gap to be considered a gap
+    :param read_start: start of the read time range
+    :param read_end: end of the read time range
+    """
+    for sta_id in streams:
+        sta_gaps = []
+        gaps_by_key_chan[sta_id] = {}
+        for chan_id in streams[sta_id]:
+            stream = streams[sta_id][chan_id]
+            gaps_in_stream = stream.get_gaps()
+            gaps_by_key_chan[sta_id][chan_id] = stream_gaps = [
+                [g[4].timestamp, g[5].timestamp] for g in gaps_in_stream
+                if _check_gap(g[4], g[5], read_start, read_end, gap_minimum)]
+
+            sta_gaps += stream_gaps
+        gaps[sta_id] = squash_gaps(sta_gaps)
+
+
+def _check_gap(t1: UTCDateTime, t2: UTCDateTime, start: float, end: float,
+               gap_minimum: float) -> bool:
+    """
+    Check that part of the given gap is in the given time range and that
+    the gap is longer than gap_minimum.
+
+    :param t1: start of the given gap
+    :param t2: end of the given gap
+    :param start: start of the time range
+    :param end: end of the time range
+    :param gap_minimum: minimum length for a gap to be considered a gap
+    :return: True if both checks are satisfied, False otherwise
+    """
+    t1 = t1.timestamp
+    t2 = t2.timestamp
+    return (abs(t2 - t1) > gap_minimum and
+            (start <= min(t1, t2) <= end or start <= max(t1, t2) <= end))
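+
+
+# Editor's note - illustrative check: for a gap from t1=100 to t2=250
+# (as UTCDateTime), start=0, end=200 and gap_minimum=100, _check_gap
+# returns True: the gap lasts 150s (> 100) and its earlier end, 100,
+# falls inside [0, 200].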