From 6737274120723899835e249727a92c4f43d009f4 Mon Sep 17 00:00:00 2001
From: ldam <ldam@passcal.nmt.edu>
Date: Mon, 28 Aug 2023 12:07:03 -0600
Subject: [PATCH] integrate new rt130 with existing code

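Move RT130 processing onto the new GeneralData base class and split the
RefTek-specific helpers (header checking, stream reading, mass position
conversion, and gap retrieval from stream headers) into the new
reftek_helper.py module, removing the dependency on the old
handling_data modules.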
---
 sohstationviewer/model/reftek_data/reftek.py  |  93 +++++++-
 .../model/reftek_data/reftek_helper.py        | 221 ++++++++++++++++++
 2 files changed, 301 insertions(+), 13 deletions(-)
 create mode 100644 sohstationviewer/model/reftek_data/reftek_helper.py

diff --git a/sohstationviewer/model/reftek_data/reftek.py b/sohstationviewer/model/reftek_data/reftek.py
index 8bf7eee12..02c2cd5d9 100755
--- a/sohstationviewer/model/reftek_data/reftek.py
+++ b/sohstationviewer/model/reftek_data/reftek.py
@@ -3,25 +3,29 @@ RT130 object to hold and process RefTek data
 """
 import os
 from pathlib import Path
-from typing import Tuple, List, Union
+from typing import Union, List, Tuple, Dict
 import traceback
 import numpy as np
+from obspy.core import Stream
 
-from sohstationviewer.model.reftek_data.reftek_data import core, soh_packet
-from sohstationviewer.model.reftek_data.log_info import LogInfo
-from sohstationviewer.model.data_type_model import (
-    DataTypeModel, ThreadStopped, ProcessingDataError)
-from sohstationviewer.model.handling_data import read_text
-from sohstationviewer.model.handling_data_reftek import (
-    check_reftek_header, read_reftek_stream)
 from sohstationviewer.conf import constants
 from sohstationviewer.controller.util import validate_file
 from sohstationviewer.view.util.enums import LogType
 
+from sohstationviewer.model.general_data.general_data import (
+    GeneralData, ThreadStopped, ProcessingDataError)
+from sohstationviewer.model.general_data.general_data_helper import read_text
+
+from sohstationviewer.model.reftek_data.reftek_helper import (
+    check_reftek_header, read_reftek_stream,
+    retrieve_gaps_from_stream_header)
+from sohstationviewer.model.reftek_data.reftek_reader import core, soh_packet
+from sohstationviewer.model.reftek_data.log_info import LogInfo
+
 
-class RT130(DataTypeModel):
+class RT130(GeneralData):
     """
-    read and process reftek file into object with properties can be used to
+    read and process RefTek data files into an object whose properties can be used to
     plot SOH data, mass position data, waveform data and gaps
     """
     def __init__(self, *args, **kwarg):
@@ -39,12 +43,44 @@ class RT130(DataTypeModel):
         """
         self.rt130_waveform_data_req: bool = kwarg['rt130_waveform_data_req']
         """
+        stream_header_by_key_chan: stream headers by key and chan_id, used
+            to retrieve gaps from the stream headers later
+        """
+        self.stream_header_by_key_chan: Dict[str, Dict[str, Stream]] = {}
+        """
+        gaps_by_key_chan: gap lists by key and chan_id, used to separate
+            data at gaps and overlaps
+        """
+        self.gaps_by_key_chan: Dict[Union[str, Tuple[str, str]],
+                                    Dict[str, List[List[int]]]] = {}
+        """
         found_data_streams: list of data streams found to help inform user
             why the selected data streams don't show up
         """
         self.found_data_streams: List[int] = []
+
         self.processing_data()
 
+    def processing_data(self):
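+        """
+        Read the data folder, have the user select a key if more than one
+        is found, then finalize the read data. ThreadStopped is raised
+        whenever the creator thread has been interrupted.
+        """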
+        if self.creator_thread.isInterruptionRequested():
+            raise ThreadStopped()
+        self.read_folder(self.dir)
+
+        if self.creator_thread.isInterruptionRequested():
+            raise ThreadStopped()
+        self.selected_key = self.select_key()
+        if self.selected_key is None:
+            raise ThreadStopped()
+
+        if self.creator_thread.isInterruptionRequested():
+            raise ThreadStopped()
+        self.finalize_data()
+
     def finalize_data(self):
         """
         This function should be called after all folders finish reading to
@@ -52,6 +88,7 @@ class RT130(DataTypeModel):
             + check not found data stream to give user a warning if needed
             + other tasks in super().finalize_data()
         """
+        self.track_info("Finalizing...", LogType.INFO)
         self.track_info(
             "Prepare SOH data from log data", LogType.INFO)
         self.prepare_soh_data_from_log_data()
@@ -64,7 +101,18 @@ class RT130(DataTypeModel):
                        f"{', '.join(map(str, not_found_data_streams))}")
                 self.processing_log.append((msg, LogType.WARNING))
 
-        super().finalize_data()
+        self.sort_all_data()
+        self.combine_all_data()
+        self.apply_convert_factor_to_data_dicts()
+
+        retrieve_gaps_from_stream_header(
+            self.stream_header_by_key_chan, self.gaps_by_key_chan,
+            self.gaps, self.gap_minimum, self.read_start, self.read_end)
+
+        for key in self.data_time:
+            if self.data_time[key] == [constants.HIGHEST_INT, 0]:
+                # this happens when there is only text or ascii data in the data set
+                self.data_time[key] = [self.read_start, self.read_end]
 
     def read_folder(self, folder: str) -> None:
         """
@@ -119,7 +167,11 @@ class RT130(DataTypeModel):
         + If there is more than one, show all keys, let user choose one to
         return.
         """
-        keys = sorted(list(self.stream_header_by_key_chan.keys()))
+        self.keys = sorted(list(set(
+            list(self.soh_data.keys()) +
+            list(self.mass_pos_data.keys()) +
+            list(self.waveform_data.keys()))))
+        keys = self.keys
         if len(keys) == 0:
             msg = 'No required data found for the data set.'
             raise ProcessingDataError(msg)
@@ -311,3 +363,18 @@ class RT130(DataTypeModel):
                     'endTmEpoch': self.data_time[k][1]
                 }
                 self.soh_data[k][c_name]['tracesInfo'] = [tr]
+
+    def populate_cur_key_for_all_data(self, cur_key: Tuple[str, str]) -> None:
+        """
+        Set up new data set's key for all data
+
+        :param cur_key: current processing key: DAS SN, experiment number
+        """
+        if cur_key not in self.log_data:
+            self.log_data[cur_key] = {}
+            self.soh_data[cur_key] = {}
+            self.mass_pos_data[cur_key] = {}
+            self.waveform_data[cur_key] = {}
+            self.gaps[cur_key] = []
+            self.data_time[cur_key] = [constants.HIGHEST_INT, 0]
+            self.stream_header_by_key_chan[cur_key] = {}
diff --git a/sohstationviewer/model/reftek_data/reftek_helper.py b/sohstationviewer/model/reftek_data/reftek_helper.py
new file mode 100644
index 000000000..fc467b112
--- /dev/null
+++ b/sohstationviewer/model/reftek_data/reftek_helper.py
@@ -0,0 +1,221 @@
+import numpy as np
+from typing import Tuple, List, Dict, Optional, Union
+
+from obspy.core import Trace
+from obspy.core import Stream
+from obspy import UTCDateTime
+
+from sohstationviewer.model.reftek_data.reftek_reader.core import (
+    DiscontinuousTrace, Reftek130)
+from sohstationviewer.model.general_data.general_data_helper import squash_gaps
+
+
+def check_reftek_header(
+        rt130: Reftek130, cur_key: Tuple[str, str],
+        starttime: UTCDateTime, endtime: UTCDateTime,
+        stream_header_by_key_chan: Dict[str, Dict[str, Stream]],
+        cur_data_dict: Dict, cur_data_time: List[float],
+        include_mp123zne: bool, include_mp456uvw: bool) -> List[int]:
+    """
+    FROM handling_data_reftek.check_reftek_header()
+
+    Read mseed headers of a file from the given rt130 object to check
+        times and build stream headers for retrieving gaps later. The
+        requested data stream has been checked before calling this function.
+
+    :param rt130: RT130 object to get data stream from
+    :param cur_key: Tuple of DAS serial number, experiment number of the
+        current file.
+    :param starttime: start of read data to skip reading actual data if not
+        in range
+    :param endtime: end of read data to skip reading actual data if not
+        in range
+    :param stream_header_by_key_chan: dict of stream header by key, chan to get
+        gaps later
+    :param cur_data_dict: waveform_data/mass_pos_data of the current key
+    :param cur_data_time: data_time of the current key
+    :param include_mp123zne: if mass position channels 1,2,3 are requested
+    :param include_mp456uvw: if mass position channels 4,5,6 are requested
+    :return: list of indexes of the traces available in the time range
+    """
+    stream = Reftek130.to_stream(
+        rt130,
+        include_mp123=include_mp123zne,
+        include_mp456=include_mp456uvw,
+        headonly=True,
+        verbose=False,
+        sort_permuted_package_sequence=True)
+
+    avail_trace_indexes = []
+    for index, trace in enumerate(stream):
+        chan_id = trace.stats['channel'].strip()
+        samplerate = trace.stats['sampling_rate']
+        if chan_id not in cur_data_dict:
+            cur_data_dict[chan_id] = {'tracesInfo': [],
+                                      'samplerate': samplerate}
+        if trace.stats.npts == 0:
+            # skip an empty trace to avoid a bug when creating a memmap
+            # with no data
+            continue
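+        # keep only traces that intersect the requested time range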
+        if (trace.stats['starttime'] <= endtime and
+                starttime <= trace.stats['endtime']):
+            avail_trace_indexes.append(index)
+
+            if chan_id not in stream_header_by_key_chan[cur_key]:
+                stream_header_by_key_chan[cur_key][chan_id] = Stream()
+            stream_header_by_key_chan[cur_key][chan_id].append(trace)
+
+            cur_data_time[0] = min(
+                trace.stats['starttime'].timestamp, cur_data_time[0])
+            cur_data_time[1] = max(
+                trace.stats['endtime'].timestamp, cur_data_time[1])
+
+    return avail_trace_indexes
+
+
+def read_reftek_stream(
+        rt130: Reftek130, tmp_dir: str, cur_key: Tuple[str, str],
+        avail_trace_indexes: List[int], cur_data_dict: Dict,
+        include_mp123zne: bool, include_mp456uvw: bool):
+    """
+    FROM handling_data_reftek.read_reftek_stream
+    Read traces of a file from the given rt130 object for the indexes in
+        avail_trace_indexes.
+
+    :param rt130: RT130 object to get data stream from
+    :param tmp_dir: dir to keep memmap files
+    :param cur_key: Tuple of DAS serial number, experiment number of the
+        current file.
+    :param avail_trace_indexes: index of traces to get
+    :param cur_data_dict: waveform_data/mass_pos_data of the current key
+    :param include_mp123zne: if mass position channels 1,2,3 are requested
+    :param include_mp456uvw: if mass position channels 4,5,6 are requested
+    """
+    # TODO: rewrite reftek to read stream with start and end time
+    stream = Reftek130.to_stream(
+        rt130,
+        include_mp123=include_mp123zne,
+        include_mp456=include_mp456uvw,
+        headonly=False,
+        verbose=False,
+        sort_permuted_package_sequence=True)
+    for index in avail_trace_indexes:
+        trace = stream[index]
+        chan_id = trace.stats['channel'].strip()
+        traces_info = cur_data_dict[chan_id]['tracesInfo']
+        tr = read_mseed_trace(trace)
+        traces_info.append(tr)
+
+
+def read_mseed_trace(
+        trace: Union[Trace, DiscontinuousTrace]) -> Dict:
+    """
+    FROM handling_data.read_mseed_trace_spr_less_than_or_equal_1()
+
+    For an mseed trace whose sample rate is <= 1, read and keep all of the
+        trace's info, including times and data, in memory.
+    Traces with sample rate <= 1 can be SOH, mass position, or waveform.
+
+    :param trace: mseed trace
+    :return tr: dict of trace's info in which data and times are kept
+    """
+    tr = {}
+    tr['chanID'] = trace.stats.channel
+    tr['startTmEpoch'] = trace.stats.starttime.timestamp
+    tr['endTmEpoch'] = trace.stats.endtime.timestamp
+    tr['samplerate'] = trace.stats.sampling_rate
+    if hasattr(trace.stats, 'actual_npts'):
+        tr['size'] = trace.stats.actual_npts
+    else:
+        tr['size'] = trace.stats.npts
+    """
+    trace time start with 0 => need to add with epoch starttime
+    times and data have type ndarray
+    """
+    tr['times'] = trace.times() + trace.stats['starttime'].timestamp
+    if trace.stats.channel.startswith('MassPos'):
+        tr['data'] = _convert_reftek_masspos_data(trace.data)
+    else:
+        tr['data'] = trace.data
+    return tr
+
+
+def _convert_reftek_masspos_data(data: np.ndarray) -> np.ndarray:
+    """
+    FROM handling_data.convert_reftek_masspos_data()
+
+    Convert raw mass position data to real values.
+
+    :param data: mseed data
+    :return: data converted from 16-bit signed integers, in which
+        32767 = 2**16 / 2 - 1 is the largest value of a 16-bit
+        two's-complement number. The value is also multiplied by 10 for
+        readable display.
+    (According to 130_theory.pdf: each channel connects to a 12-bit A/D
+    converter with an input range of +/- 10V. These channels are read
+    once per second as left-justified, two's-complement, 16-bit values.)
+    """
+    return np.round(data / 32767.0 * 10.0, 1)
+
+
+def retrieve_gaps_from_stream_header(
+        streams: Dict[str, Dict[str, Stream]],
+        gaps_by_key_chan: Dict[Union[str, Tuple[str, str]],
+                               Dict[str, List[List[int]]]],
+        gaps: Dict[str, List[List[float]]],
+        gap_minimum: Optional[float],
+        read_start: Optional[float],
+        read_end: Optional[float]) -> None:
+    """
+    CHANGED FROM handling_data.retrieve_gaps_from_stream_header()
+    Retrieve gaps by sta_id from stream_header_by_key_chan
+
+    :param streams: dict of stream header by sta, chan
+    :param gaps_by_key_chan: gaps list by key and channel id
+    :param gaps: gaps list by key
+    """
+    for sta_id in streams:
+        sta_gaps = []
+        gaps_by_key_chan[sta_id] = {}
+        for chan_id in streams[sta_id]:
+            stream = streams[sta_id][chan_id]
+            gaps_in_stream = stream.get_gaps()
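+            # each entry from obspy's Stream.get_gaps() is
+            # [net, sta, loc, chan, end of trace before gap, start of trace
+            #  after gap, duration, number of missing samples]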
+            gaps_by_key_chan[sta_id][chan_id] = stream_gaps = [
+                [g[4].timestamp, g[5].timestamp] for g in gaps_in_stream
+                if _check_gap(g[4], g[5], read_start, read_end, gap_minimum)]
+
+            sta_gaps += stream_gaps
+        gaps[sta_id] = squash_gaps(sta_gaps)
+
+
+def _check_gap(t1: UTCDateTime, t2: UTCDateTime, start: float, end: float,
+               gap_minimum: float) -> bool:
+    """
+    Check if part of the given gap falls inside the given time range and
+        the gap is greater than gap_minimum
+    :param t1: start of the given gap
+    :param t2: end of the given gap
+    :param start: start of the time range
+    :param end: end of the time range
+    :param gap_minimum: minimum length for a gap to be qualified
+    :return: True if check is satisfied, False otherwise
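+
+    For example, with a 5-second gap inside the requested range:
+
+    >>> _check_gap(UTCDateTime(100), UTCDateTime(105), 0., 1000., 1)
+    True
+    >>> _check_gap(UTCDateTime(100), UTCDateTime(105), 0., 1000., 10)
+    False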
+    """
+    t1 = t1.timestamp
+    t2 = t2.timestamp
+    return (abs(t2 - t1) > gap_minimum and
+            (start <= min(t1, t2) <= end or start <= max(t1, t2) <= end))
-- 
GitLab