Skip to content
Snippets Groups Projects
Commit 17a1d1cf authored by Maeva Pourpoint
Browse files

Module handling the conversion of metadata fields to StationXML file

parent 24a628b1
No related branches found
No related tags found
1 merge request: !25 Metadata conversion functionalities
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Routines to convert the provided metadata to StationXML files.
Maeva Pourpoint - IRIS/PASSCAL
"""
from __future__ import annotations
import sys
from dataclasses import fields
from obspy import UTCDateTime
from obspy.core.inventory import (Channel, Comment, Equipment, Inventory,
Network, Operator, Person, Site, Station)
from typing import Dict, List, Optional, Set, Tuple, TYPE_CHECKING
from lemi2seed.lemi_data import DATA_TO_ARCHIVE
from lemi2seed.lemi_response import LemiResponse, CHA_UNITS
from lemi2seed.logging import parse_config_ini, setup_logger
from lemi2seed.utils import str2list, is_empty
if TYPE_CHECKING:
from dataclasses import Field
from lemi2seed.lemi_metadata import (LemiMetadata, Net, Sta, Run, Elec,
Mag, Aux, DCS, DCS_SUB_EMA, MD_VAL)
from obspy.core.inventory import Response
# Read config.ini file and set up logging (both run once, at import time).
# `config` is read by create_inventory() to build the software module label.
# `logger` is used by write_stationxml() to report progress.
config = parse_config_ini()
logger = setup_logger(__name__)
# Description of each auxiliary channel, keyed by its component code.
# Used by create_aux_sensor() as the sensor description.
AUX_CHA = {
    'Ui': 'Input voltage',
    'Te': 'Temperature of electronic unit',
    'Tf': 'Temperature of fluxgate sensor',
    'Sn': 'Satellite number',
    'Fq': 'GPS fix quality',
    'Ce': 'Difference between internal time and GPS',
}
def create_inventory() -> Inventory:
    """
    Create the Inventory object that will hold the network metadata.

    The module label is built from the software name and version read from
    the config.ini file, as '<name>.<version>'.
    """
    software = config['software']
    return Inventory(module=f"{software['name']}.{software['version']}",
                     module_uri='www.passcal.nmt.edu',
                     source='PASSCAL')
def get_xml_field(cat: DCS, field: Field) -> Tuple:
    """
    Look up the StationXML mapping and the value of a dataclass attribute.

    Returns a tuple made of:
    - the 'xml_id' entry stored in the field metadata (None when absent);
    - the attribute value, replaced by an empty string when is_empty()
      reports it as empty.
    """
    xml_id = field.metadata.get('xml_id')
    value = getattr(cat, field.name)
    if is_empty(value):
        value = ''
    return xml_id, value
def get_comment(field_info: List[str], field_val: MD_VAL, *, ind_run: Optional[int] = None) -> Comment:
    """
    Build a Comment object for a given comment entry.

    The comment value is the string form of field_val and its subject is the
    second item of field_info. When a run index is given, every ':a' in the
    subject is replaced by ':<letter>' where the letter matches the run index
    (0 -> 'a', 1 -> 'b', ...).
    """
    subject = field_info[1]
    if ind_run is not None:
        run_letter = chr(ord('a') + ind_run)
        subject = subject.replace(':a', f':{run_letter}')
    comment = Comment('')
    comment.value = str(field_val)
    comment.subject = subject
    return comment
def create_net(net: Net) -> Network:
    """
    Create Network object and populate with network related metadata.

    Only the dataclass fields carrying an 'xml_id' metadata entry are mapped.
    The first item of that entry selects how the value is stored:
    - 'comments': appended as a Comment (empty values are skipped);
    - entries containing 'operators': used to build the network operator
      ('agency') and its contact person ('names', otherwise emails);
    - 'identifiers': stored as a list;
    - anything else: set as the Network attribute of the same name.
    """
    comments = []
    person = Person()
    # Start from an operator with an empty agency so that the contact person
    # can still be attached when no agency field is mapped. Binding the name
    # only inside the loop raised UnboundLocalError in that case.
    operator = Operator(agency='')
    net_ = Network(code=net.archive_net.upper())  # type: ignore
    net_fields = [x for x in fields(net) if x.metadata.get('xml_id') is not None]
    for net_field in net_fields:
        net_field_info, net_field_val = get_xml_field(net, net_field)
        xml_attr = net_field_info[0]
        if xml_attr == 'comments' and net_field_val != '':
            comments.append(get_comment(net_field_info, net_field_val))
        elif 'operators' in xml_attr:
            if 'agency' in xml_attr:
                operator = Operator(agency=net_field_val)
            elif 'names' in xml_attr:
                person.names = str2list(net_field_val)
            else:
                person.emails = str2list(net_field_val)
        elif xml_attr == 'identifiers':
            net_.identifiers = str2list(net_field_val)
        else:
            setattr(net_, xml_attr, net_field_val)
    operator.contacts = [person]
    net_.operators = [operator]
    net_.comments = comments
    return net_
def create_sta(sta: Sta, runs: List[Run]) -> Station:
    """
    Create Station object and populate with station related metadata.

    Station fields carrying an 'xml_id' metadata entry are stored as comments,
    as the station site, or as plain Station attributes. Run fields whose
    'xml_id' entry contains 'comments' are added as station comments, with
    the run letter substituted in the comment subject.
    """
    sta_ = Station(code=sta.archive_id.upper(), latitude=sta.lat,  # type: ignore
                   longitude=sta.lon, elevation=sta.elev)
    comments = []
    # --- Metadata fields at the station level ---
    for sta_field in fields(sta):
        if sta_field.metadata.get('xml_id') is None:
            continue
        xml_id, value = get_xml_field(sta, sta_field)
        target = xml_id[0]
        if target == 'comments' and value != '':
            comments.append(get_comment(xml_id, value))
        elif target == 'site':
            sta_.site = Site(value)
        else:
            setattr(sta_, target, value)
    # --- Comments at the run level ---
    for ind_run, run in enumerate(runs):
        for run_field in fields(run):
            run_xml_id = run_field.metadata.get('xml_id')
            if run_xml_id is None or 'comments' not in run_xml_id:  # type: ignore
                continue
            xml_id, value = get_xml_field(run, run_field)
            if value != '':
                comments.append(get_comment(xml_id, value, ind_run=ind_run))
    sta_.comments = comments
    return sta_
def create_logger(runs: List[Run]) -> List[Equipment]:
    """
    Create one Equipment object per run and populate it with the run related
    metadata.

    Only the run fields that carry an 'xml_id' metadata entry not containing
    'comments' are copied; the first item of the entry names the Equipment
    attribute to set.
    """
    loggers = []
    for run in runs:
        # Named datalogger so that the module-level logger is not shadowed.
        datalogger = Equipment()
        for run_field in fields(run):
            run_xml_id = run_field.metadata.get('xml_id')
            if run_xml_id is None or 'comments' in run_xml_id:  # type: ignore
                continue
            xml_id, value = get_xml_field(run, run_field)
            setattr(datalogger, xml_id[0], value)
        loggers.append(datalogger)
    return loggers
def detect_changes_elec_port(runs: List) -> Set[str]:
    """
    Get run id for which a change in electrode channel port was made.

    runs is a list of (run id, {channel key: port}) pairs. Consecutive runs
    are compared: a run id is reported when a channel key has a port set in
    both that run and the previous one, and the two ports differ.
    """
    changes = set()
    for (_, prev_ports), (run_id, curr_ports) in zip(runs, runs[1:]):
        # A key can only have a port in both runs if the previous run has it.
        for key, prev_port in prev_ports.items():
            curr_port = curr_ports.get(key)
            if prev_port and curr_port and prev_port != curr_port:
                changes.add(run_id)
    return changes
def detect_changes_elec_cha(lemi_md: LemiMetadata) -> Set[str]:
    """
    Detect changes in electrode channel port.

    For each run, maps '<comp>_<loc_code>' to the channel port of its electric
    channels, then returns the ids of the runs whose port differs from the
    previous run.
    """
    runs_cha_port = []
    for run_id in lemi_md.run_list:
        # Keep as many electric channels as get_comps_rec returns for 'E'.
        num_e_pairs = len(lemi_md.get_comps_rec('E', run_id))
        ports = {}
        for cha in lemi_md.filter_cha('elec', run_id)[:num_e_pairs]:
            ports[f'{cha.comp}_{cha.loc_code}'] = cha.cha_port  # type: ignore
        runs_cha_port.append((run_id, ports))
    return detect_changes_elec_port(runs_cha_port)
def detect_changes_mag_cha(lemi_md: LemiMetadata) -> Set[str]:
    """
    Detect changes in fluxgate serial number.

    The serial number is read from the first magnetic channel of each run.
    Returns the ids of the runs whose serial number differs from the one of
    the previous run.
    """
    changes = set()
    prev_sn = None
    for ind, run_id in enumerate(lemi_md.run_list):
        curr_sn = lemi_md.filter_cha('mag', run_id)[0].fluxgate_sn  # type: ignore
        # The first run has no predecessor to compare against.
        if ind > 0 and curr_sn != prev_sn:
            changes.add(run_id)
        prev_sn = curr_sn
    return changes
def detect_changes_datalogger(lemi_md: LemiMetadata) -> Set[str]:
    """
    Detect changes in data logger serial number.

    Returns the ids of the runs whose data logger serial number differs from
    the one of the previous run.
    """
    runs = list(lemi_md.run)
    changes = set()
    for prev_run, curr_run in zip(runs, runs[1:]):
        if prev_run.datalogger_sn != curr_run.datalogger_sn:
            changes.add(curr_run.run_id)
    return changes
def detect_new_epoch(lemi_md: LemiMetadata) -> List:
    """
    Split the runs into epochs. A new epoch starts at the first run and at
    every run for which a change is detected in:
    - fluxgate serial number
    - data logger serial number
    - electrode channel port

    Returns a list with one [run_info, run_time_period] entry per epoch, where
    run_info maps each run id of the epoch to its list of channel codes and
    run_time_period maps each run id to its (start, end) pair.
    """
    elec_changes = detect_changes_elec_cha(lemi_md)
    mag_changes = detect_changes_mag_cha(lemi_md)
    datalogger_changes = detect_changes_datalogger(lemi_md)
    first_run_id = lemi_md.run_list[0]
    # The letter following the last run id closes the last epoch (exclusive).
    closing_id = chr(ord(lemi_md.run_list[-1]) + 1)
    boundaries = sorted({first_run_id, closing_id,
                         *datalogger_changes, *elec_changes, *mag_changes})
    run_e_comp = {run_id: list(e_info.keys())
                  for run_id, e_info in lemi_md.get_e_infos().items()}
    epochs = []
    for run_id_start, run_id_end in zip(boundaries, boundaries[1:]):
        run_ids = [chr(code) for code in range(ord(run_id_start), ord(run_id_end))]
        run_info = {}
        for run_id in run_ids:
            run_info[run_id] = [*run_e_comp[run_id], *DATA_TO_ARCHIVE[4:]]
        run_time_period = {}
        for run in lemi_md.run:
            if run.run_id in run_ids:
                run_time_period[run.run_id] = (run.start, run.end)
        epochs.append([run_info, run_time_period])
    return epochs
def get_e_cha_updates(chas: List[Elec], epoch: Dict) -> Dict:
    """
    Collect the run dependent metadata of the electric channels.

    epoch maps a run id to the list of channel codes kept for that run.
    Returns a dictionary keyed by channel port; each value maps a run id to
    [dc_start, dc_end, contact_resistance_start, contact_resistance_end,
    dipole_len] for the channels whose port is listed in epoch for their run.
    A port without any such channel maps to an empty dictionary.
    """
    updates: Dict = {}
    for cha in chas:
        per_run = updates.setdefault(cha.cha_port, {})
        if cha.cha_port not in epoch[cha.run_id]:
            continue
        per_run[cha.run_id] = [cha.dc_start,
                               cha.dc_end,
                               cha.contact_resistance_start,
                               cha.contact_resistance_end,
                               cha.dipole_len]
    return updates
def create_elec_sensor(elec: Elec) -> Equipment:
    """
    Create the sensor (Equipment object) of an electric channel.

    The sensor type is 'dipole', the model combines the instrument model and
    type, and the serial number lists the serial numbers of the positive and
    negative electrodes.
    """
    electrodes = (('positive', elec.pos_elec_sn), ('negative', elec.neg_elec_sn))
    sensor = Equipment()
    sensor.type = 'dipole'
    sensor.manufacturer = elec.inst_manufacturer
    sensor.model = " - ".join([elec.inst_model, elec.inst_type])  # type: ignore
    sensor.serial_number = ", ".join([": ".join([polarity, str(sn)])
                                      for polarity, sn in electrodes])
    return sensor
def create_mag_sensor(mag: Mag) -> Equipment:
    """
    Create the sensor (Equipment object) of a magnetic channel.

    The sensor type is 'fluxgate'; description, manufacturer, model and
    serial number are copied from the magnetic metadata.
    """
    return Equipment(type='fluxgate',
                     description=mag.inst_type,
                     manufacturer=mag.inst_manufacturer,
                     model=mag.inst_model,
                     serial_number=mag.fluxgate_sn)
def create_aux_sensor(aux: Aux) -> Equipment:
    """
    Create the sensor (Equipment object) of an auxiliary channel.

    The sensor type is 'data logger' and its description is the AUX_CHA entry
    of the channel component. The model combines the instrument model and
    type.
    """
    attributes = {
        'type': 'data logger',
        'description': AUX_CHA[aux.comp],  # type: ignore
        'manufacturer': aux.inst_manufacturer,
        'model': " - ".join([aux.inst_model, aux.inst_type]),  # type: ignore
        'serial_number': aux.sn,
    }
    sensor = Equipment()
    for name, value in attributes.items():
        setattr(sensor, name, value)
    return sensor
def init_cha(cat: DCS_SUB_EMA) -> Channel:
    """
    Instantiate a Channel object from the channel name, location code and
    coordinates (latitude, longitude, elevation, depth) of the given metadata.
    """
    coordinates = {'latitude': cat.lat, 'longitude': cat.lon,
                   'elevation': cat.elev, 'depth': cat.depth}
    return Channel(code=cat.cha_name, location_code=cat.loc_code, **coordinates)
def create_cha(cha: DCS_SUB_EMA, cha_type: str) -> Channel:
    """
    Build a Channel object from the metadata fields collected at the Elec,
    Mag or Aux level.

    Fields carrying an 'xml_id' metadata entry are stored as comments, channel
    types or plain Channel attributes. The sensor is built by the
    create_<cha_type>_sensor function of this module (create_elec_sensor,
    create_mag_sensor or create_aux_sensor).
    """
    cha_ = init_cha(cha)
    comments = []
    for cha_field in fields(cha):
        if cha_field.metadata.get('xml_id') is None:
            continue
        xml_id, value = get_xml_field(cha, cha_field)
        target = xml_id[0]
        if target == 'comments' and value != '':
            comments.append(get_comment(xml_id, value))
        elif target == 'types':
            cha_.types = [value]
        else:
            setattr(cha_, target, value)
    cha_.comments = comments
    # Resolve the sensor factory by name among the functions of this module.
    sensor_factory = getattr(sys.modules[__name__], '_'.join(['create', cha_type, 'sensor']))
    cha_.sensor = sensor_factory(cha)
    return cha_
def update_dates(cha: Channel, dates: List) -> Channel:
    """
    Update the start and end dates of the channel.

    dates is a list of (start, end) pairs, one per run covered by the
    channel. The channel start date is set to the earliest start and its end
    date to the latest end. The updated channel is returned.
    """
    # Take the extrema of starts and ends separately. max(dates)[1] would
    # return the end of the run that starts last, which is not the latest end
    # when run periods overlap or are nested.
    cha.start_date = min(period[0] for period in dates)
    cha.end_date = max(period[1] for period in dates)
    return cha
def update_calibration_units(cha: Channel, cha_code: str) -> Channel:
    """
    Update the calibration units of the channel.

    The units and their description are the first and second items of the
    CHA_UNITS entry for the given channel code.
    """
    cha_units = CHA_UNITS[cha_code]
    cha.calibration_units = cha_units[0]
    cha.calibration_units_description = cha_units[1]
    return cha
def add_run_ids_comment(cha: Channel, run_ids: List) -> Channel:
    """
    Append to the channel a comment (subject 'mt.run.id') listing the ids of
    the runs for a given epoch, separated by commas.
    """
    run_ids_comment = Comment('')
    run_ids_comment.value = ", ".join(run_ids)
    run_ids_comment.subject = 'mt.run.id'
    cha.comments.append(run_ids_comment)
    return cha
def add_voltage_resistance_comment(cha: Channel, cha_code: str, e_cha_updates: Dict) -> Channel:
    """
    Add comments about the measured voltages and contact resistances for all
    runs in a given epoch.

    e_cha_updates[cha_code] maps a run id to a list whose first four items are
    the DC voltage at start and end and the contact resistance at start and
    end. One comment is appended to the channel per quantity; its value lists
    the measurement of each run as '<run id>: <value><unit>', joined by
    ' - '. Runs without a measurement (None or empty string) are left out,
    and no comment is added when no run has a measurement for that quantity.
    """
    comment_subjects = {0: ['mt.electric.dc.start', 'V'],
                        1: ['mt.electric.dc.end', 'V'],
                        2: ['mt.electric.contact_resistance.start', ''],
                        3: ['mt.electric.contact_resistance.end', '']}
    run_values = e_cha_updates[cha_code]
    for ind, (subject, unit) in comment_subjects.items():
        # Test for missing values explicitly. A plain truthiness test would
        # also discard legitimate zero readings (e.g. a 0.0 V DC voltage).
        entries = [f'{run_id}: {vals[ind]}{unit}'
                   for run_id, vals in run_values.items()
                   if vals[ind] is not None and vals[ind] != '']
        if not entries:
            continue
        comment = Comment('')
        comment.value = ' - '.join(entries)
        comment.subject = subject
        cha.comments.append(comment)
    return cha
def add_sensor_description(cha: Channel, cha_code: str, e_cha_updates: Dict) -> Channel:
    """
    Add dipole length information for all runs in a given epoch to sensor
    description.

    The dipole length is the last item of each run entry in
    e_cha_updates[cha_code]. Runs with a falsy length are left out and the
    remaining '<run id>: <length> m' entries are joined by ' - '.
    """
    lengths = []
    for run_id, vals in e_cha_updates[cha_code].items():
        dipole_len = vals[-1]
        if dipole_len:
            lengths.append(f'{run_id}: {dipole_len} m')
    cha.sensor.description = ' - '.join(lengths)
    return cha
def update_cha(cha: Channel, cha_code: str, epoch: List[Dict], e_cha_updates: Dict) -> Channel:
    """
    Update Channel attributes with information for a given epoch:
    start date; end date; calibration units; list of run ids; voltage,
    contact resistance and dipole length for all runs (only when
    e_cha_updates is not empty, i.e. for electric channels).

    epoch is a [run_info, run_time_period] pair. Only the runs whose
    run_info entry lists cha_code are taken into account.
    """
    run_info, run_time_period = epoch
    run_ids = [run_id for run_id, codes in run_info.items() if cha_code in codes]
    dates = [period for run_id, period in run_time_period.items() if run_id in run_ids]
    cha = update_dates(cha, dates)
    cha = update_calibration_units(cha, cha_code)
    cha = add_run_ids_comment(cha, run_ids)
    if not e_cha_updates:
        return cha
    cha = add_voltage_resistance_comment(cha, cha_code, e_cha_updates)
    return add_sensor_description(cha, cha_code, e_cha_updates)
def get_cha_resp(sn: str, cha_type: str, cha_code: str, sample_rate: float) -> Response:
    """
    Get single stage channel response for given logger/sensor pair (identified
    by its serial number) and for given channel code.

    Auxiliary channels ('aux') use get_soh_cha_resp; any other channel type
    uses get_data_cha_resp with the loaded responses.
    """
    LemiResponse.check_resp_file()
    resps = LemiResponse.load_resp()
    lemi_resp = LemiResponse(sn, cha_code, sample_rate)
    if cha_type == 'aux':
        return lemi_resp.get_soh_cha_resp()
    return lemi_resp.get_data_cha_resp(resps)
def create_all_chas(lemi_md: LemiMetadata, cha_type: str, epochs: List) -> List[Channel]:
    """
    For a given channel type ('elec', 'mag' or 'aux'), create Channel objects
    for each set of recorded components for each epoch.

    Within an epoch, a single Channel is created per channel code (channel
    port for electric channels, component otherwise), from the first metadata
    entry whose code is listed for its run in that epoch.
    """
    is_elec = cha_type == 'elec'
    md_chas = getattr(lemi_md, cha_type)
    chas = []
    for epoch in epochs:
        run_info = epoch[0]
        md_chas_epoch = [md_cha for md_cha in md_chas if md_cha.run_id in run_info]
        e_cha_updates = get_e_cha_updates(md_chas_epoch, run_info) if is_elec else {}
        archived_codes = set()
        for md_cha in md_chas_epoch:
            run_id = md_cha.run_id
            cha_code = md_cha.cha_port if is_elec else md_cha.comp
            if cha_code in archived_codes or cha_code not in run_info[run_id]:
                continue
            archived_codes.add(cha_code)
            cha = update_cha(create_cha(md_cha, cha_type), cha_code, epoch, e_cha_updates)  # type: ignore
            # Serial number of the data logger used for the run of this entry
            sn = [run.datalogger_sn for run in lemi_md.run if run.run_id == run_id][0]
            cha.response = get_cha_resp(sn, cha_type, cha_code, md_cha.sample_rate)  # type: ignore
            chas.append(cha)
    return chas
def write_stationxml(lemi_md: LemiMetadata) -> None:
    """
    Create a StationXML file for given station.

    Naming convention for StationXML files: net.sta.project.yyyyjulday.time.xml
    where:
    - net and sta are read from the 'net' and 'sta' entries of the data stats
    - project is the experiment name (upper case, spaces replaced by '_')
    - yyyy, julday and time is UTC year, julian day and time

    The file is written in the output_xml directory of the metadata object.
    """
    net_code = lemi_md.data_stats['net']
    sta_code = lemi_md.data_stats['sta']
    project = '_'.join([word.upper() for word in lemi_md.net.project.split(" ")])  # type: ignore
    timestamp = UTCDateTime().strftime('%Y%j.%H%M%S')
    filename = '.'.join([net_code, sta_code, project, timestamp, 'xml'])
    logger.info("Writing StationXML file: {}.".format(filename))
    inv = create_inventory()
    net = create_net(lemi_md.net)
    sta = create_sta(lemi_md.sta, lemi_md.run)
    loggers = create_logger(lemi_md.run)
    epochs = detect_new_epoch(lemi_md)
    chas = []
    for cha_type in ['elec', 'mag', 'aux']:
        chas.extend(create_all_chas(lemi_md, cha_type, epochs))
    # Assemble the inventory: channels -> station -> network -> inventory
    sta.channels = chas
    sta.equipments = loggers
    net.stations = [sta]
    inv.networks = [net]
    inv.write(str(lemi_md.output_xml.joinpath(filename)), format="STATIONXML")
    logger.info("Writing of StationXML file successfully completed!")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment