Skip to content
Snippets Groups Projects
Commit 2f09b396 authored by Maeva Pourpoint
Browse files

Module handling metadata populating

parent c115216c
No related branches found
No related tags found
1 merge request: !20 Update metadata populating
......@@ -12,17 +12,23 @@ from __future__ import annotations
import copy
import openpyxl
import pickle
from collections import Counter
from dataclasses import fields
from datetime import datetime
from inspect import signature
from obspy import UTCDateTime
from openpyxl.worksheet.worksheet import Worksheet
from operator import methodcaller
from pathlib import Path
from typing import Dict, List, Optional, TYPE_CHECKING, Union
from typing import Dict, List, Optional, Tuple, TYPE_CHECKING, Union
from lemi2seed.lemi_data import CHA_NAMING_CONV
from lemi2seed.logging import parse_config_ini, setup_logger
from lemi2seed.metadata_category import Aux, Elec, Mag, Run, Sta, Net
from lemi2seed.utils import is_empty, get_e_ids, get_run_list, NUM_E_CHA_MAX
from lemi2seed.utils import (is_empty, eval_loc_code, get_e_loc, get_e_ids,
get_run_list, str2list, NUM_E_CHA_MAX)
if TYPE_CHECKING:
import numpy
......@@ -301,3 +307,329 @@ class LemiMetadata():
md_fields = self.parse_field_sheet(sheet, sheet_type, md_fields)
workbook.close()
return self.reformat_md_dict(md_fields) if not is_empty(md_fields) else None
def populate(self, cat: DCS, md_fields: Dict, run_id: Optional[str] = None) -> None:
    """
    Set the metadata attributes of a dataclass instance from user inputs.

    Fields are processed in sorted order. For each field, a method named
    "validate_<field name>" is looked up on the dataclass instance:
    - no such method: the value is accepted as is.
    - method without a "data_input" parameter: called with the value only.
    - method with a "data_input" parameter: also receives the entry of
      self.data_stats stored under the field name. When that entry is a
      dict, the sub-entry stored under run_id is used instead.
    Accepted values are stored on the instance and the field name is removed
    from its md_invalid set. Rejected values are not stored and the field
    name is added to md_invalid.
    """
    for field_name, field_val in sorted(md_fields.items()):
        # datetime inputs are converted to UTCDateTime before being validated
        # and stored.
        if isinstance(field_val, datetime):
            field_val = UTCDateTime(field_val)
        checker_name = f'validate_{field_name}'
        accepted = True
        if hasattr(cat, checker_name):
            checker = getattr(cat, checker_name)
            if "data_input" not in signature(checker).parameters:
                accepted = checker(field_val)
            else:
                reference = self.data_stats[field_name]
                if isinstance(reference, dict):
                    # Per-run statistics: pick the entry for the current run.
                    reference = reference[run_id]
                accepted = checker(field_val, reference)
        if not accepted:
            cat.md_invalid.add(field_name)
            continue
        setattr(cat, field_name, field_val)
        cat.md_invalid.discard(field_name)
@staticmethod
def flag_md_missing(cat: DCS, skip: Optional[List[str]] = None) -> None:
    """
    Flag metadata fields that are required for archiving but "missing"
    because not provided by the user.

    A field is required when its dataclass field metadata has a truthy 'req'
    entry. Each required attribute of the instance that is not listed in
    skip is added to the instance md_missing set when is_empty() reports it
    as empty, and removed from that set otherwise.

    skip defaults to None (nothing skipped) rather than a shared mutable
    list, so the default can never be altered between calls.
    """
    skip = [] if skip is None else skip
    md_req = {md_field.name for md_field in fields(cat)
              if md_field.metadata.get('req')}
    for key, val in cat.__dict__.items():
        if key not in md_req or key in skip:
            continue
        if is_empty(val):
            cat.md_missing.add(key)
        else:
            cat.md_missing.discard(key)
