#!/usr/bin/env python ''' Find & fix mis-timed RT-130 data from Dec 2016 leap second. Lloyd Carothers IRIS/PASSCAL ''' VERSION = '2017.094' import argparse import os import re from struct import unpack, pack import time import zipfile # Set all time calcualations storage to UTC os.environ['TZ'] = '0' time.tzset() ############################ #Time handling ############################ class Ltime(object): ''' An upgraded time object with milliseconds stores time in epoch for slow creation but faster comparison May need to turn epoch and milli into ints to prevent rounding problems self.epoch :: float refpktstamp :: YY:DDD:HH:MM:SS.MIL epoch :: float shstamp :: deal with this later ''' #reftek packet time format for strptime PKTFMT = "%y:%j:%H:%M:%S" SECperDAY = 86400 SECperHOUR = 3600 def __init__(self, pktstamp=None, epoch=None, shstamp=None, strptime=None): if not (pktstamp or epoch or shstamp or strptime): raise RuntimeError, "Must provide 1 form of time" if epoch is not None: self.epoch = epoch if pktstamp is not None: self.set_pktstamp(pktstamp) if shstamp is not None: self.set_shstamp(shstamp) if strptime is not None: self.epoch = time.mktime(time.strptime(*strptime)) def set_pktstamp(self, pktstamp): self.epoch = time.mktime( time.strptime(pktstamp[:-4], Ltime.PKTFMT) ) + float( pktstamp[-4:] ) def get_pktstamp(self): #time should be instance of Ltime st = self.get_struct_time() styear = str(st.tm_year)[-2:] pyear = pack('1B', int(styear,16)) mils = "%0.3f" % self.get_milisec() mils = mils[-3:] ts = "%0.3d"% st.tm_yday + "%0.2d" % st.tm_hour + '%0.2d'%st.tm_min + '%0.2d'%st.tm_sec + mils ptime = pack('6B', int(ts[:2],16), int(ts[2:4],16), int(ts[4:6],16), int(ts[6:8],16), int(ts[8:10],16), int(ts[10:12],16) ) return pyear, ptime def set_shstamp(self, shstamp): self.epoch = time.mktime( time.strptime(shstamp, Ltime.PKTFMT) ) def get_shstamp(self): st = self.get_struct_time() return "%0.3d:%0.2d:%0.2d:%0.2d" % (st.tm_yday, st.tm_hour, st.tm_min, st.tm_sec) def get_epoch(self): #includes milisecs return self.epoch def get_milisec(self): r = self.epoch % 1.0 return r def get_struct_time(self): t = time.gmtime(self.epoch) return t def add_hours(self, hours): return Ltime( epoch = self.epoch + hours * Ltime.SECperHOUR ) def add_days(self, days): return Ltime( epoch = self.epoch + days * Ltime.SECperDAY ) def __str__(self): #s = "Time struct: " + str(self.timetup) + " Milisec: " + str(self.milisec) #s += " Epoch: " + str( self.get_epoch() ) s = time.strftime(Ltime.PKTFMT, self.get_struct_time()) ms = "%0.3f" % self.get_milisec() s = '20' + s + '.' + ms[-3:] return s def __repr__(self): return self.__str__() def calday_str(self): #returns a string in calendar day format return time.asctime(self.get_struct_time()) def __add__(self, seconds): #other must be seconds return Ltime(epoch= self.epoch + seconds) def __eq__(self, other): return "%0.3f" % self.epoch == "%0.3f" % other.epoch def __ne__(self, other): return "%0.3f" % self.epoch != "%0.3f" % other.epoch def __lt__(self, other): return self.epoch < other.epoch def __le__(self, other): return self.epoch <= other.epoch def __gt__(self, other): return self.epoch > other.epoch def __ge__(self, other): return self.epoch >= other.epoch def __sub__(self, other): if isinstance( other, Ltime): return self.epoch - other.epoch if isinstance(other, int): return Ltime(epoch= self.epoch - other) # Constant of time NEWYEAR = Ltime(strptime=('2017', '%Y')) YEAR2018 = Ltime(strptime=('2018', '%Y')) class Ltimespan(object): ''' time span class start and end are of type Ltime ''' def __init__(self, t1, t2): if not ( isinstance(t1,Ltime) and isinstance(t2,Ltime) ): raise Exception, 'Arguments must both be of type Ltime' if t1 < t2: self.start = t1 self.end = t2 else: self.start = t2 self.end = t1 def __str__(self): return str(self.start) + " - " + str(self.end) def __repr__(self): return self.__str__() def calday_str(self): return self.start.calday_str() + " - " + self.end.calday_str() def has(self, t): # returns true if t is in time span inclusive on start not inclusive on end if self.start <= t and t < self.end: return True else: return False class Packet(object): '''Rt-130 packet''' PACKET_SIZE = 1024 packetHdrFmt = '2c1B1B2B6B2B2B' reDSPDIFF = re.compile(r'(\d{3}:\d{2}:\d{2}:\d{2})\sDSP CLOCK DIFFERENCE:\s+(-?\d+)\s+SECS') def __init__(self, buffer): self.buffer = buffer self.unpack() @staticmethod def isSH(buffer): if buffer[:2] == 'SH': return True else: return False @staticmethod def isEH(buffer): if buffer[:2] == 'EH': return True else: return False @staticmethod def isDT(buffer): if buffer[:2] == 'DT': return True else: return False @property def SHstring(self): return self.buffer[24:self.bytecount] def DSPdiff(self): '''Returns [(Ltime, diff), (Ltime, diff)...]''' if self.ptype == 'SH': dspmatch = self.reDSPDIFF.findall(self.SHstring) if dspmatch: dsps = list() for dsptime, diff in dspmatch: ltime = Ltime(shstamp='{:02d}:'.format(self.year)+dsptime) diff = int(diff) dsps.append((ltime, diff)) return dsps return False def unpack(self): '''parses a 1024 byte reftek packet header''' if len(self.buffer) != 1024: raise RuntimeError, "Packet not 1024 bytes!" tup = unpack(self.packetHdrFmt, self.buffer[:16]) self.ptype = tup[0] + tup[1] self.expnum = int("%0.2X" % tup[2]) self.year = int("%0.2X" %tup[3]) self.sn = "%0.2X%0.2X" % (tup[4] , tup[5] ) self.time = "%0.2X%0.2X%0.2X%0.2X%0.2X%0.2X" % tup[6:12] self.day, self.hour, self.minute, self.sec, self.millisec = \ int(self.time[:3]),int(self.time[3:5]),int(self.time[5:7]),int(self.time[7:9]),int(self.time[9:]) self.bytecount = int("%0.2X%0.2X" % tup[12:14]) self.sequence = int("%0.2X%0.2X" % tup[14:16]) self.Ltime = Ltime('%02d:%03d:%02d:%02d:%02d.%03d' % ( self.year, self.day, self.hour, self.minute, self.sec, self.millisec)) if self.ptype == 'DT': self.unpackDT() if self.ptype == 'EH': self.unpackEH() def unpackDT(self): if self.ptype != 'DT': return False self.chan = int('%0.1X' % unpack('B', self.buffer[19])) self.num_samp = int( "%0.2X%0.2X" % unpack('2B', self.buffer[20:22])) self.data_format = '%0.1X' % unpack('B', self.buffer[23]) self.unpackEHDT() def unpackEH(self): if self.ptype != 'EH': return False self.samplerate = float(self.buffer[88:92]) self.unpackEHDT() def unpackEHDT(self): ''' fields common to EH and DT''' self.event_num = int("%0.2X%0.2X" % unpack('2B', self.buffer[16:18])) self.ds = int('%0.1X' % unpack('B', self.buffer[18])) def __str__(self): tup = (self.ptype, self.expnum, self.sn, self.year,\ self.day, self.hour, self.minute, self.sec, self.millisec,\ self.bytecount, self.sequence) s = '%s %02d %s 20%02d:%03d:%02d:%02d:%02d.%03d %04d %04d' s = s % tup if self.isSH(self.buffer): s += '\n' + self.SHstring elif self.ptype == 'DT': tup = (self.event_num, self.ds, self.chan, self.num_samp, self.data_format) s += ' %02d %01d %01d %02d %s' % tup elif self.ptype == 'EH': tup = (self.event_num, self.ds, self.samplerate) s += ' %02d %01d %04f' % tup return s def __repr__(self): return self.__str__() def pack_minus_one(self): newtime = self.Ltime - 1 pyear, ptime = newtime.get_pktstamp() newbuf = self.buffer[:3] + pyear + self.buffer[4:6] + ptime + self.buffer[12:] return newbuf class Tear(object): '''Object representing a gap or overlap.''' def __init__(self, start, should_start, packet): self.start = start self.should_start = should_start self.length = start - should_start self.packet = packet def __str__(self): s = "TEAR: ID: {p.sn} DS: {p.ds} CH: {p.chan}\n".format(p=self.packet) s+= "\tStart: %s\n" % self.start s+= "\tExpected: %s\n" % self.should_start s+= "\tLength: %21.3f\n" % self.length return s def __repr__(self): return self.__str__() class DataStream(object): def __init__(self, EH=None): self.event = None self.sr = None #tuple (next_start, lastpacket_seen) self.channels = dict() def update(self, EH): '''Update info from event header but don't overwrite chan info ''' self.event = EH.event_num self.sr = EH.samplerate def addDT(self, DT): '''Returns (should_start, last_packet) OR (None, None)''' if not self.channels.has_key(DT.chan): next_start = DT.Ltime + DT.num_samp / self.sr self.channels[DT.chan] = (next_start, DT) return None, None else: should_start, last_packet = self.channels[DT.chan] self.channels[DT.chan] = (DT.Ltime + DT.num_samp / self.sr, DT) return should_start, last_packet class DataStreams(object): def __init__(self, *args, **kwargs): self.streams = dict() def addEH(self, EH): if not self.streams.has_key(EH.ds): self.streams[EH.ds] = DataStream() self.streams[EH.ds].update(EH) def addDT(self, DT): ''' Returns next expected start time or None''' if not self.streams.has_key(DT.ds): #Haven't seen a EH for this stream # self.streams[EH.ds] = DataStream() # self.streams[TD.ds].addDT(DT) # print "Can't process. No EH for stream %s" % DT.ds return None, None else: return self.streams[DT.ds].addDT(DT) class Reftek(object): '''The reftek "file" format, but really the base for all reftek formats cf.zip, cf directory, .ref files ''' def __init__(self, *args, **kwargs): super(Reftek, self).__init__(*args, **kwargs) @classmethod def get(cls, path): if os.path.isdir(path): #check if cfdir return False elif zipfile.is_zipfile(path): return def find_tears(self, start=None, end=None): '''Finds all gaps and overlaps ignoring mass positions ''' dstreams = DataStreams() # tears tears = list() for p in self.gen_packets(start=start, end=end): # if eh if p.ptype == 'EH': # add /replace stream dstreams.addEH(p) continue # if dt if p.ptype == 'DT' and p.ds < 8: first_samp_time = p.Ltime should_start, last_packet = dstreams.addDT(p) if not should_start: # print "No expected start(no previous packet?)" continue if first_samp_time != should_start: tear = Tear(first_samp_time, should_start, p) #print tear #print last_packet #print p tears.append(tear) return tears def find_dsp_diffs(self, start=None, end=None): '''Find all DSP diff lines in SOH''' dsp_list = list() for p in self.gen_packets(start=start, end=end, ds=0): if p.DSPdiff(): dsp_list.append(p.DSPdiff()) return dsp_list def find_2016_leap_dsp(self): '''Returns the time of the first dsp diff of -1s 0msec after 2017:001 or False if no such diff found''' for p in self.gen_packets(start=NEWYEAR, end=None, ds=0): dsplist = p.DSPdiff() if dsplist: for time, diff in dsplist: if int(diff) == -1: return time return False def has_corrected(self): for (path, dirs, files) in os.walk(self.root): for f in files: if f.endswith(self.CFfile.BACKUP_SUFFIX): return True return False def gen_2016_corrected_packets(self): '''generate time corrected (-1sec) reftek packets from 2017:001:00:00:00 until the first tear or dsp time yields a string of a packet''' dsptime = self.find_2016_leap_dsp() # print dsptime for p in self.gen_packets(start=NEWYEAR, end=dsptime): # print # print p if p.ptype == 'DT': corrected_buf = p.pack_minus_one() # print Packet(corrected_buf) yield corrected_buf else: yield p.buffer def mistimed_2017_cf_files(self): ''' Return a list of mistimed CF event files ''' mistimed_files = [] dsptime = self.find_2016_leap_dsp() print 'DAS corrected for leap at: ', dsptime or 'never' if not dsptime: print 'No DSP set, can NOT make any corrections.' return [] for cf in self.gen_cf_files(start=NEWYEAR, end=dsptime): if 0 < cf.ds and cf.ds < 8: # 8 datastreams, 0=ds, 9=M.P. mistimed_files.append(cf) return mistimed_files def correct_2017_leap(self): if self.has_corrected(): print "File already has corrections applied, NOT correcting." return False # Make list of files to list cf_to_fix = rt.mistimed_2017_cf_files() print 'Files found to be in need of time adjust:' for f in cf_to_fix: print '\t', f print # Rename files for f in cf_to_fix: print f print '\t', 'Backing up original.' f.backup_cf_file() #f.restore_cf_file() print '\t', 'Adjusting time.' with f as infh: with open(f.original_path, 'w') as outfh: for p in infh.gen_packets(): if p.ptype == 'DT': # Some rt-130's wrote data in 2027 toss it. if p.Ltime > YEAR2018: continue outfh.write(p.pack_minus_one()) else: outfh.write(p.buffer) def restore(self): for (path, dirs, files) in os.walk(self.root): for f in files: cf_file = self.CFfile(os.path.join(path, f)) if cf_file.backed_up: print cf_file.path cf_file.restore_cf_file() def print_packets(self): for p in self: print p class CFfile(object): '''Represents a filename with path in CF card dir structure or Neo .ZIP.''' TIME_FORMAT = '%Y%j%H%M%S' BACKUP_SUFFIX = '.ORIG' def __init__(self, path): self.path = path self.next = self.gen_packets if self.path.endswith(self.BACKUP_SUFFIX): self.backed_up = True self.original_path = self.path[:-5] else: self.backed_up = False try: yearday, self.sn, self.ds, self.event_file = path.split('/')[-4:] self.ds = int(self.ds) except ValueError: self.valid = None return None # ensure file format if not self.has_name_format(): self.valid = False return None self.valid = True self.year = yearday[:4] self.day = yearday[-3:] self.hour = self.event_file[:2] self.minute = self.event_file[2:4] self.second = self.event_file[4:6] self.millisecond = float(self.event_file[6:9]) / 1000.0 time_str = self.year + self.day + self.event_file[:6] self.ltime = Ltime(strptime=(time_str, self.TIME_FORMAT)) self.ltime += self.millisecond def has_name_format(self): first, sep, second = self.event_file.partition('_') if not first.isdigit(): return False if not second.isalnum(): return False return True def __enter__(self): self.fh = open(self.path, 'r') return self def __exit__(self, *args): self.fh.close() def __str__(self): return '{0.path} : {0.sn} {0.ds} {0.ltime}'.format(self) def __repr__(self): return self.__str__() def backup_cf_file(self): if self.backed_up: print "Already backedup. " , self.path return False backup_path = self.path + self.BACKUP_SUFFIX os.rename(self.path, backup_path) self.original_path = self.path self.path = backup_path self.backed_up = True def restore_cf_file(self): # Move .ORIG to original filename, for development. if not self.backed_up: print "Not a backup. ", self.path return False os.rename(self.path, self.original_path) self.path = self.original_path self.backed_up = False def gen_packets(self, start=None, end=None, ds=None, ptype=None): with self as __: while True: data = self.fh.read(Packet.PACKET_SIZE) if not data: raise StopIteration yield Packet(data) def print_packets(self): for p in self.gen_packets(): print p class RefFile(Reftek): '''A .ref file''' def __init__(self, filename): '''arg[0] filename''' super(RefFile, self).__init__() self.filename = filename self.next = self.gen_packets def __enter__(self): self.fh = open(self.filename, 'r') return self def __exit__(self, *args): self.fh.close() # print args @staticmethod def is_ref(filename): return filename[-4:].lower() == '.ref' def __iter__(self): return self.gen_packets() def gen_packets(self, start=None, end=None, ds=None, ptype=None): while True: data = self.fh.read(Packet.PACKET_SIZE) if not data: raise StopIteration yield Packet(data) class RtCfDir(Reftek): '''A CF directory structure rt-130's record to''' def __init__(self, root): self.root = root self.next = self.gen_packets def __iter__(self): return self.gen_packets() def __str__(self): return self.root def __repr__(self): return self.__str__() def gen_packets(self, start=None, end=None, ds=None, ptype=None): for cf_file in self.gen_cf_files(start=start, end=end, ds=ds): for packet in self.gen_event_packets(cf_file): if start and packet.Ltime < start: continue if end and packet.Ltime > end: continue yield packet raise StopIteration def gen_cf_files(self, start=None, end=None, ds=None): for (path, dirs, files) in os.walk(self.root): for f in files: cf_file = self.CFfile(os.path.join(path, f)) if not cf_file.valid: continue if start and cf_file.ltime < start: continue if end and cf_file.ltime > end: continue if ds is not None and cf_file.ds != int(ds): continue yield cf_file def gen_event_packets(self, cf_file): with open(cf_file.path) as fh: while True: data = fh.read(Packet.PACKET_SIZE) if not data: raise StopIteration yield Packet(data) class RtZip(Reftek, zipfile.ZipFile): '''A reftek cf card dir structure in a zip file.''' def __init__(self, *args, **kwargs): super(RtZip, self).__init__(*args, **kwargs) self.next = self.gen_packets @staticmethod def is_zip(filename): '''Returns True if file ends in .ZIP or .zip and is regular file. ''' if filename[-4:].lower() == '.zip' and os.path.isfile(filename): return True return False def __iter__(self): return self.gen_packets() def __str__(self): return self.filename def __repr__(self): return self.__str__() def gen_cf_files(self, start=None, end=None, ds=None): for event_file in sorted(self.namelist()): cffile = self.CFfile(event_file) if start and cffile.ltime < start: continue if end and cffile.ltime > end: continue if ds is not None and cffile.ds != int(ds): continue yield cffile def gen_packets(self, start=None, end=None, ds=None, ptype=None): for cffile in self.gen_cf_files(start=start, end=end, ds=ds): for packet in self.gen_event_packets(cffile): if start and packet.Ltime < start: continue if end and packet.Ltime > end: continue yield packet raise StopIteration def gen_event_packets(self, cffile): with self.open(cffile.path) as fh: while True: data = fh.read(Packet.PACKET_SIZE) if not data: raise StopIteration yield Packet(data) def print_packets(self): for p in self: print p def print_packets(zips): for zip in zips: print zip with RtZip(zip, 'r') as rtzip: for p in rtzip: print p def find_tears(zips): for zip in zips: print zip try: with RtZip(zip, 'r') as rtzip: print rtzip.find_tears(start=NEWYEAR, end=Ltime(strptime=('2017:002', '%Y:%j')), ) except zipfile.BadZipfile as e: print "Can not open {}.".format(e) print e def dsp_diffs(zips): for zip in zips: print zip try: with RtZip(zip, 'r') as rtzip: print rtzip.find_2016_leap_dsp() except zipfile.BadZipfile as e: print "Can not open {}.".format(e) print e def write_out_leap_fixed(rt, outfile): if not rt.find_2016_leap_dsp(): print 'No DSP after 2017. No correction to be made' return None, None print 'Writing time corrected data to {}'.format(outfile) with open(outfile,'w') as outfh: correct_packets = rt.gen_2016_corrected_packets() # Save the first packet first_buf = correct_packets.next() outfh.write(first_buf) for buf in correct_packets: outfh.write(buf) # return first and last packet (for printing etc) return Packet(first_buf), Packet(buf) def print_zip_contents(zips): for i, zip in enumerate(zips): print i, zip with RtZip(zip, 'r') as rtzip: for cffile in rtzip.gen_cf_files(): print cffile def find_rt_files(paths): rt_paths = list() for path in paths: if os.path.isdir(path): # For rt cf dir structure if False: pass # Get reg files in dir, don't recurse for f in [f for f in os.listdir(path) if os.path.isfile(f)]: rt_paths.extend(find_rt_files([os.path.join(path,f)])) elif zipfile.is_zipfile(path): # print path rt_paths.append(path) elif RefFile.is_ref(path): rt_paths.append(path) return rt_paths if __name__ == '__main__': parser = argparse.ArgumentParser() parser.version = VERSION parser.description = ('Program for finding, viewing, and correcting ' 'Dec 2016 leapsecond mistimed RT-130 data ' 'recorded to CF media. Not to be used on similar ' 'directory structures created by RTPD as issues ' 'are known to exist.' ) parser.add_argument('paths', nargs='*', default=['./'], help='Input CF dirs. The root dir of a CF card ' 'recorded by and RT-130, containing YYYYDDD dirs.', ) parser.add_argument('-p', dest='print_packets', action='store_true', help='Print all packets. Like refpacket.', ) parser.add_argument('-t', dest='find_tears', action='store_true', help='Print all tears. Does not include mass position ' 'channels.', ) parser.add_argument('-j', dest='find_jerks', action='store_true', help='Print all jerks.', ) parser.add_argument('-c', dest='correct', action='store_true', help='Fix Dec 2016 leapsecond tears. Files corrected ' 'will first be backed up with .ORIG suffix, then ' 'corrections will be applied to the original files ' 'allowing most downstream processes (rt2ms) to work ' 'on the corrected data. Does not alter mass position ' 'or SOH channels.', ) parser.add_argument('-r', dest='restore', action='store_true', help='ks.', ) args = parser.parse_args() refs = [] for path in args.paths: if os.path.isdir(path): ref = RtCfDir(path) else: ref = RtCfDir.CFfile(path) # ref = RefFile(path) refs.append(ref) if args.correct: for rt in refs: # Only correct cf dirs if not isinstance(rt, RtCfDir): continue print '-' * 15, 'Correcting: ', rt rt.correct_2017_leap() print '-' * 60 if args.print_packets: for rt in refs: rt.print_packets() print '-' * 60 if args.find_tears: for rt in refs: print '-' * 15, 'Finding tears: ', rt if rt.has_corrected(): print 'Backups exist, corrections likely already applied.' for t in rt.find_tears(): print t print '-' * 60 if args.find_jerks: for rt in refs: print '-' * 15, 'Finding jerks: ', rt for l in rt.find_dsp_diffs(): for jerk in l: print jerk print '-' * 60 if args.restore: for rt in refs: print '-' * 15, 'Restoring from original backups: ', rt rt.restore() print '-' * 60