#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Tests for `lemi_data` module."""

import logging
# `logging.config` is a submodule that `import logging` does not load; import
# it explicitly instead of relying on another module having imported it first.
import logging.config
import pickle
import unittest
from pathlib import Path

from obspy import UTCDateTime

from lemi2seed.lemi_data import LemiData, log_file_path

OUTPUT_MSEED = Path(__file__).resolve().parent.joinpath('MSEED')
TEST_DIR = Path(__file__).resolve().parent.joinpath('test_data')
SCR_DIR = "lemi2seed.lemi_data"

# Configure logging from the file that lemi2seed.lemi_data exposes as
# `log_file_path`, then grab the logger whose records the tests inspect.
logging.config.fileConfig(log_file_path)
logger = logging.getLogger(SCR_DIR)


class TestLemiData(unittest.TestCase):
    """Test suite for lemi_data.LemiData class"""

    def setUp(self):
        """Set up the fixture directories and the expected data file list."""
        self.path2data = TEST_DIR.joinpath("DATA0110")
        self.path2data_corrupted_1 = TEST_DIR.joinpath("DATA0110_Corrupted_Level1")
        self.path2data_corrupted_2 = TEST_DIR.joinpath("DATA0110_Corrupted_Level2")
        files = ['202009302105.TXT', '202009302112.TXT', '202009302114.TXT',
                 '202010010000.TXT']
        self.data_files = [self.path2data.joinpath(x) for x in files]

    # ------------------------------------------------------------------
    # Private helpers (not collected by unittest: no `test` prefix)
    # ------------------------------------------------------------------

    def _load_expected(self, filename):
        """Return the unpickled reference object stored in `self.path2data`.

        NOTE: pickle executes arbitrary code on load. This is only acceptable
        because the fixtures are files shipped with the test suite; never
        point this helper at data from an untrusted source.
        """
        with open(self.path2data.joinpath(filename), 'rb') as fin:
            return pickle.load(fin)

    def _parsed_lemi_data(self):
        """Return a LemiData built on the valid data set with all files parsed."""
        lemi_data = LemiData(self.path2data, OUTPUT_MSEED)
        lemi_data.parse_all_files()
        return lemi_data

    def _assert_corrupted_file_skipped(self, path2data, file_index):
        """Check that parse_file warns about, and skips, a corrupted file.

        Builds a LemiData on `path2data`, parses the data file at position
        `file_index` of its `data_files` list and asserts that exactly one
        WARNING record with the "may have been corrupted" message is logged
        and that parse_file returns None.
        """
        lemi_data = LemiData(path2data, OUTPUT_MSEED)
        data_file = lemi_data.data_files[file_index]
        with self.assertLogs(logger, level='WARNING') as cmd:
            data = lemi_data.parse_file(data_file)
        msg = ("The data file - {} - may have been corrupted. Skipping file!"
               .format(data_file))
        self.assertEqual(cmd.output, [":".join(['WARNING', SCR_DIR, msg])])
        self.assertIsNone(data)

    # ------------------------------------------------------------------
    # scan_path2data
    # ------------------------------------------------------------------

    def test_scan_path2data_valid_filenaming(self):
        """Test that scan_path2data lists the expected data files."""
        lemi_data = LemiData(self.path2data, OUTPUT_MSEED)
        self.assertListEqual(lemi_data.data_files, self.data_files)

    def test_scan_path2data_erroneous_filenaming(self):
        """Test that erroneous file naming makes LemiData exit with code 1."""
        path2data = TEST_DIR.joinpath('DATA0110_Erroneous_Filenaming')
        with self.assertRaises(SystemExit) as cmd:
            LemiData(path2data, OUTPUT_MSEED)
        # Must sit outside the `with` block: nothing after the raising call
        # inside it would ever run.
        self.assertEqual(cmd.exception.code, 1)

    # ------------------------------------------------------------------
    # parse_file / parse_all_files
    # ------------------------------------------------------------------

    def test_parse_file_valid_file(self):
        """Test that parse_file output for a valid file matches the reference."""
        lemi_data = LemiData(self.path2data, OUTPUT_MSEED)
        data = lemi_data.parse_file(lemi_data.data_files[0])
        self.assertDictEqual(data, self._load_expected('test_data_file.pkl'))

    def test_parse_file_corrupted_no_CR(self):
        """Test that parse_file skips the first Level1 corrupted file."""
        self._assert_corrupted_file_skipped(self.path2data_corrupted_1, 0)

    def test_parse_file_corrupted_missing_column(self):
        """Test that parse_file skips the second Level1 corrupted file."""
        self._assert_corrupted_file_skipped(self.path2data_corrupted_1, 1)

    def test_parse_file_corrupted_erroneous_time_formatting(self):
        """Test that parse_file skips the first Level2 corrupted file."""
        self._assert_corrupted_file_skipped(self.path2data_corrupted_2, 0)

    def test_parse_file_corrupted_erroneous_coordinate_formatting(self):
        """Test that parse_file skips the second Level2 corrupted file."""
        self._assert_corrupted_file_skipped(self.path2data_corrupted_2, 1)

    def test_parse_all_files(self):
        """Test that parse_all_files fills `data` with the reference content."""
        lemi_data = self._parsed_lemi_data()
        self.assertDictEqual(lemi_data.data,
                             self._load_expected('test_data_files.pkl'))

    # ------------------------------------------------------------------
    # order_files / reformat_data
    # ------------------------------------------------------------------

    def test_order_files(self):
        """Test that order_files returns the globbed files in expected order."""
        files = self.path2data.glob('*.TXT')
        ordered_data_files = LemiData.order_files(files)
        self.assertListEqual(ordered_data_files, self.data_files)

    def test_reformat_data_valid_formatting(self):
        """Test reformat_data on a well-formed line of columns."""
        output = {'Time_stamp': UTCDateTime('2020-09-30T21:05:00.000000Z'),
                  'Lat': 34.0483955, 'Lon': -107.12843720000001,
                  'Elev': 2202.2, 'Hx': 23777.803, 'Hy': 210.731,
                  'Hz': 41834.585, 'E1': 136.521, 'E2': -110.769,
                  'E3': 174.046, 'E4': 16.178, 'Ui': 13.01, 'Te': 46.75,
                  'Tf': 37.09, 'Sn': 12.0, 'Fq': 2.0, 'Ce': 0.0}
        line = ('2020 09 30 21 05 00 23777.803 210.731 41834.585 46.75 '
                '37.09 136.521 -110.769 174.046 16.178 13.01 2202.2 '
                '3404.83955 N 10712.84372 W 12 2 0')
        columns = line.strip().split()
        self.assertDictEqual(LemiData.reformat_data(columns), output)

    def test_reformat_data_erroneous_time_formatting(self):
        """Test that reformat_data returns None for a malformed time stamp."""
        # The sixth column holds a field value where the valid line above has
        # the seconds ('00').
        line = ('2020 09 30 21 05 23777.803 23777.803 210.731 41834.585 '
                '46.75 37.09 136.521 -110.769 174.046 16.178 13.01 '
                '2202.2 3404.83955 N 10712.84372 W 12 2 0')
        columns = line.strip().split()
        self.assertIsNone(LemiData.reformat_data(columns))

    def test_reformat_data_erroneous_coordinate_formatting(self):
        """Test that reformat_data returns None for a malformed coordinate."""
        # The hemisphere column after the first coordinate is 'Z' where the
        # valid line above has 'N'.
        line = ('2020 09 30 21 12 00 23775.684 212.121 41834.749 46.68 '
                '36.46 133.427 -111.936 171.563 15.734 13.00 2200.8 '
                '3404.83994 Z 10712.84418 W 12 2 0')
        columns = line.strip().split()
        self.assertIsNone(LemiData.reformat_data(columns))

    # ------------------------------------------------------------------
    # detect_gaps / detect_new_day
    # ------------------------------------------------------------------

    def test_detect_gaps(self):
        """Test that detect_gaps flags the three expected time stamps."""
        lemi_data = self._parsed_lemi_data()
        time_stamp = lemi_data.data["Time_stamp"]
        ind_gaps = LemiData.detect_gaps(time_stamp)
        self.assertEqual(len(ind_gaps), 3)
        self.assertListEqual([time_stamp[ind] for ind in ind_gaps],
                             [UTCDateTime('2020-09-30T21:12:00.000000Z'),
                              UTCDateTime('2020-09-30T21:14:00.000000Z'),
                              UTCDateTime('2020-10-01T00:00:00.000000Z')])

    def test_detect_new_day(self):
        """Test that detect_new_day flags the single expected time stamp."""
        lemi_data = self._parsed_lemi_data()
        time_stamp = lemi_data.data["Time_stamp"]
        ind_day = LemiData.detect_new_day(time_stamp)
        self.assertEqual(len(ind_day), 1)
        self.assertEqual(time_stamp[ind_day[0]],
                         UTCDateTime('2020-10-01T00:00:00.000000Z'))

    # ------------------------------------------------------------------
    # create_data_array / update_stats_1 / update_array / prep_data
    # ------------------------------------------------------------------

    def test_create_data_array(self):
        """Test create_data_array run count and first array entry."""
        lemi_data = self._parsed_lemi_data()
        lemi_data.create_data_array()
        expected_data_np = self._load_expected('test_data_np_files.pkl')
        self.assertEqual(lemi_data.stats['nbr_runs'], 4)
        self.assertSequenceEqual(lemi_data.data_np[0], expected_data_np)

    def test_update_stats_1(self):
        """Test the location and time period stats set by update_stats_1."""
        lemi_data = self._parsed_lemi_data()
        lemi_data.update_stats_1()
        self.assertAlmostEqual(lemi_data.stats['latitude'], 34.04839, 5)
        self.assertAlmostEqual(lemi_data.stats['longitude'], -107.12844, 5)
        self.assertAlmostEqual(lemi_data.stats['elevation'], 2201.77122, 5)
        self.assertEqual(lemi_data.stats['time_period_start'],
                         UTCDateTime('2020-09-30T21:05:00.000000Z'))
        self.assertEqual(lemi_data.stats['time_period_end'],
                         UTCDateTime('2020-10-01T00:05:59.000000Z'))

    def test_update_array_list_of_channels(self):
        """Test that update_array drops channels absent from the given list."""
        lemi_data = self._parsed_lemi_data()
        lemi_data.create_data_array()
        data_channels = ['E1', 'E2', 'Hx', 'Hy', 'Hz']
        e_info = {'E1': 'Ex', 'E2': 'Ey'}
        lemi_data.update_array(data_channels, e_info)
        self.assertNotIn('E3', lemi_data.data_np['channel_number'])
        self.assertNotIn('E4', lemi_data.data_np['channel_number'])

    def test_update_array_electrode_info(self):
        """Test component, channel name and location set by update_array."""
        lemi_data = self._parsed_lemi_data()
        lemi_data.create_data_array()
        data_channels = ['E1', 'E2', 'E3', 'E4', 'Hx', 'Hy', 'Hz']
        e_info = {'E1': 'Ex', 'E2': 'Ey', 'E3': 'Ex', 'E4': 'Ey'}
        lemi_data.update_array(data_channels, e_info)
        # channel number -> (component, channel name, location) expected on
        # every entry of that channel.
        expected = {'E1': ('Ex', 'LQN', '00'),
                    'E2': ('Ey', 'LQE', '00'),
                    'E3': ('Ex', 'LQN', '01'),
                    'E4': ('Ey', 'LQE', '01')}
        for channel_number, (component, channel_name, location) in expected.items():
            # subTest reports every failing channel instead of stopping at
            # the first one.
            with self.subTest(channel_number=channel_number):
                mask = lemi_data.data_np['channel_number'] == channel_number
                channel_data = lemi_data.data_np[mask]
                self.assertSetEqual(set(channel_data['component']), {component})
                self.assertSetEqual(set(channel_data['channel_name']), {channel_name})
                self.assertSetEqual(set(channel_data['location']), {location})

    def test_prep_data(self):
        """Test that prep_data exits with code 1 on the Level1 corrupted set."""
        lemi_data = LemiData(self.path2data_corrupted_1, OUTPUT_MSEED)
        with self.assertRaises(SystemExit) as cmd:
            lemi_data.prep_data()
        self.assertEqual(cmd.exception.code, 1)