def populate_net_md_props(self, md_fields_n: Dict) -> None:
    """
    Fill in the Network data class (self.net) from the user inputs provided
    through the field sheets and/or GUI, then flag the required network
    fields that are still missing.
    """
    net = self.net
    self.populate(net, md_fields_n)
    LemiMetadata.flag_md_missing(net)
def populate_sta_md_props(self, md_fields_s: Dict) -> None:
    """
    Fill in the Sta data class (self.sta) from the user inputs provided
    through the field sheets and/or GUI.
    The station run_list property is set to the comma-separated list of run
    ids before the required station fields still missing are flagged.
    """
    sta = self.sta
    self.populate(sta, md_fields_s)
    sta.run_list = ', '.join(self.run_list)
    LemiMetadata.flag_md_missing(sta)
def populate_run_md_props(self, md_fields_r: Dict) -> None:
    """
    Fill in the run data classes (self.run) from the user inputs provided
    through the field sheets and/or GUI.
    Run data classes are initialized first if none exist yet. The inputs for
    a given run are read from the 'Run_<run id>' entry of md_fields_r, the
    run id being taken from self.run_list at the same position as the run.
    """
    if not self.run:
        self.run.extend(self.init_run_md_props())
    for position, run_md in enumerate(self.run):
        run_id = self.run_list[position]
        self.populate(run_md, md_fields_r[f'Run_{run_id}'], run_id)
        LemiMetadata.flag_md_missing(run_md)
def get_comps_rec(self, type_: str, run_id: str) -> list:
    """
    Get the components recorded during a given run for a given channel type
    ('E' for electric, anything else is reported as magnetic).

    Returns the recorded components whose name starts with type_, or an
    empty list when no components were listed for the run. A warning is
    logged when nothing matches, and, for type_ 'H', when not all of the
    magnetic components in CHA_TYPES['mag'] are listed.
    """
    run_md = self.run[self.run_list.index(run_id)]
    comps_rec = run_md.comps_rec
    cha_type = 'magnetic field' if type_ != 'E' else 'electric field'
    msg = ("No {0} data recorded for run '{1}'. If you did record {0} data "
           "for that run, please update your list of recorded components "
           "at the station level accordingly!".format(cha_type, run_id))
    if comps_rec is None:
        logger.warning(msg)
        return []
    comps_ = [comp for comp in str2list(comps_rec)  # type: ignore
              if comp.startswith(type_)]
    if not comps_:
        logger.warning(msg)
    # The magnetometer records all of its components together, so a partial
    # list of magnetic components is reported to the user.
    if type_ == 'H' and any(comp not in comps_ for comp in CHA_TYPES['mag']):
        logger.warning("LEMI-039 fluxgate is a 3-component magnetometer. "
                       "If you did record {} data for run {}, data from "
                       "all magnetic components (Hx, Hy and Hz) were "
                       "recorded. Please update your list of recorded "
                       "components at the station level."
                       .format(cha_type, run_id))
    return comps_
def match_comps_rec(self, e_chas: List[Elec], e_comps: List[str], run_id: str) -> None:
    """
    Compare the channel numbers set on the electric channels of a run with
    the electric components listed by the user for that run (order is
    ignored, duplicates are not).
    On mismatch, an error is logged and 'cha_num' is added to the md_invalid
    set of every given channel; otherwise 'cha_num' is removed from it.
    """
    listed = Counter(e_cha.cha_num for e_cha in e_chas)
    if listed == Counter(e_comps):
        for e_cha in e_chas:
            e_cha.md_invalid.discard('cha_num')
        return
    logger.error("Invalid electric channel numbers (run '{}')! "
                 "The channel numbers don't match the electric "
                 "channels in your list of recorded components at the "
                 "station/run level.".format(run_id))
    for e_cha in e_chas:
        e_cha.md_invalid.add('cha_num')
