Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • software_public/passoft/sohstationviewer
1 result
Show changes
Commits on Source (6)
Showing
with 226 additions and 105 deletions
......@@ -18,18 +18,10 @@ def get_chan_plot_info(org_chan_id: str, data_type: str,
:return info of channel read from DB which is used for plotting
"""
chan = org_chan_id
if org_chan_id.startswith('EX'):
chan = 'EX?'
if org_chan_id.startswith('VM'):
chan = 'VM?'
if org_chan_id.startswith('MassPos'):
chan = 'MassPos?'
chan = convert_actual_channel_to_db_channel_w_question_mark(chan)
if org_chan_id.startswith('DS'):
chan = 'SEISMIC'
if org_chan_id.startswith('Event DS'):
chan = 'Event DS?'
if org_chan_id.startswith('Disk Usage'):
chan = 'Disk Usage?'
if dbConf['seisRE'].match(chan):
chan = 'SEISMIC'
# The valueColors for each color mode is stored in a separate column.
......@@ -74,7 +66,16 @@ def get_chan_plot_info(org_chan_id: str, data_type: str,
return chan_db_info[0]
def get_convert_factor(chan_id, data_type):
def get_convert_factor(chan_id: str, data_type: str) -> float:
"""
Get the convert factor to convert data from count (value read from file)
to actual value to display.
:param chan_id: actual channel name read from file
:param data_type: type of data in data set
:return: converting factor
"""
chan_id = convert_actual_channel_to_db_channel_w_question_mark(chan_id)
sql = f"SELECT convertFactor FROM Channels WHERE channel='{chan_id}' " \
f"AND dataType='{data_type}'"
ret = execute_db(sql)
......@@ -84,6 +85,24 @@ def get_convert_factor(chan_id, data_type):
return None
def convert_actual_channel_to_db_channel_w_question_mark(chan_id: str) -> str:
"""
The digit in channel end with a digit is represented with the question
mark '?' in DB. This function change the real channel name to DB
channel name with '?'.
:param chan_id: real channel name
:return chan_id: channel name with '?' at the end if available
"""
sql = "SELECT * FROM Channels WHERE channel like '%?'"
ret = execute_db(sql)
question_channels = [c[0][:-1] for c in ret]
if any(chan_id.startswith(x) for x in question_channels):
if chan_id[-1].isdigit():
# to prevent the case prefix similar to prefix of channel w/o ?
chan_id = chan_id[:-1] + '?'
return chan_id
def get_seismic_chan_label(chan_id):
"""
Get label for chan_id in which data stream can use chan_id for label while
......
No preview for this file type
......@@ -244,7 +244,7 @@ class LogInfo():
# saved until the DAS is reset, so if this is a log file then use the
# current SOH time for plotting the points, instead of what is in the
# message line.
if parts[0] in ["STATION"]:
if parts[0] in ["STATION", "DATA", "CALIBRATION", "OPERATING"]:
if self.is_log_file is False:
try:
epoch, _ = get_time_6(parts[-3])
......
......@@ -150,18 +150,19 @@ class Plotting:
has_min_max_lines=False)
val_cols = chan_db_info['valueColors'].split('|')
points_list = []
colors = []
# up/down has 2 values: 0, 1 which match with index of points_list
points_list = [[], []]
colors = [[], []]
for vc in val_cols:
v, c = vc.split(':')
val = get_val(v)
val = int(get_val(v))
points = []
for times, data in zip(c_data['times'], c_data['data']):
points += [times[i]
for i in range(len(data))
if data[i] == val]
points_list.append(points)
colors.append(c)
points_list[val] = points
colors[val] = c
# down dots
ax.plot(points_list[0], len(points_list[0]) * [-0.5], linestyle="",
......
......@@ -194,6 +194,10 @@ class PlottingWidget(QtWidgets.QScrollArea):
DataTypeModel.__init__.mass_pos_data[key]
"""
self.plotting_data2 = {}
# List of SOH message lines in RT130 to display in info box when
# there're more than 2 lines for one data point clicked
self.rt130_log_data: Optional[List[str]] = None
# ----------------------------------------------------------------
QtWidgets.QScrollArea.__init__(self)
......@@ -305,54 +309,64 @@ class PlottingWidget(QtWidgets.QScrollArea):
"""
self.is_button_press_event_triggered_pick_event = True
artist = event.artist
if isinstance(artist, pl.Line2D):
ax = artist.axes
chan_id = ax.chan
if not isinstance(artist, pl.Line2D):
return
ax = artist.axes
chan_id = ax.chan
try:
chan_data = self.plotting_data1[chan_id]
except KeyError:
# in case of mass position
chan_data = self.plotting_data2[chan_id]
# list of x values of the plot
x_list = artist.get_xdata()
# list of y values of the plot
y_list = artist.get_ydata()
# index of the clicked point on the plot
click_plot_index = event.ind[0]
# time, val of the clicked point
clicked_time = x_list[click_plot_index]
clicked_val = y_list[click_plot_index]
real_idxes = get_index_from_data_picked(
chan_data, clicked_time, clicked_val)
if len(real_idxes) == 0:
display_tracking_info(self.tracking_box, "Point not found.")
return
clicked_data = chan_data['data'][0][real_idxes[0]]
if chan_id.startswith('Disk Usage'):
clicked_data = get_disk_size_format(clicked_data)
if hasattr(ax, 'unit_bw'):
clicked_data = ax.unit_bw.format(clicked_data)
formatted_clicked_time = format_time(
clicked_time, self.date_mode, 'HH:MM:SS')
info_str = (f"<pre>Channel: {chan_id} "
f"Point:{click_plot_index + 1} "
f"Time: {formatted_clicked_time} "
f"Value: {clicked_data}</pre>")
if self.rt130_log_data is not None:
log_idxes = [chan_data['logIdx'][0][idx]
for idx in real_idxes]
if len(real_idxes) > 1:
info_str = info_str.replace(
"</pre>", f" ({len(real_idxes)} lines)")
for idx in log_idxes:
info_str += (
"<pre> " + self.rt130_log_data[idx] + "</pre>")
display_tracking_info(self.tracking_box, info_str)
if 'logIdx' in chan_data.keys():
# For Reftek, need to hightlight the corresponding
# SOH message lines based on the log_idxes of the clicked point
self.parent.search_message_dialog.show()
try:
chan_data = self.plotting_data1[chan_id]
except KeyError:
# in case of mass position
chan_data = self.plotting_data2[chan_id]
# list of x values of the plot
x_list = artist.get_xdata()
# list of y values of the plot
y_list = artist.get_ydata()
# index of the clicked point on the plot
click_plot_index = event.ind[0]
# time, val of the clicked point
clicked_time = x_list[click_plot_index]
clicked_val = y_list[click_plot_index]
real_idx = get_index_from_data_picked(
chan_data, clicked_time, clicked_val)
if real_idx is None:
display_tracking_info(self.tracking_box, "Point not found.")
return
clicked_data = chan_data['data'][0][real_idx]
if chan_id.startswith('Disk Usage'):
clicked_data = get_disk_size_format(clicked_data)
elif hasattr(ax, 'unit_bw'):
clicked_data = ax.unit_bw.format(clicked_data)
formatted_clicked_time = format_time(
clicked_time, self.date_mode, 'HH:MM:SS')
info_str = (f"<pre>Channel: {chan_id} "
f"Point:{click_plot_index + 1} "
f"Time: {formatted_clicked_time} "
f"Value: {clicked_data}</pre>")
display_tracking_info(self.tracking_box, info_str)
if 'logIdx' in chan_data.keys():
# For Reftek, need to hightlight the corresponding
# SOH message line based on the log_idx of the clicked point
self.parent.search_message_dialog.show()
clicked_log_idx = chan_data['logIdx'][0][real_idx]
try:
self.parent.search_message_dialog. \
show_log_entry_from_data_index(clicked_log_idx)
except ValueError as e:
QtWidgets.QMessageBox.warning(self, "Not found",
str(e))
self.parent.search_message_dialog. \
show_log_entry_from_log_indexes(log_idxes)
except ValueError as e:
QtWidgets.QMessageBox.warning(self, "Not found",
str(e))
def on_button_press_event(self, event):
"""
......
from typing import List, Optional
from typing import Dict, Optional
import numpy as np
def get_index_from_data_picked(
chan_data: List[np.ndarray], tm: float, val: float) -> Optional[int]:
chan_data: Dict, tm: float, val: float) -> np.ndarray:
"""
Get index of data picked
:param chan_data: dict of data to plot that includes 'times', 'data' key
:param tm: epoch time of a clicked point
:param val: data value of a clicked point
:return section_idx: index of tm inside np.ndarray found
:return real_indexes: list index of data point inside np.ndarray found
"""
if chan_data['chan_db_info']['plotType'] == 'upDownDots':
# actual plotting has value -0.5 or 0.5;
......@@ -24,9 +24,7 @@ def get_index_from_data_picked(
else:
real_indexes = np.where((chan_data['times'][0] == tm) &
(chan_data['data'][0] == val))[0]
if len(real_indexes) != 1:
return
return real_indexes[0]
return real_indexes
def get_total_miny_maxy(
......
......@@ -33,6 +33,12 @@ class SOHWidget(MultiThreadedPlottingWidget):
self.data_object = d_obj
self.plotting_data1 = d_obj.soh_data[key] if key else {}
self.plotting_data2 = d_obj.mass_pos_data[key] if key else {}
self.rt130_log_data = None
if self.data_object.data_type == 'RT130':
try:
self.rt130_log_data = d_obj.log_data[key]['SOH'][0].split('\n')
except KeyError:
pass
channel_list = d_obj.soh_data[key].keys() if key else []
data_time = d_obj.data_time[key] if key else [0, 1]
ret = super().init_plot(d_obj, data_time, key, start_tm, end_tm,
......
......@@ -4,7 +4,8 @@ from pathlib import PosixPath, Path
from typing import Dict, List, Tuple, Callable, Union, Optional
from PySide6 import QtGui, QtCore, QtWidgets
from PySide6.QtWidgets import QStyle
from PySide6.QtWidgets import QStyle, QAbstractItemView
from PySide6.QtGui import QPalette
from sohstationviewer.view.search_message.highlight_delegate import (
HighlightDelegate)
......@@ -394,6 +395,14 @@ class SearchMessageDialog(QtWidgets.QWidget):
"""
# add 1 extra column to show scroll bar (+ 1)
table = QtWidgets.QTableWidget(rows, cols + 1)
# To prevent selected row not grey out. It still gets faded out, but
# the color remain blue which is better than grey out.
p = table.palette()
p.setBrush(QPalette.Inactive, QPalette.Highlight,
p.brush(QPalette.Highlight))
table.setPalette(p)
delegate = HighlightDelegate(table, self.display_color)
table.setItemDelegate(delegate)
# Hide header cells
......@@ -487,9 +496,47 @@ class SearchMessageDialog(QtWidgets.QWidget):
item : QTableWidgetItem
A valid QTableWidgetIem
"""
self.current_table.scrollToItem(item)
self.current_table.scrollToItem(item, QAbstractItemView.PositionAtTop)
self.current_table.setFocus()
def show_log_entry_from_log_indexes(self, log_indexes: List[int]):
"""
This is called when clicking a clickable data point on a SOH channel
of RT130, list of log row indexes will be passed to this method.
This method will:
+ set current tab to soh_table_dict['SOH']
+ scroll the first indexed row to top of table and highlight all
the row in log_indexes
Parameters
----
log_indexes : The list of indexes of log row to be selected
"""
if 'SOH' not in self.soh_tables_dict:
return
# switch to SOH tab
self.tab_widget.setCurrentWidget(self.soh_tables_dict['SOH'])
self.current_table.clearSelection()
# Allow to select multiple rows
self.current_table.setSelectionMode(
QAbstractItemView.MultiSelection)
# select all rows according to log_indexes
for idx in log_indexes:
self.current_table.selectRow(idx)
# scroll to the first index and place the row at top of the table
self.current_table.scrollToItem(
self.current_table.item(log_indexes[0], 1),
QAbstractItemView.PositionAtTop
)
# raise the message dialog on top of others
self.setWindowState(QtCore.Qt.WindowState.WindowActive)
self.raise_()
self.activateWindow()
self.current_table.setFocus() # focus row to not faded out
# return back to select single row
self.current_table.setSelectionMode(
QAbstractItemView.SingleSelection)
def show_log_entry_from_data_index(self, data_index: int):
"""
This is called when clicking a clickable data point on a SOH channel
......@@ -632,7 +679,8 @@ class SearchMessageDialog(QtWidgets.QWidget):
if ret is None:
return
self.selected_item, self.search_rowidx = ret
self.current_table.scrollToItem(self.selected_item)
self.current_table.scrollToItem(self.selected_item,
QAbstractItemView.PositionAtTop)
def _filter_lines_with_search_text_from_soh_messages(self):
"""
......
# UI and connectSignals for main_window
import configparser
from pathlib import Path
from typing import Union, List, Optional
from PySide6 import QtCore, QtGui, QtWidgets
......@@ -286,7 +285,7 @@ class UIMainWindow(object):
self.set_first_row(main_layout)
self.set_second_row(main_layout)
self.tracking_info_text_browser.setFixedHeight(60)
self.tracking_info_text_browser.setFixedHeight(80)
main_layout.addWidget(self.tracking_info_text_browser)
self.create_menu_bar(main_window)
self.connect_signals(main_window)
......
......@@ -6,11 +6,13 @@ from sohstationviewer.database.extract_data import (
get_signature_channels,
get_color_def,
get_color_ranges,
convert_actual_channel_to_db_channel_w_question_mark,
get_convert_factor
)
class TestExtractData(unittest.TestCase):
def test_get_chan_plot_info_good_soh_channel_and_data_type(self):
class TestGetChanPlotInfo(unittest.TestCase):
def test_good_soh_channel_and_data_type(self):
"""
Test basic functionality of get_chan_plot_info - channel and data type
combination exists in database table `Channels`
......@@ -27,7 +29,7 @@ class TestExtractData(unittest.TestCase):
self.assertDictEqual(get_chan_plot_info('SOH/Data Def', 'RT130'),
expected_result)
def test_get_chan_plot_info_masspos_channel(self):
def test_masspos_channel(self):
with self.subTest("Mass position 'VM'"):
expected_result = {'channel': 'VM1',
'plotType': 'linesMasspos',
......@@ -54,7 +56,7 @@ class TestExtractData(unittest.TestCase):
self.assertDictEqual(get_chan_plot_info('MassPos1', 'RT130'),
expected_result)
def test_get_chan_plot_info_seismic_channel(self):
def test_seismic_channel(self):
with self.subTest("RT130 Seismic"):
expected_result = {'channel': 'DS2',
'plotType': 'linesSRate',
......@@ -81,7 +83,7 @@ class TestExtractData(unittest.TestCase):
self.assertDictEqual(get_chan_plot_info('LHE', 'Q330'),
expected_result)
def test_get_chan_plot_info_data_type_is_unknown(self):
def test_data_type_is_unknown(self):
"""
Test basic functionality of get_chan_plot_info - data type is the
string 'Unknown'.
......@@ -113,7 +115,7 @@ class TestExtractData(unittest.TestCase):
get_chan_plot_info('LCE', 'Unknown'),
expected_result)
def test_get_chan_plot_info_bad_channel_or_data_type(self):
def test_bad_channel_or_data_type(self):
"""
Test basic functionality of get_chan_plot_info - channel and data type
combination does not exist in database table Channels and data type is
......@@ -159,7 +161,9 @@ class TestExtractData(unittest.TestCase):
self.assertDictEqual(get_chan_plot_info('SOH/Data Def', 'Q330'),
expected_result)
def test_get_seismic_chan_label_good_channel_id(self):
class TestGetSeismicChanLabel(unittest.TestCase):
def test_good_channel_id(self):
"""
Test basic functionality of get_seismic_chan_label - channel ID ends
in one of the keys in conf.dbSettings.dbConf['seisLabel'] or
......@@ -174,7 +178,7 @@ class TestExtractData(unittest.TestCase):
self.assertEqual(get_seismic_chan_label('DS-TEST-CHANNEL'),
'DS-TEST-CHANNEL')
def test_get_chan_label_bad_channel_id(self):
def test_bad_channel_id(self):
"""
Test basic functionality of get_seismic_chan_label - channel ID does
not end in one of the keys in conf.dbSettings.dbConf['seisLabel']
......@@ -182,16 +186,22 @@ class TestExtractData(unittest.TestCase):
"""
self.assertRaises(IndexError, get_seismic_chan_label, '')
class TestGetSignatureChannels(unittest.TestCase):
def test_get_signature_channels(self):
"""Test basic functionality of get_signature_channels"""
self.assertIsInstance(get_signature_channels(), dict)
class TestGetColorDef(unittest.TestCase):
def test_get_color_def(self):
"""Test basic functionality of get_color_def"""
colors = get_color_def()
expected_colors = ['K', 'U', 'C', 'G', 'Y', 'R', 'M', 'E']
self.assertListEqual(colors, expected_colors)
class TestGetColorRanges(unittest.TestCase):
def test_get_color_ranges(self):
"""Test basic functionality of get_color_ranges"""
names, all_counts, all_display_strings = get_color_ranges()
......@@ -212,3 +222,23 @@ class TestExtractData(unittest.TestCase):
# Check that each list of strings to display have enough items
for display_strings in all_display_strings:
self.assertEqual(len(display_strings), num_color_def + 1)
class TestConvertActualChannelToDBChannelWQuestionMark(unittest.TestCase):
def test_question_channel(self):
ret = convert_actual_channel_to_db_channel_w_question_mark('VM1')
self.assertEqual(ret, 'VM?')
def test_non_question_channel(self):
ret = convert_actual_channel_to_db_channel_w_question_mark('VCO')
self.assertEqual(ret, 'VCO')
class TestGetConvertFactor(unittest.TestCase):
def test_question_channel(self):
ret = get_convert_factor('VM1', 'Centaur')
self.assertEqual(ret, 10**(-6))
def test_non_question_channel(self):
ret = get_convert_factor('VEP', 'Q330')
self.assertEqual(ret, 0.15)
......@@ -5,13 +5,13 @@ from sohstationviewer.view.plotting.plotting_widget.plotting_widget_helper \
import get_total_miny_maxy, get_index_from_data_picked
class TestGetIndexFromTime(TestCase):
class TestGetIndexFromDataPicked(TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.plotting_data = {
'CH1': {
'times': [np.array([1, 2, 3, 4, 5, 6])],
'data': [np.array([1, 1, 0, 1, 1, 0])],
'times': [np.array([1, 2, 3, 4, 5, 6, 6])],
'data': [np.array([1, 1, 0, 1, 1, 0, 0])],
'chan_db_info': {'plotType': 'upDownDots'}
},
'CH2': {
......@@ -27,45 +27,51 @@ class TestGetIndexFromTime(TestCase):
}
def test_time_not_included(self):
real_idx = get_index_from_data_picked(
real_idxes = get_index_from_data_picked(
self.plotting_data['CH1'], 7, 1)
self.assertIsNone(real_idx)
self.assertEqual(len(real_idxes), 0)
def test_type_not_need_data_info(self):
# CH3 has plotType='dotForTime' in ["multiColorDots", "dotForTime"])
real_idx = get_index_from_data_picked(
def test_type_not_need_data_val(self):
# CH3 has plotType='dotForTime'
# which is in ["multiColorDots", "dotForTime"])
real_idxes = get_index_from_data_picked(
self.plotting_data['CH3'], 4, 4)
self.assertEqual(real_idx, 3)
self.assertEqual(real_idxes.tolist(), [3])
def test_type_need_data_info(self):
def test_type_need_data_val(self):
# CH2 has plotType='linesDots' not in ["multiColorDots", "dotForTime"])
with self.subTest('data not match time'):
real_idx = get_index_from_data_picked(
real_idxes = get_index_from_data_picked(
self.plotting_data['CH2'], 3, 5)
self.assertIsNone(real_idx)
self.assertEqual(len(real_idxes), 0)
with self.subTest('data match 1st value'):
real_idx = get_index_from_data_picked(
real_idxes = get_index_from_data_picked(
self.plotting_data['CH2'], 3, 7)
self.assertEqual(real_idx, 2)
self.assertEqual(real_idxes.tolist(), [2])
with self.subTest('data match 2nd value'):
real_idx = get_index_from_data_picked(
real_idxes = get_index_from_data_picked(
self.plotting_data['CH2'], 3, 4)
self.assertEqual(real_idx, 3)
self.assertEqual(real_idxes.tolist(), [3])
def test_type_up_down(self):
# CH1 has plotType='upDownDots'
with self.subTest('data not match time'):
real_idx = get_index_from_data_picked(
real_idxes = get_index_from_data_picked(
self.plotting_data['CH1'], 1, -0.5)
self.assertIsNone(real_idx)
self.assertEqual(len(real_idxes), 0)
with self.subTest('data=1 match time'):
real_idx = get_index_from_data_picked(
real_idxes = get_index_from_data_picked(
self.plotting_data['CH1'], 1, 0.5)
self.assertEqual(real_idx, 0)
self.assertEqual(real_idxes.tolist(), [0])
with self.subTest('data=0 match time'):
real_idx = get_index_from_data_picked(
real_idxes = get_index_from_data_picked(
self.plotting_data['CH1'], 3, -0.5)
self.assertEqual(real_idx, 2)
self.assertEqual(real_idxes.tolist(), [2])
def test_2_overlapped_points(self):
real_idxes = get_index_from_data_picked(
self.plotting_data['CH1'], 6, -0.5)
self.assertEqual(real_idxes.tolist(), [5, 6])
class TestGetTotalMinyMaxy(TestCase):
......