Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
S
SOHViewer
Manage
Activity
Members
Labels
Plan
Issues
14
Issue boards
Milestones
Iterations
Wiki
Requirements
Code
Merge requests
5
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package Registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Software Public
PASSOFT
SOHViewer
Merge requests
!191
Read log files generated by users
Code
Review changes
Check out branch
Download
Patches
Plain diff
Merged
Read log files generated by users
feature-#94-read_log_files
into
master
Overview
11
Commits
36
Pipelines
8
Changes
6
Merged
Kien Le
requested to merge
feature-#94-read_log_files
into
master
1 year ago
Overview
11
Commits
36
Pipelines
8
Changes
2
Expand
Let the program reads file generated by rt2ms, logpeek, or itself.
#94 (closed)
Edited
1 year ago
by
Kien Le
👍
0
👎
0
Merge request reports
Viewing commit
47ae466b
Show latest version
2 files
+
37
−
13
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
Files
2
Search (e.g. *.vue) (Ctrl+P)
47ae466b
Refactored mass-position log processing
· 47ae466b
Kien Le
authored
1 year ago
sohstationviewer/model/reftek_data/reftek_reader/log_file_reader.py
0 → 100644
+
293
−
0
Options
from
pathlib
import
Path
from
typing
import
List
,
Literal
,
Optional
,
Dict
,
Callable
,
Tuple
import
numpy
as
np
from
obspy
import
UTCDateTime
# The possible formats for a log file.
LogFileFormat
=
Literal
[
'
rt2ms
'
,
'
logpeek
'
,
'
sohstationviewer
'
]
# These packets can be found in section 4 of the RT130 record documentation.
RT130_PACKETS
=
[
'
SH
'
,
'
SC
'
,
'
OM
'
,
'
DS
'
,
'
AD
'
,
'
CD
'
,
'
FD
'
,
'
EH
'
,
'
ET
'
]
# The lists of event lines, SOH lines, and mass-position lines in a packet.
SeparatedPacketLines
=
Tuple
[
List
[
str
],
List
[
str
],
List
[
str
]]
def
detect_log_packet_format
(
packet
:
List
[
str
])
->
Optional
[
LogFileFormat
]:
"""
Detect the format of a log file packet. The format can be either rt2ms
'
,
logpeek
'
s, or SOHStationViewer
'
s.
:param packet: a packet extracted from a log file.
:return: the format of packet. Can be either
'
rt2ms
'
,
'
logpeek
'
, or
'
sohstationviewer
'
.
"""
# We want to take advantage of the metadata written by the various programs
# as much as possible.
if
packet
[
0
].
startswith
(
'
logpeek
'
):
return
'
logpeek
'
elif
packet
[
0
].
startswith
(
'
rt2ms
'
):
return
'
rt2ms
'
elif
packet
[
0
].
startswith
(
'
sohstationviewer
'
):
return
'
sohstationviewer
'
packet_start
=
packet
[
0
]
# The first line of a packet in a log file generated by rt2ms starts with
# a 2-letter packet type. That is then followed by an empty space and the
# string 'exp'.
if
packet_start
[:
2
]
in
RT130_PACKETS
and
packet_start
[
3
:
6
]
==
'
exp
'
:
return
'
rt2ms
'
# SOHStationViewer stores all events' info at the end of its log file, and
# so if we see the line
# Events:
# at the start of a packet, we know the log file is from SOHStationViewer.
if
packet_start
.
startswith
(
'
Events:
'
):
return
'
sohstationviewer
'
# Unlike logpeek, we are writing the mass position in its own packet. So,
# a packet that starts with mass position data would be an SOHStationViewer
# packet.
if
packet_start
.
startswith
(
'
LPMP
'
):
return
'
sohstationviewer
'
# Logpeek write its events' info and mass-position data right after an SH
# packet.
packet_end
=
packet
[
-
1
]
packet_end_with_event_info
=
(
packet_end
.
startswith
(
'
DAS
'
)
or
packet_end
.
startswith
(
'
WARNING
'
))
packet_end_with_mass_pos
=
packet_end
.
startswith
(
'
LPMP
'
)
packet_end_special
=
packet_end_with_event_info
or
packet_end_with_mass_pos
if
packet_start
.
startswith
(
'
State of Health
'
)
and
packet_end_special
:
return
'
logpeek
'
def
parse_log_packet_unknown_format
(
packet
:
List
[
str
])
->
SeparatedPacketLines
:
"""
Parse a log packet assuming that the format of the log file is unknown. In
this case, all the lines in the packet are SOH lines.
:param packet: list of lines in the packet
:return: the lists of event lines, SOH lines, and mass-position lines in
packet
"""
eh_et_lines
=
[]
soh_lines
=
packet
masspos_lines
=
[]
return
eh_et_lines
,
soh_lines
,
masspos_lines
def
parse_log_packet_logpeek
(
packet
:
List
[
str
])
->
SeparatedPacketLines
:
"""
Parse a log packet assuming that the log file comes from logpeek. In this
case, a packet can be composed of SOH lines, event info lines, and
mass-position lines.
:param packet: list of lines in the packet
:return: the lists of event lines, SOH lines, and mass-position lines in
packet
"""
eh_et_lines
=
[]
soh_lines
=
[]
masspos_lines
=
[]
for
line
in
packet
:
if
line
.
startswith
(
'
DAS:
'
):
eh_et_lines
.
append
(
line
)
elif
line
.
startswith
(
'
LPMP
'
):
masspos_lines
.
append
(
line
)
else
:
soh_lines
.
append
(
line
)
return
eh_et_lines
,
soh_lines
,
masspos_lines
def
parse_log_packet_rt2ms
(
packet
:
List
[
str
])
->
SeparatedPacketLines
:
"""
Parse a log packet assuming that the log file comes from rt2ms. In this
case, SOH data and event info are stored in separate packets. The first
line of a packet is a header that contains some metadata.
:param packet: list of lines in the packet
:return: the lists of event lines, SOH lines, and mass-position lines in
packet
"""
eh_et_lines
=
[]
soh_lines
=
[]
masspos_lines
=
[]
if
packet
[
0
].
startswith
(
'
EH
'
)
or
packet
[
0
].
startswith
(
'
ET
'
):
# The event info is summarized in the last line of an event info packet
eh_et_lines
=
[
packet
[
-
1
]]
else
:
# The header is not counted as an SOH line.
soh_lines
=
packet
[
1
:]
return
eh_et_lines
,
soh_lines
,
masspos_lines
def
parse_log_packet_sohstationviewer
(
packet
:
List
[
str
]
)
->
SeparatedPacketLines
:
"""
Parse a log packet assuming that the log file comes from sohstationviewer.
In this case, the file is composed mainly of SOH packets, with the event
info lines being written at the end of the file.
:param packet: list of lines in the packet
:return: the lists of event lines, SOH lines, and mass-position lines in
packet
"""
eh_et_lines
=
[]
soh_lines
=
[]
masspos_lines
=
[]
if
packet
[
0
].
startswith
(
'
Events:
'
):
eh_et_lines
=
packet
[
1
:]
else
:
soh_lines
=
packet
return
eh_et_lines
,
soh_lines
,
masspos_lines
class
LogFile
:
"""
Iterator over a log file.
"""
def
__init__
(
self
,
file_path
:
Path
):
self
.
file_path
=
file_path
self
.
file
=
open
(
file_path
)
def
__iter__
(
self
):
return
self
def
__next__
(
self
)
->
List
[
str
]:
line
=
self
.
file
.
readline
()
if
line
==
''
:
self
.
file
.
close
()
raise
StopIteration
# The log packets are separated by empty lines, so we know we have
# reached the next packet when we find a non-empty line.
while
line
==
'
\n
'
:
line
=
self
.
file
.
readline
()
packet
=
[]
# We have to check that we are not at the end of the file as well.
while
line
!=
'
\n
'
and
line
!=
''
:
packet
.
append
(
line
)
line
=
self
.
file
.
readline
()
if
line
==
''
:
break
# If there are more than one blank lines at the end of a log file, the
# last packet will be empty. This causes problem if the log file came
# from rt2ms or SOHStationViewer.
if
not
packet
:
self
.
file
.
close
()
raise
StopIteration
return
packet
def
__del__
(
self
):
"""
Close the file handle when this iterator is garbage collected just to
be absolutely sure that no memory is leaked.
"""
self
.
file
.
close
()
# A function that take in a log file packet and separate it into event info
# lines, SOH lines, and mass-position lines.
Parser
=
Callable
[[
List
[
str
]],
SeparatedPacketLines
]
# Mapping each log packet type to its corresponding parser.
PACKET_PARSERS
:
Dict
[
Optional
[
LogFileFormat
],
Parser
]
=
{
None
:
parse_log_packet_unknown_format
,
'
sohstationviewer
'
:
parse_log_packet_sohstationviewer
,
'
rt2ms
'
:
parse_log_packet_rt2ms
,
'
logpeek
'
:
parse_log_packet_logpeek
}
def
get_experiment_number
(
soh_lines
:
List
[
str
]):
"""
Get the experiment number from the list of SOH lines in a packet.
:param soh_lines: the list of SOH lines from a packet
:return: the experiment number if the packet has the correct format, None
otherwise
"""
# The experiment number only exists in SC packets.
if
not
soh_lines
[
0
].
startswith
(
'
Station Channel Definition
'
):
return
None
# The experiment number can be in the first (rt2ms) or second (logpeek,
# SOHStationViewer) line after the header. These lines are indented, so we
# have to strip them of whitespace.
if
soh_lines
[
1
].
strip
().
startswith
(
'
Experiment Number =
'
):
experiment_number_line
=
soh_lines
[
1
].
split
()
elif
soh_lines
[
2
].
strip
().
startswith
(
'
Experiment Number =
'
):
experiment_number_line
=
soh_lines
[
2
].
split
()
else
:
return
None
# If the experiment number is not recorded, we know that it will be 0. In
# order to not have too many return statements, we add 0 to the experiment
# line instead of returning it immediately.
if
len
(
experiment_number_line
)
<
4
:
experiment_number_line
.
append
(
'
0
'
)
return
experiment_number_line
[
-
1
]
class
LogFileReader
:
"""
Class that reads a log file.
"""
def
__init__
(
self
,
file_path
:
Path
):
self
.
file_path
=
file_path
self
.
log_file_type
:
Optional
[
LogFileFormat
]
=
None
self
.
eh_et_lines
:
List
[
str
]
=
[]
self
.
soh_lines
:
List
[
str
]
=
[]
self
.
masspos_lines
:
List
[
str
]
=
[]
self
.
station_code
:
Optional
[
str
]
=
None
self
.
experiment_number
:
Optional
[
str
]
=
None
def
read
(
self
)
->
None
:
"""
Read the log file.
"""
log_file
=
LogFile
(
self
.
file_path
)
for
packet
in
log_file
:
if
self
.
log_file_type
is
None
:
self
.
log_file_type
=
detect_log_packet_format
(
packet
)
parser
=
PACKET_PARSERS
[
self
.
log_file_type
]
eh_et_lines
,
soh_lines
,
masspos_lines
=
parser
(
packet
)
if
self
.
station_code
is
None
and
soh_lines
:
# All header lines contain the station code at the end.
self
.
station_code
=
soh_lines
[
0
].
split
(
'
'
)[
-
1
].
strip
()
if
self
.
experiment_number
is
None
and
soh_lines
:
found_experiment_number
=
get_experiment_number
(
soh_lines
)
self
.
experiment_number
=
found_experiment_number
self
.
eh_et_lines
.
extend
(
eh_et_lines
)
self
.
soh_lines
.
extend
(
soh_lines
)
# We need to add a new line between two blocks of SOH lines to
# separate them. This makes it so that we don't have to manually
# separate the SOH lines blocks during processing.
self
.
soh_lines
.
append
(
'
\n
'
)
self
.
masspos_lines
.
extend
(
masspos_lines
)
def
process_mass_poss_line
(
masspos_lines
:
List
[
str
]
)
->
List
[
Tuple
[
np
.
ndarray
,
np
.
ndarray
]]:
"""
Process a list of mass-position lines into a list of mass-position data,
sorted by the channel suffix
:param masspos_lines: a list of mass-position log lines
:return: a list of mass-position data, sorted by the channel suffix
"""
# There can be 6 mass-position channels.
mass_pos_data
=
[]
for
masspos_num
in
range
(
1
,
7
):
current_lines
=
[
line
.
split
()
for
line
in
masspos_lines
if
int
(
line
.
split
()[
2
])
==
masspos_num
]
data
=
np
.
asarray
([
line
[
3
]
for
line
in
current_lines
],
dtype
=
float
)
time_format
=
'
%Y:%j:%H:%M:%S.%f
'
times
=
np
.
array
(
# strptime requires the microsecond component to have 6 digits, but
# a mass-position log lines only have 3 digits for microsecond. So,
# we have to pad the time with 0s.
[
UTCDateTime
.
strptime
(
line
[
1
]
+
'
000
'
,
time_format
).
timestamp
for
line
in
current_lines
]
)
mass_pos_data
.
append
((
times
,
data
))
return
mass_pos_data
Loading