def match_num_e_pairs(self, e_comps: List[str], run_id: str, num_e_pairs: Optional[int] = None) -> None:
    """
    Compare the number of electrode pairs given for a run with the number of
    electric components listed by the user for that run.
    On mismatch, an error is logged and 'run_<run id>_num_e_pairs' is added
    to the md_invalid set of every electric channel of the run; otherwise
    (including when num_e_pairs is None) that flag is removed.
    """
    e_chas = self.filter_cha('elec', run_id)
    mismatch = num_e_pairs is not None and int(num_e_pairs) != len(e_comps)
    flag = f'run_{run_id}_num_e_pairs'
    if mismatch:
        logger.error("Invalid number of electrode pairs (run '{}')! "
                     "The number of electrode pairs does not match the "
                     "number of electric channels in your list of recorded "
                     "components at the station/run level.".format(run_id))
    for e_cha in e_chas:
        if mismatch:
            e_cha.md_invalid.add(flag)
        else:
            e_cha.md_invalid.discard(flag)
def check_e_cha_nums(self, e_chas: List[Elec], e_comps: List[str],
                     run_id: str, num_e_pairs: Optional[int] = None) -> None:
    """
    Run both consistency checks on the electric channels of a run:
    - channel numbers against the listed electric components;
    - number of electrode pairs against the number of listed electric
      components.
    """
    # Channel numbers first, then the electrode pair count.
    self.match_comps_rec(e_chas, e_comps, run_id)
    self.match_num_e_pairs(e_comps, run_id, num_e_pairs)
def get_e_infos(self) -> Dict:
    """
    Build, for each run, a dict mapping the channel number of each electric
    channel to its component. Only channels whose channel number is among
    the electric components (names starting with 'E') listed for the run are
    kept; a run with no listed components gets an empty dict.
    """
    e_infos = {}
    for run in self.run:
        run_id = run.run_id
        elec = self.filter_cha('elec', run_id)
        e_comps = ([comp for comp in str2list(run.comps_rec) if comp.startswith('E')]
                   if run.comps_rec else [])
        pairs = {}
        for cha in elec:
            if cha.cha_num in e_comps:
                pairs[cha.cha_num] = cha.comp  # type: ignore
        e_infos[run_id] = pairs
    return e_infos
def update_loc(self):
    """
    Update the location code of the electric channels.
    The electrode pair information of all runs is passed to eval_loc_code;
    when it returns a truthy value, the location codes of a run are taken
    from get_e_loc, otherwise every location code is set to ''.
    """
    e_infos = self.get_e_infos()
    use_loc_codes = eval_loc_code(e_infos)
    for run_id, e_info in e_infos.items():
        e_loc = {}
        if use_loc_codes:
            e_loc = get_e_loc(e_info)
        for cha_num in e_info:
            matches = [cha for cha in self.elec
                       if cha.run_id == run_id and cha.cha_num == cha_num]
            # Only the first matching channel is updated.
            matches[0].loc_code = e_loc.get(cha_num, '')
