Commit d40e4175 authored by Maeva Pourpoint

Module handling data prep before conversion to mseed files

parent ab09df5f
1 merge request: !4 Data handling functionalities
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Routines for data prep before conversion to mseed files.
Maeva Pourpoint - IRIS/PASSCAL
"""
from __future__ import annotations
import logging
import logging.config
import numpy as np
import re
import sys
from obspy import UTCDateTime
from pathlib import Path, PosixPath
from typing import Dict, List, Optional
from lemi2seed.utils import (convert_time, convert_coordinate, str2list,
get_e_loc)
# Read logging config file
log_file_path = Path(__file__).parent.joinpath('logging.conf')
logging.config.fileConfig(log_file_path)
# Create logger
logger = logging.getLogger(__name__)
CHANNEL_NAMING_CONVENTION = {'Ex': 'LQN', 'Ey': 'LQE', 'Hx': 'LFN',
'Hy': 'LFE', 'Hz': 'LFZ', 'Ui': 'LEH',
'Te': 'LKH', 'Tf': 'LKF', 'Sn': 'GNS',
'Fq': 'GST', 'Ce': 'LCE'}
DATA_NBR_COLUMNS = 24
DATA_INDEX = {'Time': range(0, 6), 'Hx': 6, 'Hy': 7, 'Hz': 8, 'Te': 9,
'Tf': 10, 'E1': 11, 'E2': 12, 'E3': 13, 'E4': 14, 'Ui': 15,
'Elev': 16, 'Lat': 17, 'Lat_Hem': 18, 'Lon': 19, 'Lon_Hem': 20,
'Sn': 21, 'Fq': 22, 'Ce': 23}
DATA_TO_ARCHIVE = ['E1', 'E2', 'E3', 'E4', 'Hx', 'Hy', 'Hz', 'Ui', 'Te', 'Tf',
'Sn', 'Fq', 'Ce']
SAMPLING_RATE = 1.0 # Hz
VALID_COMPONENTS = ['E1', 'E2', 'E3', 'E4', 'Hx', 'Hy', 'Hz']
class LemiData():
"""
This class handles the parsing and prep of the data outputted by LEMI-424.
"""
def __init__(self, path2data: str, output_mseed: str) -> None:
"""Instantiate LemiData class and scan data directory."""
self.path2data: str = path2data
self.output_mseed: str = output_mseed
self.data_files: List[PosixPath] = []
self.stats: Dict = {'sample_rate': SAMPLING_RATE, }
self.scan_path2data()
def scan_path2data(self) -> None:
"""
Scan data directory provided by the user for LEMI generated data files.
LEMI-424 file naming convention: YYYYMMDDHHMM.TXT
"""
data_files = [tmp for tmp in Path(self.path2data).glob('*')
if re.match(r"^\w{12}\.txt$", tmp.parts[-1], re.IGNORECASE)]
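# Illustrative examples (hypothetical file names): "202107141230.TXT" and
# "202107141230.txt" match the pattern above, while "2021-07-14.txt" does not.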
if not data_files:
logger.error("No data files found under the following path - {}. "
"Check data path!".format(self.path2data))
sys.exit(1)
self.data_files = self.order_files(data_files) # type: ignore
def parse_file(self, data_file: PosixPath) -> Optional[Dict]:
"""Parse data file and check for potential corruptions."""
data: Dict = {}
msg = ("The data file - {} - may have been corrupted. Skipping file!"
.format(data_file))
with open(data_file, 'r', newline='') as fin:
for line in fin:
columns = line.strip().split()
if line.endswith('\r\n') and len(columns) == DATA_NBR_COLUMNS:
tmp = self.reformat_data(columns)
if tmp is None:
logger.warning(msg)
return None
for key, val in tmp.items():
if data.get(key) is None:
data[key] = [val]
else:
data[key].append(val)
else:
logger.warning(msg)
return None
return data
def parse_all_files(self) -> None:
"""
Parse all data files and define data attribute => dictionary structure
with:
- key: Time_stamp, Elev, Hx, Hy, Hz, E1, E2, E3, E4, Ui, Te, Tf, Sn,
Fq, Ce
- value: list of data points for given key
"""
self.data: Dict = {}
for data_file in self.data_files:
logger.info("Parsing data file - {}.".format(data_file))
data = self.parse_file(data_file)
if data is not None:
for key, val in data.items():
if self.data.get(key) is None:
self.data[key] = val
else:
self.data[key] = [*self.data[key], *val]
@staticmethod
def order_files(files: List[PosixPath]) -> List[PosixPath]:
"""
Order data files based on their name. Each file name is defined by
the time stamp of the first data point recorded in the file.
"""
return sorted(files, key=lambda x: x.parts[-1])
@staticmethod
def reformat_data(columns: List[str]) -> Optional[Dict]:
"""
Reformat data in all columns by either applying a time, coordinate or
float conversion.
"""
time_stamp = convert_time(' '.join([columns[ind] for ind in DATA_INDEX['Time']])) # type: ignore
lat = convert_coordinate(columns[DATA_INDEX['Lat']], columns[DATA_INDEX['Lat_Hem']]) # type: ignore
lon = convert_coordinate(columns[DATA_INDEX['Lon']], columns[DATA_INDEX['Lon_Hem']]) # type: ignore
if not all([time_stamp, lat, lon]):
return None
dict_ = {'Time_stamp': time_stamp, 'Lat': lat, 'Lon': lon,
'Elev': float(columns[DATA_INDEX['Elev']]), # type: ignore
'Hx': float(columns[DATA_INDEX['Hx']]), # type: ignore
'Hy': float(columns[DATA_INDEX['Hy']]), # type: ignore
'Hz': float(columns[DATA_INDEX['Hz']]), # type: ignore
'E1': float(columns[DATA_INDEX['E1']]), # type: ignore
'E2': float(columns[DATA_INDEX['E2']]), # type: ignore
'E3': float(columns[DATA_INDEX['E3']]), # type: ignore
'E4': float(columns[DATA_INDEX['E4']]), # type: ignore
'Ui': float(columns[DATA_INDEX['Ui']]), # type: ignore
'Te': float(columns[DATA_INDEX['Te']]), # type: ignore
'Tf': float(columns[DATA_INDEX['Tf']]), # type: ignore
'Sn': float(columns[DATA_INDEX['Sn']]), # type: ignore
'Fq': float(columns[DATA_INDEX['Fq']]), # type: ignore
'Ce': float(columns[DATA_INDEX['Ce']])} # type: ignore
return dict_
@staticmethod
def detect_gaps(time_stamp: List[UTCDateTime]) -> List:
"""
Detect gaps in time series. The number of gaps, together with the number
of acquisition starts, defines the number of runs.
Return list of data point indexes for which a gap is detected.
"""
diffs = [j - i for i, j in zip(time_stamp[:-1], time_stamp[1:])]
ind_gap = [ind + 1 for ind, diff in enumerate(diffs) if diff != 1.0 / SAMPLING_RATE]  # expected sample interval (s)
if ind_gap:
logger.warning("Data gaps detected at {}."
.format(", ".join([str(time_stamp[x]) for x in ind_gap])))
return ind_gap
@staticmethod
def detect_new_day(time_stamp: List[UTCDateTime]) -> List:
"""
Detect start of a new day in time series. Used to create day-long data
traces.
Return list of data point indexes for which a new day is detected.
"""
return [i + 1 for i in range(len(time_stamp) - 1) if time_stamp[i + 1].day != time_stamp[i].day]
def create_data_array(self) -> None:
"""
From data attribute, create a data_np attribute (numpy array) in which
each time series is associated with a specific channel number, component,
channel name, location code, run number, start time and end time.
"""
# Check for data gaps and day start
time_stamp = self.data["Time_stamp"]
ind_gaps = self.detect_gaps(time_stamp)
ind_days = self.detect_new_day(time_stamp)
ind_traces = sorted(set([0, *ind_gaps, *ind_days, len(time_stamp)]))
# For LEMIs, number of data gaps defines number of runs
# Save that info in stats
self.stats['nbr_runs'] = len(ind_gaps) + 1
# Create structured numpy array
npts_max = int(24 * 3600 * SAMPLING_RATE)
dtype = [('channel_number', str, 2), ('component', str, 2),
('channel_name', str, 3), ('location', str, 2),
('run_nbr', int), ('starttime', UTCDateTime),
('endtime', UTCDateTime), ('npts', int),
('samples', float, npts_max)]
self.data_np = np.zeros(len(DATA_TO_ARCHIVE) * (len(ind_traces) - 1), dtype=dtype)
# Fill array for each time chunk and channel.
# A time chunk is defined as a day or the time between two data gaps if
# data recording is discontinuous.
ind = 0
run_nbr = 1
location = ''
for start, end in zip(ind_traces[:-1], ind_traces[1:]):
npts = end - start
if start in ind_gaps:
run_nbr += 1
for cha in DATA_TO_ARCHIVE:
channel_number = cha if cha.startswith('E') else ''
component = cha if not cha.startswith('E') else ''
channel_name = CHANNEL_NAMING_CONVENTION.get(cha, '')
samples = [*self.data[cha][start:end], *[0] * (npts_max - npts)]
self.data_np[ind] = np.array([(channel_number, component,
channel_name, location, run_nbr,
time_stamp[start],
time_stamp[end - 1], npts,
samples)], dtype=dtype)
ind += 1
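# Illustrative sketch (assumes create_data_array() has run): traces can be
# pulled out of the structured array by field, e.g.
# self.data_np[self.data_np['component'] == 'Hx'] selects all Hx chunks, and
# self.data_np[0]['samples'][:self.data_np[0]['npts']] trims the zero padding.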
def update_stats_1(self) -> None:
"""
Update stats attribute by defining the latitude, longitude, elevation,
start and end time for all channels.
These stats are used to set the header information of an ObsPy Trace object.
"""
self.stats['latitude'] = np.mean(self.data["Lat"])
self.stats['longitude'] = np.mean(self.data["Lon"])
self.stats['elevation'] = np.mean(self.data["Elev"])
self.stats['time_period_start'] = self.data["Time_stamp"][0]
self.stats['time_period_end'] = self.data["Time_stamp"][-1]
def update_stats_2(self, net: str, sta: str) -> None:
"""
Update stats attribute by defining or redefining the network code and
station name.
These stats are used to set the header information of an ObsPy Trace object.
"""
self.stats['network'] = 'XX' if not net else net.upper()
self.stats['station'] = 'TEST' if not sta else sta.upper()
def update_array(self, data_channels: List, e_info: Dict) -> None:
"""
Update data_np attribute based on:
- the list of channels for which data were recorded.
- a dict of electrode-related info mapping channel numbers to components.
"""
# Remove from data array channels for which data were not recorded
no_record = [x for x in VALID_COMPONENTS if x not in data_channels]
for channel in no_record:
self.data_np = self.data_np[self.data_np['channel_number'] != channel]
# Update component, channel_name and location code for electric channels
e_loc = get_e_loc(e_info)
for key, val in e_info.items():
for ind in np.where(self.data_np['channel_number'] == key)[0]:
self.data_np[ind]['component'] = val
self.data_np[ind]['channel_name'] = CHANNEL_NAMING_CONVENTION[val]
self.data_np[ind]['location'] = e_loc.get(key, '')
def update_data(self, qc_inputs: Optional[Dict] = None,
metadata=None) -> None:
"""
Update stats and data_np attributes based on user inputs (entered
either via the command line or the GUI).
Called either right after parse_all_files() in QC mode or after
parsing metadata info in normal mode.
"""
if qc_inputs is not None:
net = qc_inputs['net']
sta = qc_inputs['sta']
data_channels = qc_inputs['data_channels']
e_info = qc_inputs['e_info']
else:
net = metadata.survey.archive_network
sta = metadata.station.archive_id
data_channels = str2list(metadata.station.components_recorded)
e_info = {x.channel_number: x.component for x in metadata.electric
if x.channel_number in data_channels}
self.update_stats_2(net, sta)
self.update_array(data_channels, e_info)
def prep_data(self):
"""Wrap up method to prep data outputted by LEMI-424."""
self.parse_all_files()
if not self.data:
logger.error('No valid data found. Exiting!')
sys.exit(1)
else:
self.create_data_array()
self.update_stats_1()
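# Illustrative usage sketch (the driver that calls this class lives elsewhere
# in lemi2seed; the paths and user inputs below are hypothetical):
#
#     lemi_data = LemiData('/path/to/lemi/data', '/path/to/mseed/output')
#     lemi_data.prep_data()  # parse files, build data_np, set basic stats
#     lemi_data.update_data(qc_inputs={...})  # see the qc_inputs sketch above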