Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
Commits on Source (23)
......@@ -3,6 +3,7 @@ Functions that process data for plotting
"""
import math
from typing import List, Union, Optional, Tuple, Dict
from obspy import UTCDateTime
......@@ -19,19 +20,21 @@ MassPosColorPallets = {"B": ["C", "G", "Y", "R", "M"],
"W": ["B", "B", "B", "B", "B"]}
def getMassposValueColors(rangeOpt, chan_id, cMode, processing_log,
retType='str'):
def getMassposValueColors(rangeOpt: str, chan_id: str, cMode: str,
processing_log: List[Tuple[str, LogType]],
retType: str = 'str'
) -> Optional[Union[str, List[Tuple[float, str]]]]:
"""
Create a map between value and color based on given rangeOpt and cMode to
display mass position plots.
:param rangeOpt: massPosVoltRangeOpt got from Options Menu - MP coloring
in Main Window to define different values for mass position.
(regular/trillium)
:param chan_id: str - ID of the channel
:param cMode: str - color mode (B/W)
:param processing_log: [(str, str),] - list of processing info and type
:param retType: str - request return type
:return: [(float, str), ] - [(value, color), (value, color) ...]
:param chan_id: ID of the channel
:param cMode: color mode (B/W)
:param processing_log: list of processing info and type
:param retType: request return type
:return: [(value, color), (value, color) ...]
if retType is 'str', return "value:color|value:color"
"""
......@@ -67,13 +70,14 @@ def getMassposValueColors(rangeOpt, chan_id, cMode, processing_log,
return valueColors
def formatTime(time, dateMode, timeMode=None):
def formatTime(time: Union[UTCDateTime, float], dateMode: str,
timeMode: Optional[str] = None) -> str:
"""
Format time according to date_mode and time_mode
:param time: time to be format, can be UTCDateTime or epoch time
:param dateMode: str - the format of date
:param timeMode: str - the format of time
:return: the formatted time string - str
:param dateMode: the format of date
:param timeMode: the format of time
:return: the formatted time string
"""
if isinstance(time, UTCDateTime):
t = time
......@@ -96,14 +100,15 @@ def formatTime(time, dateMode, timeMode=None):
return ret
def getTitle(key, minTime, maxTime, dateMode):
def getTitle(key: Union[str, Tuple[str, int]], minTime: float, maxTime: float,
dateMode: str) -> str:
"""
Create title for the plot.
:param key: str or (str, str) sta_id for mseed, (unit_id, exp_no) for rt130
:param minTime: float - start time of the plot
:param maxTime: float - end time of the plot
:param dateMode: str - format of date
:return: title for the plot - str
:param minTime: start time of the plot
:param maxTime: end time of the plot
:param dateMode: format of date
:return: title for the plot
"""
diff = maxTime - minTime
hours = diff / 3600
......@@ -115,30 +120,33 @@ def getTitle(key, minTime, maxTime, dateMode):
)
def getGaps(gaps, gapMin):
def getGaps(gaps: List[List[float]], gapMin: float
) -> List[List[float]]:
"""
:param gaps: [[float, float],] - list of gaps
:param gapMin: int - minimum of gaps count in minutes
return list of gaps of which gaps smaller than gapMin have been removed
:param gaps: list of gaps
:param gapMin: minimum of gaps count in minutes
:return: list of gaps of which gaps smaller than gapMin have been removed
"""
gapMinSec = gapMin * 60
return [g for g in gaps if (g[1] - g[0]) >= gapMinSec]
def getTimeTicks(earliest, latest, dateFormat, labelTotal):
def getTimeTicks(earliest: float, latest: float, dateFormat: str,
labelTotal: int
) -> Tuple[List[float], List[float], List[str]]:
"""
split time range to use for tick labels
Ex: getTimeTicks(1595542860.0, 1595607663.91, 'YYYY-MM-DD', 3)
:param earliest: float - earliest epoch time
:param latest: float - latest epoch time
:param dateFormat: str - (YYYY:DOY, YYYY-MM-DD or YYYYMMMDD)
:param earliest: earliest epoch time
:param latest: latest epoch time
:param dateFormat: (YYYY:DOY, YYYY-MM-DD or YYYYMMMDD)
(selected in Menu Options - Date Format)
:param labelTotal: int - number of time label to be displayed,
:param labelTotal: number of time label to be displayed,
        others will show as ticks only
:return:
times: [float,] - list of times to show ticks
majorTimes: [float,] - list of times for displayed labels
majorTimelabels: [str,] - list of labels displayed
times: list of times to show ticks
majorTimes: list of times for displayed labels
majorTimelabels: list of labels displayed
"""
timeRange = latest - earliest
if timeRange >= 2592000.0:
......@@ -186,14 +194,13 @@ def getTimeTicks(earliest, latest, dateFormat, labelTotal):
return times, majorTimes, majorTimelabels
def getDayTicks():
def getDayTicks() -> Tuple[List[int], List[int], List[str]]:
"""
Get information for displaying time on plotting widget.
:return:
times: [int,] - list of indexes of every hour in each_day_5_min_list
majorTimes: [int,] - list of indexes of every 4 hours in
each_day_5_min_list
majorTimeLabels: [str,] - 2 digit numbers of every 4 hours in a day
times: list of indexes of every hour in each_day_5_min_list
majorTimes: list of indexes of every 4 hours in each_day_5_min_list
majorTimeLabels: 2 digit numbers of every 4 hours in a day
"""
times = list(range(const.NO_5M_1H, const.NO_5M_DAY, const.NO_5M_1H))
......@@ -204,14 +211,14 @@ def getDayTicks():
return times, majorTimes, majorTimeLabels
def getUnitBitweight(chanDB, bitweightOpt):
def getUnitBitweight(chanDB: Dict, bitweightOpt: str) -> str:
"""
    Get the format to display value including fixed point decimal and unit based
on the information from the database.
:param chanDB: dict - channel's info got from database
:param bitweightOpt: str - option for bitweight (none, low, high)
:param chanDB: channel's info got from database
:param bitweightOpt: option for bitweight (none, low, high)
(Menu Options - Q330 Gain)
:return unitBitweight: str - format for displayed value on the left of each
:return unitBitweight: format for displayed value on the left of each
plot with fixed point decimal and unit
"""
......
......@@ -8,7 +8,9 @@ import json
import re
import traceback
from pathlib import Path
from typing import List, Set, Optional, Dict, Tuple
from PySide2.QtWidgets import QTextBrowser
from obspy.core import read as read_ms
from obspy.io.reftek.core import Reftek130Exception
......@@ -21,8 +23,10 @@ from sohstationviewer.controller.util import validateFile, displayTrackingInfo
from sohstationviewer.view.util.enums import LogType
def loadData(dataType, tracking_box, listOfDir, reqWFChans=[], reqSOHChans=[],
readStart=None, readEnd=None):
def loadData(dataType: str, tracking_box: QTextBrowser, listOfDir: List[str],
reqWFChans: List[str] = [], reqSOHChans: List[str] = [],
readStart: Optional[float] = None,
readEnd: Optional[float] = None) -> DataTypeModel:
"""
Load the data stored in listOfDir and store it in a DataTypeModel object.
The concrete class of the data object is based on dataType. Run on the same
......@@ -31,14 +35,14 @@ def loadData(dataType, tracking_box, listOfDir, reqWFChans=[], reqSOHChans=[],
unless it is necessary to load data in the main thread (e.g. if there is
a need to access the call stack).
:param dataType: str - type of data read
:param tracking_box: QTextBrowser - widget to display tracking info
:param listOfDir: [str,] - list of directories selected by users
:param reqWFChans: [str,] - requested waveform channel list
:param reqSOHChans: [str,] - requested soh channel list
:param readStart: [float,] - start time of read data
:param readEnd: [float,] - finish time of read data
:return dataObject: DataTypeModel - object that keep the data read from
:param dataType: type of data read
:param tracking_box: widget to display tracking info
:param listOfDir: list of directories selected by users
:param reqWFChans: requested waveform channel list
:param reqSOHChans: requested soh channel list
:param readStart: start time of read data
:param readEnd: finish time of read data
:return dataObject: object that keep the data read from
listOfDir
"""
dataObject = None
......@@ -66,14 +70,14 @@ def loadData(dataType, tracking_box, listOfDir, reqWFChans=[], reqSOHChans=[],
return dataObject
def readChannels(tracking_box, listOfDir):
def readChannels(tracking_box: QTextBrowser, listOfDir: List[str]
) -> Set[str]:
"""
Scan available channels (to be used in channel preferences dialog). Since
    channels for RT130 are hard-coded, this function won't be applied to it.
:param tracking_box: QTextBrowser - widget to display tracking info
:param listOfDir: [str,] - list of directories selected by users
:return dataObject: DataTypeModel - object that keep the data read from
listOfDir
:param tracking_box: widget to display tracking info
:param listOfDir: list of directories selected by users
:return dataObject.channels: set of channels present in listofDir
"""
dataObject = None
for d in listOfDir:
......@@ -92,12 +96,13 @@ def readChannels(tracking_box, listOfDir):
return dataObject.channels
def detectDataType(tracking_box, listOfDir):
def detectDataType(tracking_box: QTextBrowser, listOfDir: List[str]
) -> Optional[str]:
"""
Detect data type for the given directories using getDataTypeFromFile
:param tracking_box: QTextBrowser - widget to display tracking info
:param listOfDir: [str,] - list of directories selected by users
:return: None/str -
:param tracking_box: widget to display tracking info
:param listOfDir: list of directories selected by users
:return:
+ if there are more than one data types detected,
return None with a warning message
+ if only Unknown data type detected,
......@@ -144,7 +149,10 @@ def detectDataType(tracking_box, listOfDir):
return list(dirDataTypeDict.values())[0][0]
def getDataTypeFromFile(path2file, sign_chan_data_type_dict):
def getDataTypeFromFile(
path2file: Path,
sign_chan_data_type_dict: Dict[str, str]
) -> Optional[Tuple[str, str]]:
"""
+ Try to read mseed data from given file
if catch TypeError: no data type detected => return None
......@@ -152,11 +160,10 @@ def getDataTypeFromFile(path2file, sign_chan_data_type_dict):
otherwise data type is mseed which includes: q330, pegasus, centaur
+ Continue to identify data type for a file by checking if the channel
in that file is a unique channel of a data type.
:param path2file: str - absolute path to processed file
:param sign_chan_data_type_dict: {str: str} - dict of unique chan for data
:param path2file: absolute path to processed file
:param sign_chan_data_type_dict: dict of unique chan for data
type
:return: str, str - detected data type, channel from which data type is
detected
:return: detected data type, channel from which data type is detected
"""
try:
stream = read_ms(path2file)
......
......@@ -5,19 +5,23 @@ basic functions: format, validate, display tracking
import os
import re
from datetime import datetime
from pathlib import Path
from PySide2 import QtCore
from typing import Tuple, Union
from PySide2.QtWidgets import QTextBrowser
from obspy import UTCDateTime
import numpy as np
from sohstationviewer.view.util.enums import LogType
def validateFile(path2file, fileName):
def validateFile(path2file: Union[str, Path], fileName: str):
"""
Check if fileName given is a file and not info file
:param path2file: str - absolute path to file
:param fileName: str - name of the file
:return: bool - True if pass checking, False if not.
:param path2file: absolute path to file
:param fileName: name of the file
:return: True if pass checking, False if not.
"""
if fileName.strip() == '.DS_Store' or fileName.startswith('._'):
......@@ -30,12 +34,13 @@ def validateFile(path2file, fileName):
@QtCore.Slot()
def displayTrackingInfo(trackingBox, text, type=LogType.INFO):
def displayTrackingInfo(trackingBox: QTextBrowser, text: str,
type: LogType = LogType.INFO):
"""
Display text in the given widget with different background and text colors
:param trackingBox: QTextBrowser - widget to display tracking info
:param text: str - info to be displayed
:param type: str - (info/warning/error) type of info to be displayed in
:param trackingBox: widget to display tracking info
:param text: info to be displayed
:param type: (info/warning/error) type of info to be displayed in
different color
"""
......@@ -63,13 +68,13 @@ def displayTrackingInfo(trackingBox, text, type=LogType.INFO):
trackingBox.repaint()
def getDirSize(dir):
def getDirSize(dir: str) -> Tuple[int, int]:
"""
Get size of directory and size of file.
:param dir: str - absolute path to directory
:param dir: absolute path to directory
:return:
totalSize: int - total size of the directory
totalFile: int - total file of the directory
totalSize: total size of the directory
totalFile: total file of the directory
"""
totalSize = 0
......@@ -85,13 +90,14 @@ def getDirSize(dir):
return totalSize, totalFile
def getTime6(timeStr):
def getTime6(timeStr: str) -> Tuple[float, int]:
"""
Get time from 6 parts string.
(year:day of year:hour:minute:second:millisecond)
Ex: 01:251:09:41:35:656/ 2001:251:09:41:35:656
in which year in the first part can be 2 digits or 6 digits
:param timeStr: str - 6 part time string
:param timeStr: 6 part time string
:return the epoch time and the year of timeStr.
"""
year = timeStr.split(':')[0]
if len(year) == 2:
......@@ -100,11 +106,12 @@ def getTime6(timeStr):
return getTime6_4y(timeStr)
def getTime6_2y(timeStr):
def getTime6_2y(timeStr: str) -> Tuple[float, int]:
"""
Get time from 6 parts string in which year has 2 digits.
Ex: 01:251:09:41:35:656
:param timeStr: str - 6 part time string with 2 digits for year
:param timeStr: 6 part time string with 2 digits for year
:return the epoch time and the year of timeStr.
"""
# pad 0 so the last part has 6 digits to match with the format str
timeStr = timeStr.ljust(22, "0")
......@@ -113,11 +120,12 @@ def getTime6_2y(timeStr):
return utcTime.timestamp, time.year
def getTime6_4y(timeStr):
def getTime6_4y(timeStr: str) -> Tuple[float, int]:
"""
Get time from 6 parts string in which year has 4 digits.
Ex: 2001:251:09:41:35:656
:param timeStr: str - 6 part time string with 4 digits for year
:param timeStr: 6 part time string with 4 digits for year
:return the epoch time and the year of timeStr.
"""
# pad 0 so the last part has 6 digits to match with the format str
timeStr = timeStr.ljust(24, "0")
......@@ -126,17 +134,18 @@ def getTime6_4y(timeStr):
return utcTime.timestamp, time.year
def getTime4(timeStr, trackingYear, yAdded):
def getTime4(timeStr: str, trackingYear: int, yAdded: bool
) -> Tuple[float, int, bool]:
"""
Get time from 4 parts string. (day of year:hour:minute:second)
Ex: 253:19:41:42
:param timeStr: str - time string
:param trackingYear: int - year that has been detected
:param yAdded: bool - flag to tell if year has been plussed 1 or not
:param timeStr: time string
:param trackingYear: year that has been detected
    :param yAdded: flag to tell if the year has been incremented by 1 or not
:return:
+ utcTime.timestamp: float: epoch time
+ time.year: int: year
+ yAdded: bool - flag to tell if year has been plussed 1 or not
+ utcTime.timestamp: epoch time
+ time.year: year
        + yAdded: flag to tell if the year has been incremented by 1 or not
"""
if not yAdded:
# first day => move to next year
......@@ -150,11 +159,11 @@ def getTime4(timeStr, trackingYear, yAdded):
return utcTime.timestamp, time.year, yAdded
def getVal(text):
def getVal(text: str) -> float:
"""
Get the value part of a string with non-number substring following.
:param text: str - value string including unit
:return: value part including +/-, remove str that follows - float
:param text: value string including unit
:return: value part including +/-, remove str that follows
"""
REVal = '^\+?\-?[0-9]+\.?[0-9]?' # noqa: W605
return float(re.search(REVal, text).group())
......@@ -162,14 +171,14 @@ def getVal(text):
def isBinaryStr(text):
"""
:param text: str: text to check
:return: bool - True if text is a binary string or False if not
:param text: text to check
:return: True if text is a binary string or False if not
"""
return lambda b: bool(b.translate(None, text))
def rtnPattern(text, upper=False):
def rtnPattern(text: str, upper: bool = False) -> str:
"""
This function is from logpeek's rtnPattern.
return routine pattern of the string with:
......@@ -177,8 +186,10 @@ def rtnPattern(text, upper=False):
+ a for lowercase
+ A for upper case
+ remain special character
:param text: str - text to get format
:return rtn: str - routine pattern of the string
:param text: text to get format
:param upper: flag of whether to convert all alphabetic characters
to A
:return rtn: routine pattern of the string
"""
rtn = ""
for c in text:
......@@ -196,12 +207,12 @@ def rtnPattern(text, upper=False):
return rtn
def fmti(Value):
def add_thousand_separator(Value: float) -> str:
"""
This function is from logpeek's fmti
Given Value will be convert to a string integer with thousand separators
:param Value: str - string of value with unit
:return NewValue: str - new value with no unit and with thousand separators
:param Value: string of value with unit
:return NewValue: new value with no unit and with thousand separators
"""
Value = int(Value)
if Value > -1000 and Value < 1000:
......@@ -227,15 +238,15 @@ def fmti(Value):
return NewValue
def apply_convert_factor(c_data, convert_factor):
def apply_convert_factor(c_data: dict, convert_factor: float):
"""
convertFactor = 150mV/count = 150V/1000count
=> unit data * convertFactor= data *150/1000 V
:param c_data: dict - data of the channel which includes down-sampled
:param c_data: data of the channel which includes down-sampled
data in keys 'times' and 'data'. Refer to DataTypeModel.__init__.
soh_data[key][chan_id]
:param convert_factor: float - convertFactor field retrieved from
:param convert_factor: convertFactor field retrieved from
db table Channels for this channel
"""
c_data['data'] = np.multiply(c_data['data'], [convert_factor])
......@@ -3,7 +3,7 @@ import os
from tempfile import mkdtemp
import shutil
from typing import Optional
from typing import List, Tuple, Dict, Optional, Union
from PySide2 import QtCore
......@@ -12,6 +12,8 @@ from sohstationviewer.conf import constants
from sohstationviewer.view.util.enums import LogType
from sohstationviewer.database.process_db import execute_db
from PySide2 import QtWidgets
class WrongDataTypeError(Exception):
def __init__(self, *args, **kwargs):
......@@ -28,9 +30,11 @@ class ThreadStopped(Exception):
class DataTypeModel():
def __init__(self, trackingBox, folder, readChanOnly=False,
reqWFChans=[], reqSOHChans=[],
readStart=0, readEnd=constants.HIGHEST_INT,
def __init__(self, trackingBox: QtWidgets.QTextBrowser, folder: str,
readChanOnly: bool = False,
reqWFChans: Union[List[str], List[int]] = [],
reqSOHChans: List[str] = [], readStart: float = 0,
readEnd: float = constants.HIGHEST_INT,
creator_thread: Optional[QtCore.QThread] = None,
notification_signal: Optional[QtCore.Signal] = None,
pause_signal: Optional[QtCore.Signal] = None,
......@@ -38,13 +42,13 @@ class DataTypeModel():
"""
Super class for different data type to process data from data files
:param trackingBox: QTextBrowser - widget to display tracking info
:param folder: str - path to the folder of data
:param readChanOnly: bool - if True, only read for channel name
:param reqWFChans: list of str - requested waveform channel list
:param reqSOHChans: list of str - requested SOH channel list
:param readStart: float - requested start time to read
:param readEnd: float - requested end time to read
:param trackingBox: widget to display tracking info
:param folder: path to the folder of data
:param readChanOnly: if True, only read for channel name
:param reqWFChans: requested waveform channel list
:param reqSOHChans: requested SOH channel list
:param readStart: requested start time to read
:param readEnd: requested end time to read
:param creator_thread: the thread the current DataTypeModel instance is
being created in. If None, the DataTypeModel instance is being
created in the main thread
......@@ -74,7 +78,9 @@ class DataTypeModel():
"""
processingLog: [(message, type)] - record the progress of processing
"""
self.processingLog = []
self.processingLog: List[Tuple[str, LogType]] = []
DataKey = Union[Tuple[str, str], str]
"""
Log data: info from log channels, soh messages, text file in dict:
......@@ -82,7 +88,7 @@ class DataTypeModel():
In which 'TEXT': is the chan_id given by sohview for text only file.
Note: logData for RT130's dataset has only one channel: SOH
"""
self.logData = {'TEXT': []}
self.logData = {'TEXT': []} # noqa
"""
waveformData: data of waveform in dict:
......@@ -197,7 +203,7 @@ class DataTypeModel():
dataTime: time range of data sets:
{setKey: [earliestepoch, latestepoch]} - {str: [float, float],}
"""
self.dataTime = {}
self.dataTime: Dict[DataKey, List[float]] = {}
"""
The given data may include more than one data set which is station_id
......@@ -205,19 +211,19 @@ class DataTypeModel():
data set to be displayed
selectedKey: str - key of the data set to be displayed
"""
self.selectedKey = None
self.selectedKey: Optional[str] = None
"""
gaps: gaps info in dict:
{set_key: [list of gaps]} - {str: [[float, float],],}
"""
self.gaps = {}
self.gaps: Dict[DataKey, List[List[float]]] = {}
"""
tmpDir: str - dir to keep memmap files.
Will be deleted when object is deleted
"""
self.tmpDir = mkdtemp()
self.tmpDir: str = mkdtemp()
self.save_temp_data_folder_to_database()
try:
os.mkdir(self.tmpDir)
......@@ -239,7 +245,7 @@ class DataTypeModel():
print("Error deleting %s : %s" % (self.tmpDir, e.strerror))
print("finish deleting")
def hasData(self):
def hasData(self) -> bool:
"""
:return bool - True if there is any data can be read.
False if there is no valid data
......@@ -249,7 +255,7 @@ class DataTypeModel():
return False
return True
def trackInfo(self, text: str, type: LogType = LogType.INFO) -> None:
def trackInfo(self, text: str, type: LogType) -> None:
"""
Display tracking info in tracking_box.
Add all errors/warnings to processing_log.
......
......@@ -4,11 +4,12 @@ Functions that help processing model data
import math
from struct import unpack
from pathlib import Path
from typing import Dict, Tuple, List
from typing import Dict, Callable, Tuple, List, Union, IO, Optional
import numpy as np
from obspy.core import Stream, read as read_ms
from obspy.core import Trace, Stream, read as read_ms
from PySide2 import QtWidgets
from obspy.io.reftek.core import Reftek130
from sohstationviewer.model.mseed.blockettes_reader import (
readNextBlkt, ReadBlocketteError)
......@@ -18,8 +19,11 @@ from sohstationviewer.model.reftek.from_rt2ms import core
from sohstationviewer.view.util.enums import LogType
def readSOHMSeed(path2file, fileName,
SOHStreams, logData, netsProbInFile, trackInfo):
def readSOHMSeed(path2file: Path, fileName: str,
SOHStreams: Dict[str, Dict[str, Stream]],
logData: Dict[str, List[str]],
netsProbInFile: Dict[Tuple[str, ...], str],
trackInfo: Callable[[str, LogType], None]) -> Dict[str, List]:
"""
Use read() from obspy.core to read miniseed file:
+ if encoding is ASCII: log string will be added to log_data. The
......@@ -27,14 +31,14 @@ def readSOHMSeed(path2file, fileName,
to add to log_data as well.
+ otherwise traces from stream will be add to soh_streams to be
merged later
:param path2file: str - absolute path of mseed file
:param fileName: str - name of mseed file
:param SOHStreams: dict - holder of SOH mseed streams
:param logData: dict - holder of info from log
:param netsProbInfile: {list of str: str} - holder of dict with key are
nets of file and value is user-selected net so the rule will be
:param path2file: absolute path of mseed file
:param fileName: name of mseed file
:param SOHStreams: holder of SOH mseed streams
:param logData: holder of info from log
:param netsProbInFile: holder of dict with key are
nets of file and value is user-selected net so the rule will be
applied for other files if their nets is subset of a key
:param trackInfo: function - to display processing info
:param trackInfo: to display processing info
"""
stream = read_ms(path2file)
......@@ -101,11 +105,11 @@ def readSOHMSeed(path2file, fileName,
}
def readSOHTrace(trace):
def readSOHTrace(trace: Trace) -> Dict:
"""
Read SOH trace's info
:param trace: obspy.core.trace.Trace - mseed trace
:return tr: dict - with trace's info
:param trace: mseed trace
:return tr: with trace's info
(structure in DataTypeModel.__init__.soh_ata[key][chan_id][orgTrace])
"""
tr = {}
......@@ -122,12 +126,12 @@ def readSOHTrace(trace):
return tr
def readMPTrace(trace):
def readMPTrace(trace: Trace) -> Dict:
"""
    Read mass position trace's info using readSOHTrace(), then calculate real
    value for mass position
:param trace: obspy.core.trace.Trace - mseed trace
:return tr: dict - with trace's info from readSOHTrace in which tr['data']
:param trace: mseed trace
:return tr: with trace's info from readSOHTrace in which tr['data']
has been converted from 16-bit signed integer in which
32767= 2 ** 16/2 - 1 is the highest value of 16-bit two's complement
number. The value is also multiplied by 10 for readable display.
......@@ -142,17 +146,19 @@ def readMPTrace(trace):
return tr
def readWaveformTrace(trace, sta_id, chan_id, traces_info, tmp_dir):
def readWaveformTrace(trace: Trace, sta_id: Union[Tuple[str, str], str],
chan_id: str, traces_info: List, tmp_dir: str) -> Dict:
"""
    Read mseed waveform trace and save data to files to save memory for
    processing
since waveform data are big.
:param trace: obspy.core.trace.Trace - mseed trace
:param sta_id: str - station name
:param chan_id: str - channel name
:param traces_info: dict - holder of traces_info, refer
:param trace: mseed trace
:param sta_id: station name
:param chan_id: channel name
:param traces_info: holder of traces_info, refer
DataTypeModel.__init__.
waveform_data[key]['read_data'][chan_id]['traces_info']
:return tr: dict - with trace's info
:param tmp_dir: path to the directory that store memmap files
:return tr: with trace's info
(structure in DataTypeModel.__init__.
waveform_data[key][chan_id][traces_info])
"""
......@@ -177,21 +183,22 @@ def readWaveformTrace(trace, sta_id, chan_id, traces_info, tmp_dir):
return tr
def readWaveformMSeed(path2file, fileName, staID, chanID,
tracesInfo, dataTime, tmpDir):
def readWaveformMSeed(path2file: str, fileName: str, staID: str, chanID: str,
tracesInfo: List, dataTime: List[float], tmpDir: str
) -> None:
"""
Read traces from waveform mseed file to append to tracesInfo.
    dataTime is updated with the new min and max time.
:param path2file: str - absolute path to waveform mseed file
:param fileName: str - name of waveform mseed file
:param staID: str - station ID from indexing
:param chanID: str - channel ID from indexing
:param tracesInfo: dict - holder of traces_info, refer
:param path2file: absolute path to waveform mseed file
:param fileName: name of waveform mseed file
:param staID: station ID from indexing
:param chanID: channel ID from indexing
:param tracesInfo: holder of traces_info, refer
DataTypeModel.__init__.
waveform_data[key]['read_data'][chan_id]['traces_info']
:param dataTime: DataTypeModel.__init__.data_time[key] - holder for data
time of the current station
:param tmpDir: str - the folder to keep memmap files
:param tmpDir: the folder to keep memmap files
"""
stream = read_ms(path2file)
for trace in stream:
......@@ -201,7 +208,8 @@ def readWaveformMSeed(path2file, fileName, staID, chanID,
tracesInfo.append(tr)
def readWaveformReftek(rt130, key, read_data, dataTime, tmpDir):
def readWaveformReftek(rt130: Reftek130, key: Tuple[str, str], read_data: Dict,
dataTime: List[float], tmpDir: str) -> None:
"""
Read traces from rt130 object to add to traces Info tracesInfo.
    dataTime is updated with the new min and max time.
......@@ -232,13 +240,15 @@ def readWaveformReftek(rt130, key, read_data, dataTime, tmpDir):
tracesInfo.append(tr)
def readASCII(path2file, file, sta_id, chan_id, trace, log_data, track_info):
def readASCII(path2file: Path, file: IO, sta_id: str, chan_id: str,
trace: Trace, log_data: dict,
track_info: Callable[[str, LogType], None]) -> IO:
"""
Read mseed trace with ASCII encoding to add to logData.
:param path2file: str- absolute path of mseed file
:param file: file object - to continue reading. Open new if file is None
:param sta_id: str - station ID got from mseed header
:param chaID: str - channel ID got from mseed header
:param chan_id: str - channel ID got from mseed header
:param trace: obspy.core.trace.Trace - mseed trace
:param log_data: dict - holder for log messages, refer to
DataTypeModel.__init__.log_data
......@@ -250,7 +260,6 @@ def readASCII(path2file, file, sta_id, chan_id, trace, log_data, track_info):
logText = "\n\nSTATE OF HEALTH: "
logText += ("From:%s To:%s\n" % (h.starttime, h.endtime))
textFromData = trace.data.tobytes().decode()
logText += textFromData
if textFromData != '':
logText += textFromData
else:
......@@ -280,7 +289,7 @@ def readASCII(path2file, file, sta_id, chan_id, trace, log_data, track_info):
return file
def readText(path2file, fileName, textLogs, ):
def readText(path2file: Path, fileName: str, textLogs: List) -> bool:
"""
Read text file and add to logData under channel TEXT. Raise exception
if the file isn't a text file
......@@ -302,14 +311,14 @@ def readText(path2file, fileName, textLogs, ):
return True
def saveData2File(tmp_dir, tm_data, sta_id, chanid,
tr, tr_idx, tr_size):
def saveData2File(tmp_dir: str, tm_data: str, sta_id: str, chanid: str,
tr: np.ndarray, tr_idx: int, tr_size: int) -> Path:
"""
Using np.memmap save time/data to file then free memory for processing
:param tmp_dir: str - the temporary dir to save file in
:param tm_data: str - "times"/"data"
:param sta_id: str - station ID
:param chaID: str - channel ID
:param chanid: str - channel ID
:param tr: numpy array - of trace time or data
:param tr_idx: int - trace index
:param tr_size: int - trace size
......@@ -325,7 +334,8 @@ def saveData2File(tmp_dir, tm_data, sta_id, chanid,
return memFileName
def checkChan(chanID, reqSOHChans, reqWFChans):
def checkChan(chanID: str, reqSOHChans: List[str], reqWFChans: List[str]
) -> Union[str, bool]:
"""
Check if chanID is a requested channel.
:param chanID: str - channel ID
......@@ -347,7 +357,7 @@ def checkChan(chanID, reqSOHChans, reqWFChans):
return False
def checkSOHChan(chanID, reqSOHChans):
def checkSOHChan(chanID: str, reqSOHChans: List[str]) -> bool:
"""
Check if chan_id is a requested SOH channel.
Mass position is always included.
......@@ -370,7 +380,7 @@ def checkSOHChan(chanID, reqSOHChans):
return False
def checkWFChan(chanID, reqWFChans):
def checkWFChan(chanID: str, reqWFChans: List[str]) -> Tuple[str, bool]:
"""
Check if chanID is a waveform channel and is requested by user
:param chanID: str - channel ID
......@@ -390,7 +400,7 @@ def checkWFChan(chanID, reqWFChans):
return wf, hasChan
def sortData(dataDict):
def sortData(dataDict: Dict) -> None:
"""
Sort data in 'tracesInfo' in 'startTmEpoch' order
:param dataDict: DataTypeModel.__init__.waveformData
......@@ -402,12 +412,12 @@ def sortData(dataDict):
tracesInfo, key=lambda i: i['startTmEpoch'])
def squash_gaps(gaps):
def squash_gaps(gaps: List[List[float]]) -> List[List[float]]:
"""
Compress gaps from different channels that have time range related to
each other to the ones with outside boundary (min start, max end).
:param gaps: [[[float, float],], [[float, float],],] -
list of gaps of different channels: [[[start, end],], [[start, end],],]
:param gaps: [[float, float],], [[float, float],] -
list of gaps of multiple channels: [[start, end],], [[start, end],]
:return: squashed_gaps: [[float, float],] - all related gaps are squashed
extending to min start and max end [[min start, max end],]
"""
......@@ -431,7 +441,9 @@ def squash_gaps(gaps):
return squashed_gaps
def downsample(times, data, log_indexes=None, rq_points=0):
def downsample(times: np.ndarray, data: np.ndarray,
log_indexes: Optional[np.ndarray] = None, rq_points: int = 0
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Reduce sample rate of times and data so that times and data return has
the size around the rq_points.
......@@ -469,7 +481,8 @@ def downsample(times, data, log_indexes=None, rq_points=0):
return chunk_minmax(times, data, log_indexes, rq_points)
def constant_rate(times, data, rq_points):
def constant_rate(times: np.ndarray, data: np.ndarray, rq_points: int
) -> Tuple[np.ndarray, np.ndarray]:
"""
Take sample with constant_rate regardless of the value of the data
:param times: numpy array of a waveform channel's times
......@@ -490,7 +503,9 @@ def constant_rate(times, data, rq_points):
return times, data
def chunk_minmax(times, data, log_indexes, rq_points):
def chunk_minmax(times: np.ndarray, data: np.ndarray,
log_indexes: Optional[np.ndarray],
rq_points: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Split data into different chunks, take the min, max of each chunk to add
to the data return
......@@ -570,7 +585,8 @@ def chunk_minmax(times, data, log_indexes, rq_points):
return dx, dy, dz
def trim_downsample_SOHChan(chan, startTm, endTm, firsttime):
def trim_downsample_SOHChan(chan: Dict, startTm: float, endTm: float,
firsttime: bool) -> None:
"""
When channel is zoomed in, times and data will be trimmed off to exclude
the non-included time. Then they will be downsampled to get to the
......@@ -685,7 +701,8 @@ def downsample_waveform_data(trimmed_traces_list: List[Dict],
return downsampled_times_list, downsampled_data_list
def trim_downsample_WFChan(chan, startTm, endTm, firsttime):
def trim_downsample_WFChan(chan: Dict, startTm: float, endTm: float,
firsttime: bool) -> Optional[bool]:
"""
When channel is zoomed in, times and data will be trimmed off to exclude
the non-included time.
......@@ -735,7 +752,7 @@ def trim_downsample_WFChan(chan, startTm, endTm, firsttime):
chan['data'] = np.hstack(downsampled_data)
def get_eachDay5MinList(startTm, endTm):
def get_eachDay5MinList(startTm: float, endTm: float) -> np.ndarray:
"""
Get the list of all five minute for every day start from the day of startTm
and end at the day of endTm.
......@@ -766,7 +783,8 @@ def get_eachDay5MinList(startTm, endTm):
return every_day_5_min_list
def get_trimTPSData(chan, startTm, endTm, every_day_5_min_list):
def get_trimTPSData(chan: Dict, startTm: float, endTm: float,
every_day_5_min_list: List[List[float]]) -> Optional[bool]:
"""
Different with soh_data where times and data are each in one np.array,
in waveform_data, times and data are each kept in a list of np.memmap
......@@ -865,11 +883,12 @@ def get_trimTPSData(chan, startTm, endTm, every_day_5_min_list):
chan['tps_data'] = tpsData
def findTPSTm(given_tm, each_day_5_min_list):
def findTPSTm(given_tm: float, each_day_5_min_list: List[List[float]]
) -> Tuple[float, float]:
"""
Find the position of the given time (given_tm) in time-power-squared plot
:param given_tm: float - given time
:param every_day_5_min_list: [[288 of floats], ] - the list of all start
:param each_day_5_min_list: [[288 of floats], ] - the list of all start
of five minutes for every day in which each day has 288 of 5 minutes.
:return x_idx: int - index of time in the each_day_5_min_list
:return y_idx: int - index of day plotted
......
......@@ -5,9 +5,15 @@ functions.
"""
import struct
from pathlib import Path
from typing import Dict, List, Tuple, Callable, Union, Optional
from obspy.core import Stream
from sohstationviewer.controller.util import getTime6
from sohstationviewer.model.handling_data import (
readSOHMSeed, readText, checkChan)
from sohstationviewer.view.util.enums import LogType
class futils:
......@@ -411,10 +417,13 @@ class MseedHeader(futils):
return 1
def readHdrs(path2file, fileName,
SOHStreams, logData,
reqSOHChans, reqWFChans,
netsProbInFile, trackInfo):
def readHdrs(path2file: Path, fileName: str,
SOHStreams: Dict[str, Dict[str, Stream]],
logData: Dict[str, Union[List[str], Dict[str, List[str]]]],
reqSOHChans: List[str], reqWFChans: List[str],
netsProbInFile: Dict[Tuple[str, ...], str],
trackInfo: Callable[[str, LogType], None]
) -> Optional[Dict[str, Union[float, str, bool, List[str]]]]:
"""
read headers of a given file build dictionary for quick access
:param path2file: str - path to file
......
......@@ -4,6 +4,9 @@ MSeed object to hold and process MSeed data
import os
from pathlib import Path
from typing import Dict, Tuple, List, Set
from obspy.core import Stream
from sohstationviewer.conf import constants
from sohstationviewer.controller.util import validateFile
......@@ -22,15 +25,15 @@ class MSeed(DataTypeModel):
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# channels: set - available channels got from files among
# channels: available channels got from files among
# requested channels
self.channels = set()
self.channels: Set[str] = set()
# nets: set - available nets got from files
self.nets = set()
# nets: available nets got from files
self.nets: Set[str] = set()
# stats: set - available stats got from files
self.stats = set()
# stats: available stats got from files
self.stats: Set[str] = set()
"""
A file can have more than one experiment which only one of them should
......@@ -38,7 +41,7 @@ class MSeed(DataTypeModel):
netsProbInFile is the dictionary with key is the tuple of all nets
in a file, value is the selected net selected by user.
"""
self.netsProbInFile = {}
self.netsProbInFile: Dict[Tuple[str, ...], str] = {}
if self.creator_thread.isInterruptionRequested():
raise ThreadStopped()
......@@ -53,21 +56,22 @@ class MSeed(DataTypeModel):
if len(self.reqWFChans) != 0:
self.readWFFiles(self.selectedKey)
def read_soh_and_index_waveform(self, folder):
def read_soh_and_index_waveform(self, folder: str):
"""
+ read waveform data for filename associate with time range
+ read soh_stream for unmerged traces
+ merge soh_stream into SOH data and mass position data
+ squash gaps from different soh channels to one
:param folder: str - absolute path to data set folder
:param folder: absolute path to data set folder
"""
self.waveformData, soh_stream = self.index_waveform(folder)
self.SOHData, self.massPosData, self.gaps = self.merge_soh_streams(
soh_stream)
def index_waveform(self, folder):
def index_waveform(self, folder: str
) -> Tuple[Dict, Dict[str, Dict[str, Stream]]]:
"""
:param folder: str - absolute path to data set folder
:param folder: absolute path to data set folder
:return waveformData: a dict for waveform data including
{sta_id: {'filesInfo': {chan_id: [{filepath,time range,trace's info}]}
'readData': { chan_id: {samplerate
......@@ -161,7 +165,8 @@ class MSeed(DataTypeModel):
self.trackInfo(errmsg, LogType.ERROR)
return waveform_data, soh_streams
def merge_soh_streams(self, soh_streams):
def merge_soh_streams(self, soh_streams: Dict[str, Dict[str, Stream]]
) -> Tuple[dict, dict, dict]:
"""
:param soh_streams: a dict of soh channel streams
{sta_id: {chan_id: mseed stream of traces}}
......@@ -169,11 +174,11 @@ class MSeed(DataTypeModel):
data for one network for that station, or combine all networks related
to that station into one data set (by changing network name to the
selected network then merge again)
:return masspos_data: dict - merged data for chan_id with 'VM' prefixed
:return masspos_data: merged data for chan_id with 'VM' prefixed
(structure in DataTypeModel.__init__.massPosData)
:return soh_data: dict - merged data for other soh chan_ids
:return soh_data: merged data for other soh chan_ids
(structure in DataTypeModel.__init__SOHData)
:return gaps: dict - start and end of gaps from all merged soh streams
:return gaps: start and end of gaps from all merged soh streams
are squashed to the largest gaps
(structure in DataTypeModel.__init__gaps)
"""
......@@ -181,7 +186,7 @@ class MSeed(DataTypeModel):
soh_data = {}
masspos_data = {}
gaps = {}
all_gaps = []
all_gaps: List[List[float]] = []
for sta_id in soh_streams:
self.stats.add(sta_id)
self.dataTime[sta_id] = [constants.HIGHEST_INT, 0]
......@@ -232,9 +237,9 @@ class MSeed(DataTypeModel):
gaps[sta_id] = squash_gaps(all_gaps)
return soh_data, masspos_data, gaps
def selectStaID(self):
def selectStaID(self) -> str:
"""
:return selectedStaID: str - the selected station id from self.stats
:return selectedStaID: the selected station id from self.stats
+ If there is only one station id, return it.
+ If there is more than one, show all ids, let user choose one to
return.
......@@ -252,7 +257,7 @@ class MSeed(DataTypeModel):
self.trackInfo(f'Select Station {selectedStaID}', LogType.INFO)
return selectedStaID
def readWFFiles(self, staID):
def readWFFiles(self, staID: str) -> None:
"""
From filesInfo, read all waveform data of requested waveform channels
for given sta_id, in the selected time (from Main Window) to add
......@@ -267,7 +272,7 @@ class MSeed(DataTypeModel):
'times_f': np.memmap's file path for time
'data_f': np.memmap's file path for data
}
:param staID: str - station ID
:param staID: station ID
"""
count = 0
......
from __future__ import annotations
from typing import Callable, Tuple, List, Union, Set, TYPE_CHECKING
from sohstationviewer.conf import constants
from sohstationviewer.controller.util import (
getTime6, getTime4, getVal, rtnPattern)
from sohstationviewer.view.util.enums import LogType
if TYPE_CHECKING:
from sohstationviewer.model.reftek.reftek import RT130
class LogInfo():
def __init__(self, parent, trackInfo, logText, key, reqDSs,
isLogFile=False):
def __init__(self, parent: RT130,
trackInfo: Callable[[str, LogType], None], logText: str,
key: Tuple[str, LogType], reqDSs: List[int],
isLogFile: bool = False):
"""
Help to extract channel data from LogText which include SOH Message and
Event message.
:param parent: reftek object - that calls LogInfo
:param trackInfo: function - to track data processing
:param logText: str - SOH and Event messages in time order
:param key: (str, str) - ID of the data set including unitID and expNo
:param reqDSs: list of str - requested data stream ID
:param isLogFile: bool - flag indicate if this is a log file
:param parent: reftek object that calls LogInfo
:param trackInfo: to track data processing
:param logText: SOH and Event messages in time order
:param key: ID of the data set including unitID and expNo
:param reqDSs: requested data stream ID
:param isLogFile: flag indicate if this is a log file
"""
self.parent = parent
self.trackInfo = trackInfo
......@@ -40,17 +47,17 @@ class LogInfo():
self.model = "72A"
self.maxEpoch = 0
self.minEpoch = constants.HIGHEST_INT
self.chans = self.parent.SOHData[self.key]
self.CPUVers = set()
self.GPSVers = set()
self.chans: dict = self.parent.SOHData[self.key]
self.CPUVers: Set[str] = set()
self.GPSVers: Set[str] = set()
self.extractInfo()
def readEVT(self, line):
def readEVT(self, line: str) -> Union[Tuple[float, int], bool]:
"""
Read EVT info from line for a specific datastream (DS)
:param line: str - a line of evt message
:return epoch: float: epoch time of message
:return DS: int: index of data stream
:param line: a line of evt message
:return epoch: epoch time of message
:return DS: index of data stream
for epoch, using trigger time (TT) if available or
first sample time (FST) if available, otherwise return 0, 0
minEpoch and maxEpoch are updated.
......@@ -78,10 +85,10 @@ class LogInfo():
return 0, 0
return epoch, DS
def readSHHeader(self, line):
def readSHHeader(self, line: str) -> Union[float, bool]:
"""
:param line: str - a line of evt message
:return epoch: float - time for state of health header
:param line: a line of evt message
:return epoch: time for state of health header
minEpoch and maxEpoch are updated.
yAdded is reset to false to allow adding 1 to trackYear
If different unitID is detected, give warning and skip reading.
......@@ -106,12 +113,12 @@ class LogInfo():
False
return epoch
def simpleRead(self, line):
def simpleRead(self, line: str) -> Union[Tuple[List[str], float], bool]:
"""
Read parts and epoch from an SOH line
:param line: str - a line of evt message
:return parts: list of str - parts of line with space delim
:return epoch: float - time when info is recorded
:param line: a line of evt message
:return parts: parts of line with space delim
:return epoch: time when info is recorded
maxEpoch is updated with the epoch time.
"""
# Ex: 186:21:41:35 <content>
......@@ -125,7 +132,8 @@ class LogInfo():
self.maxEpoch = max(epoch, self.maxEpoch)
return parts, epoch
def readIntClockPhaseErr(self, line):
def readIntClockPhaseErr(self, line: str
) -> Union[bool, Tuple[float, float]]:
"""
Read internal clock phase error
:param line: str - a line of evt message
......@@ -142,7 +150,8 @@ class LogInfo():
error *= 1000000.0
return epoch, error
def readBatTemBkup(self, line):
def readBatTemBkup(self, line: str
) -> Union[bool, Tuple[float, float, float, float]]:
"""
Read battery voltage, temperature, backup voltage
:param line: str - a line of evt message
......@@ -170,7 +179,8 @@ class LogInfo():
bkupV = 0.0
return epoch, volts, temp, bkupV
def readDiskUsage(self, line):
def readDiskUsage(self, line: str
) -> Union[bool, Tuple[float, float, float]]:
"""
Read disk usage
:param line: str - a line of evt message
......@@ -191,7 +201,7 @@ class LogInfo():
return False
return epoch, disk, val
def readDPS_ClockDiff(self, line):
def readDPS_ClockDiff(self, line: str) -> Union[bool, Tuple[float, float]]:
"""
Read DPS clock difference
:param line: str - a line of evt message
......@@ -211,7 +221,7 @@ class LogInfo():
total *= -1.0
return epoch, total
def readDefs(self, line):
def readDefs(self, line: str) -> Union[bool, float]:
"""
Read definitions' time. Currently, only read Station Channel Definition
Based on user requested, may use ["STATION", "DATA", "CALIBRATION"]
......@@ -245,7 +255,7 @@ class LogInfo():
# were changed, of course).
return epoch
def readCPUVer(self, line):
def readCPUVer(self, line: str) -> str:
"""
Read version of CPU software
:param line: str - a line of evt message
......@@ -266,7 +276,7 @@ class LogInfo():
CPUVer = " ".join(parts[4:])
return CPUVer
def readGPSVer(self, line):
def readGPSVer(self, line: str) -> str:
"""
Read version of GPS firmware
:param line: str - a line of evt message
......@@ -282,7 +292,7 @@ class LogInfo():
GPSVer = " ".join(verParts[2:])
return GPSVer
def addChanInfo(self, chan_id, t, d, idx):
def addChanInfo(self, chan_id: str, t: float, d: float, idx: int) -> None:
"""
Add information to field orgTrace of channel
{
......@@ -311,7 +321,7 @@ class LogInfo():
self.chans[chan_id]['orgTrace']['data'].append(d)
self.chans[chan_id]['orgTrace']['logIdx'].append(idx)
def extractInfo(self):
def extractInfo(self) -> None:
"""
Extract data from each line of log string to add to
SOH channels's orgTrace using addChanInfo()
......
......@@ -4,6 +4,8 @@ RT130 object to hold and process RefTek data
import os
from pathlib import Path
from typing import Tuple, List
import numpy as np
from obspy.core import Stream
......@@ -31,7 +33,7 @@ class RT130(DataTypeModel):
self.EH = {}
super().__init__(*args, **kwarg)
self.keys = set()
self.reqDSs = self.reqWFChans
self.reqDSs: List[int] = self.reqWFChans
self.massPosStream = {}
if self.creator_thread.isInterruptionRequested():
......@@ -49,11 +51,11 @@ class RT130(DataTypeModel):
if len(self.reqWFChans) != 0:
self.readWFFiles(self.selectedKey)
def readSOH_indexWaveform(self, folder):
def readSOH_indexWaveform(self, folder: str) -> None:
"""
Loop all files in dir to read for soh data, mass position data and
index waveform data with filename and corresponding time range
:param folder: str - absolute path to data set folder
:param folder: absolute path to data set folder
"""
count = 0
for path, subdirs, files in os.walk(folder):
......@@ -72,9 +74,9 @@ class RT130(DataTypeModel):
self.combineData()
def selectKey(self):
def selectKey(self) -> Tuple[str, str]:
"""
:return selectedKey: (str, str) -
:return selectedKey:
(device's serial number, experiment_number)
the selected keys from self.keys.
+ If there is only one key, return it.
......@@ -93,9 +95,9 @@ class RT130(DataTypeModel):
self.trackInfo(f'Select Key {selectedKey}', LogType.INFO)
return selectedKey
def readWFFiles(self, key):
def readWFFiles(self, key: Tuple[str, str]) -> None:
"""
:param key: (str, str) -
:param key:
(device's serial number, experiment_number)
the selected keys from self.keys.
From filesInfo, read all waveform data of requested waveform DS
......@@ -139,13 +141,13 @@ class RT130(DataTypeModel):
self.logData[self.curKey][chan_pkt] = []
self.logData[self.curKey][chan_pkt].append(logInfo)
def readReftek130(self, path2file):
def readReftek130(self, path2file: Path) -> bool:
"""
From the given file:
+ Read SOH data from file with SH packets,
+ read event info, mass position and index waveform (data stream)
file from file with EH or ET packets
:param path2file: str - absolute path to file
:param path2file: absolute path to file
"""
rt130 = core.Reftek130.from_file(path2file)
unique, counts = np.unique(rt130._data["packet_type"],
......@@ -158,11 +160,11 @@ class RT130(DataTypeModel):
self.readEHET_MP_indexWF(rt130)
return True
def readSH(self, path2file):
def readSH(self, path2file: Path) -> None:
"""
Use soh_packet library to read file with SH packet for soh data
to append tuple (time, log string) to log_data[self.curKey][SOH]
:param path2file: str - absolute path to file
:param path2file: absolute path to file
"""
with open(path2file, "rb") as fh:
str = fh.read()
......@@ -178,11 +180,11 @@ class RT130(DataTypeModel):
self.logData[self.curKey]['SOH'] = []
self.logData[self.curKey]['SOH'].append((d['time'], logs))
def readEHET_MP_indexWF(self, rt130):
def readEHET_MP_indexWF(self, rt130: core.Reftek130) -> None:
"""
Files that contents EH or ET packets are data stream files.
There may be 1 - 9 data streams.
:param rt130: rt130 object - of a data stream file in which
:param rt130: of a data stream file in which
+ event info can be found in EH packet and save in self.logData
+ mass position data can be found in data stream 9 and save in
self.masspos_stream
......@@ -220,11 +222,11 @@ class RT130(DataTypeModel):
else:
self.indexWaveForm(rt130, DS)
def readMassPos(self, rt130):
def readMassPos(self, rt130: core.Reftek130) -> None:
"""
Append all traces of data stream 9 to self.massPosStream[self.currKey].
Update dataTime.
:param rt130: object rt130 - for data stream 9
:param rt130: for data stream 9
"""
if self.curKey not in self.massPosStream:
self.massPosStream[self.curKey] = Stream()
......@@ -241,11 +243,11 @@ class RT130(DataTypeModel):
self.dataTime[self.curKey][1] = max(
tr.stats['endtime'].timestamp, self.dataTime[self.curKey][1])
def indexWaveForm(self, rt130, DS):
def indexWaveForm(self, rt130: core.Reftek130, DS: int) -> None:
"""
Indexing by adding rt130 object along with time range to
self.waveformData[self.currKey]['filesInfo']
:param rt130: object rt130 - for data stream DS
:param rt130: for data stream DS
:param DS: int - data stream index
"""
if self.curKey not in self.waveformData:
......@@ -266,7 +268,7 @@ class RT130(DataTypeModel):
'endEpoch': stream[0].stats['endtime'].timestamp,
'read': False})
def combineData(self):
def combineData(self) -> None:
"""
+ SOH and event logData will be processed to create SOHdata:
{Key of set:
......
......@@ -9,7 +9,9 @@ from sohstationviewer.view.util.color import clr
from sohstationviewer.controller.plottingData import (
getTitle, getDayTicks, formatTime)
from sohstationviewer.controller.util import displayTrackingInfo, fmti
from sohstationviewer.controller.util import (
displayTrackingInfo, add_thousand_separator
)
from sohstationviewer.model.handling_data import (
get_trimTPSData, get_eachDay5MinList, findTPSTm)
......@@ -295,7 +297,8 @@ class TimePowerSquaredWidget(plotting_widget.PlottingWidget):
for chan_id in self.plotting_data1:
c_data = self.plotting_data1[chan_id]
data = c_data['tps_data'][day_index, five_min_index]
info_str += f" {chan_id}:{fmti(sqrt(data))}"
info_str += (f" {chan_id}:"
f"{add_thousand_separator(sqrt(data))}")
info_str += " (counts)</pre>"
displayTrackingInfo(self.tracking_box, info_str)
self.draw()
......
......@@ -17,7 +17,7 @@ from sohstationviewer.controller.util import (
getTime4,
getVal,
rtnPattern,
fmti
add_thousand_separator
)
TEST_DATA_DIR = os.path.realpath(os.path.join(
......@@ -317,27 +317,29 @@ class TestGetVal(TestCase):
class TestFmti(TestCase):
"""Test suite for fmti."""
"""Test suite for add_thousand_separator."""
def test_absolute_value_below_1000(self):
"""
Test fmti - the input is greater than -1000 but smaller than 1000.
Test add_thousand_separator - the input is greater than -1000
but smaller than 1000.
"""
with self.subTest('test_positive'):
val = 52.521
self.assertEqual(fmti(val), '52')
self.assertEqual(add_thousand_separator(val), '52')
with self.subTest('test_negative'):
val = -232.42
self.assertEqual(fmti(val), '-232')
self.assertEqual(add_thousand_separator(val), '-232')
def test_absolute_value_above_1000(self):
"""
Test fmti - the input is greater than 1000 or smaller than -1000.
Test add_thousand_separator - the input is greater than 1000 or
smaller than -1000.
"""
with self.subTest('test_positive'):
val = 136235646.215151
expected = '136,235,646'
self.assertEqual(fmti(val), expected)
self.assertEqual(add_thousand_separator(val), expected)
with self.subTest('test_negative'):
val = -62362.32523
expected = '-62,362'
self.assertEqual(fmti(val), expected)
self.assertEqual(add_thousand_separator(val), expected)