def populate_elec_md_props(self, md_fields_e: Dict, num_e_pairs: Optional[Dict] = None) -> Tuple[Dict, Dict]:
    """
    Fill in the Elec data classes from the user inputs provided through the
    field sheets and/or GUI.

    Runs without recorded electric components are skipped. For the other
    runs:
    - NUM_E_CHA_MAX electric channels are created if the run has none yet;
    - the number of electrode pairs is taken from num_e_pairs when provided
      for the run, else from the number of recorded electric components;
    - the channels whose input key ends with a pair number in the range
      1..number of pairs are populated, their "set_*" methods are called and
      their missing required fields are flagged;
    - channel numbers and number of pairs are checked for consistency.
    Location codes are updated once all runs are processed.

    Returns two dicts keyed by run id: the mapping channel number -> input
    key, and the list of channel numbers that were populated.
    """
    efield_keys = {}
    cha_nums = {}
    # The list of Elec "set_*" methods does not depend on the run.
    setters = [attr for attr in dir(Elec) if attr.startswith('set_')]
    for run_id in self.run_list:
        e_comps = self.get_comps_rec('E', run_id)
        if not e_comps:
            continue
        if not self.filter_cha('elec', run_id):
            template = self.init_cha_md_props('elec', run_id)
            self.elec.extend([LemiMetadata.update_cha_num(template, f'E{i+1}')  # type: ignore
                              for i in range(NUM_E_CHA_MAX)])  # type: ignore
        n_comps = len(e_comps)
        if num_e_pairs:
            num_e_pairs_run = int(num_e_pairs.get(run_id) or n_comps)
        else:
            num_e_pairs_run = n_comps
        # Input keys are split on '_'; the second token is compared with the
        # run id and the last token is read as the pair number.
        run_keys = {}
        for sheet_key, sheet_fields in md_fields_e.items():
            if sheet_key.split('_')[1] == run_id:
                run_keys[sheet_fields['cha_num']] = sheet_key
        efield_keys[run_id] = run_keys
        cha_nums[run_id] = [cha_num for cha_num, sheet_key in run_keys.items()
                            if 1 <= int(sheet_key.split('_')[-1]) <= num_e_pairs_run]
        chas = [cha for cha in self.filter_cha('elec', run_id)
                if cha.cha_num in cha_nums[run_id]]  # type: ignore
        for cha in chas:
            sheet_key = run_keys[cha.cha_num]  # type: ignore
            self.populate(cha, md_fields_e[sheet_key], run_id)
            for setter in setters:
                getattr(cha, setter)()
            LemiMetadata.flag_md_missing(cha)
        self.check_e_cha_nums(chas, e_comps, run_id, num_e_pairs_run)  # type: ignore
    self.update_loc()
    return efield_keys, cha_nums
def get_cha_inds(self, cha_type: str, run_id: str) -> List[int]:
    """
    Get the positions, within the channel list stored under the attribute
    named cha_type, of the channels belonging to a given run.
    """
    positions = []
    for position, cha in enumerate(getattr(self, cha_type)):
        if cha.run_id == run_id:
            positions.append(position)
    return positions
@staticmethod
def update_comp_cha_name(cha: DCS_SUB_MA, comp: str) -> DCS_SUB_MA:
    """
    Return a deep copy of a populated Mag or Aux data class with its
    component set to comp and its channel name looked up in CHA_NAMING_CONV.
    The data class passed in is left untouched.
    """
    duplicate = copy.deepcopy(cha)
    duplicate.comp = comp
    # The channel name is derived from the component just stored on the copy.
    duplicate.cha_name = CHA_NAMING_CONV[duplicate.comp]
    return duplicate
def populate_mag_md_props(self, md_fields_m: Dict) -> None:
    """
    Fill in the Mag data classes from the user inputs provided through the
    field sheets and/or GUI.

    Runs without recorded magnetic components are skipped. For the other
    runs, one channel per entry of CHA_TYPES['mag'] is created if the run has
    none yet. The first channel of the run is populated from the
    'Run_<run id>_Mag' inputs, then copied to every channel of the run with
    the component and channel name of that channel.
    """
    for run_id in self.run_list:
        if not self.get_comps_rec('H', run_id):
            continue
        if not self.filter_cha('mag', run_id):
            template = self.init_cha_md_props('mag', run_id)
            self.mag.extend([copy.deepcopy(template) for _ in CHA_TYPES['mag']])  # type: ignore
        cha_inds = self.get_cha_inds('mag', run_id)
        first = self.mag[cha_inds[0]]
        self.populate(first, md_fields_m[f'Run_{run_id}_Mag'], run_id)
        for attr in dir(Mag):
            if attr.startswith('set_'):
                getattr(first, attr)()
        # Component and channel name are only set on the per-component copies
        # made below, so they are not flagged here.
        LemiMetadata.flag_md_missing(first, skip=['comp', 'cha_name'])
        for position, cha_ind in enumerate(cha_inds):
            comp = CHA_TYPES['mag'][position]
            self.mag[cha_ind] = LemiMetadata.update_comp_cha_name(first, comp)  # type: ignore
def populate_aux_md_props(self) -> None:
    """
    Fill in the Aux data classes.

    For each run, one channel per entry of CHA_TYPES['aux'] is created if the
    run has none yet. The serial number of the first channel of the run is
    set from self.data_stats['datalogger_sn'], then that channel is copied to
    every channel of the run with the component and channel name of that
    channel.
    """
    for run_id in self.run_list:
        if not self.filter_cha('aux', run_id):
            template = self.init_cha_md_props('aux', run_id)
            self.aux.extend([copy.deepcopy(template) for _ in CHA_TYPES['aux']])  # type: ignore
        cha_inds = self.get_cha_inds('aux', run_id)
        first = self.aux[cha_inds[0]]
        first.sn = self.data_stats['datalogger_sn'][run_id]
        for attr in dir(Aux):
            if attr.startswith('set_'):
                getattr(first, attr)()
        # Component and channel name are only set on the per-component copies
        # made below, so they are not flagged here.
        LemiMetadata.flag_md_missing(first, skip=['comp', 'cha_name'])
        for position, cha_ind in enumerate(cha_inds):
            comp = CHA_TYPES['aux'][position]
            self.aux[cha_ind] = LemiMetadata.update_comp_cha_name(first, comp)  # type: ignore
def populate_md_props(self, md_fields: Dict) -> None:
    """
    Populate the metadata properties of every category listed in self.cats
    from the user inputs provided through the field sheets and/or GUI.
    For each category, the method named 'populate_<category>_md_props'
    (category in lower case) is called, with the inputs of that category
    when there are any and without argument otherwise.
    """
    for cat in self.cats:
        cat_fields = md_fields.get(cat)
        handler = getattr(self, f'populate_{cat.lower()}_md_props')
        if not cat_fields:
            handler()
            continue
        handler(cat_fields)
def save_md(self, filename: str) -> None:
    """
    Pickle this LemiMetadata instance into the given file.
    The saved file lets the user:
    - update metadata fields and regenerate StationXML files after
      terminating lemi2seed;
    - reuse some of the metadata fields for another station;
    - recover the metadata inputs if lemi2seed is terminated unexpectedly.
    """
    with open(filename, 'wb') as stream:
        # Logged once the file was successfully opened for writing.
        logger.info("Saving metadata inputs in {}".format(filename))
        pickle.dump(self, stream)
@staticmethod
def load_md(filename: str) -> LemiMetadata:
    """
    Load an instance of the LemiMetadata class previously pickled to file.
    NOTE(review): unpickling runs code stored in the file, so only files
    coming from a trusted source should be loaded.
    """
    with open(filename, 'rb') as stream:
        logger.info("Loading metadata inputs from {}".format(filename))
        return pickle.load(stream)
def update_azimuth_tilt(self) -> None:
    """
    Update azimuth and tilt for magnetic field channels.
    LEMI-039 fluxgate is a 3-component magnetometer. So by convention:
    - the tilt for the Hx and Hy channels should be set to 0°
    - the azimuth of the Hx and Hy channels should be 90° offset
    - the azimuth for the Hz channel should be set to 0°.

    The Hy azimuth is the azimuth currently stored on the Hy channel plus
    90°, wrapped into the [0°, 360°) range so that it remains a valid
    azimuth when the stored value is 270° or more.
    For any channel other than Hx and Hy, a positive tilt is negated.
    """
    for run_id in self.run_list:
        for cha in self.filter_cha('mag', run_id):
            if cha.comp in ['Hx', 'Hy']:
                cha.meas_tilt = 0.0
                if cha.comp == 'Hy':
                    # Wrap around so that e.g. 300° + 90° gives 30°, not 390°.
                    cha.meas_azimuth = (float(cha.meas_azimuth) + 90.0) % 360.0  # type: ignore
            else:
                tilt = float(cha.meas_tilt)  # type: ignore
                if tilt > 0.0:
                    cha.meas_tilt = -tilt  # to match SEED convention
                cha.meas_azimuth = 0.0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment