From 55d472708080d742e9b6fad6bba3f2e8c6a7161f Mon Sep 17 00:00:00 2001
From: Darryl Masson
Date: Tue, 10 May 2022 10:45:15 +0200
Subject: [PATCH] Work on GPS start integration

---
 dispatcher/DAQController.py | 43 +++++++++++++--------
 dispatcher/MongoConnect.py  | 74 ++++++++++++++++++++++++++++---------
 dispatcher/config.ini       | 11 +++++-
 dispatcher/dispatcher.py    |  8 ++--
 4 files changed, 99 insertions(+), 37 deletions(-)

diff --git a/dispatcher/DAQController.py b/dispatcher/DAQController.py
index a43ee2b4..c866d079 100644
--- a/dispatcher/DAQController.py
+++ b/dispatcher/DAQController.py
@@ -21,7 +21,7 @@ class DAQController():
     resetting of runs (the ~hourly stop/start) during normal operations.
     """
 
-    def __init__(self, config, daq_config, mongo_connector, logger, hypervisor):
+    def __init__(self, config, daq_config, mongo_connector, logger, hypervisor, logic_timer, gps_start_detectors):
         self.mongo = mongo_connector
         self.hypervisor = hypervisor
 
@@ -43,7 +43,7 @@ def __init__(self, config, daq_config, mongo_connector, logger, hypervisor):
 
         # Timeout properties come from config
         self.timeouts = {
-                k.lower() : int(config['%sCommandTimeout' % k])
+                k.lower() : int(config[f'{k}CommandTimeout'])
                 for k in ['Arm','Start','Stop']}
         self.stop_retries = int(config['RetryReset'])
 
@@ -58,6 +58,9 @@ def __init__(self, config, daq_config, mongo_connector, logger, hypervisor):
         self.start_cmd_delay = float(config['StartCmdDelay'])
         self.stop_cmd_delay = float(config['StopCmdDelay'])
 
+        self.logic_timer = logic_timer
+        self.gps_start_detectors = gps_start_detectors
+
     def solve_problem(self, latest_status, goal_state):
         """
         This is sort of the whole thing that all the other code is supporting
@@ -154,7 +157,7 @@ def solve_problem(self, latest_status, goal_state):
             # Deal separately with the TIMEOUT and ERROR statuses, by stopping the detector if needed
             elif latest_status[det]['status'] == DAQ_STATUS.TIMEOUT:
                 self.logger.info(f"The {det} is in timeout, check timeouts")
-                self.logger.debug("Checking %s timeouts", det)
+                self.logger.debug(f"Checking {det} timeouts")
                 self.handle_timeout(detector=det)
 
             elif latest_status[det]['status'] == DAQ_STATUS.ERROR:
@@ -214,7 +217,7 @@ def control_detector(self, command, detector, force=False):
         gs = self.goal_state
         if command == 'arm':
             if self.one_detector_arming:
-                self.logger.info('Another detector already arming, can\'t arm %s' % detector)
+                self.logger.info(f"Another detector already arming, can't arm {detector}")
                 # this leads to run number overlaps
                 return 1
             readers, cc = self.mongo.get_hosts_for_mode(gs[detector]['mode'])
@@ -222,9 +225,17 @@ def control_detector(self, command, detector, force=False):
             delay = 0
             self.one_detector_arming = True
         elif command == 'start':
+            delay = self.start_cmd_delay
+            if detector in self.gps_start_detectors:
+                dt = self.mongo.time_to_next_gps()
+                if 1 < dt < self.logic_timer+1:
+                    # can't start unless there's a GPS signal in the offing
+                    # we give it at least 1 second
+                    self.logger.debug(f'Waiting for GPS signal to start {detector}')
+                    return 0
+                delay = 0
             readers, cc = self.mongo.get_hosts_for_mode(ls[detector]['mode'])
             hosts = (readers, cc)
-            delay = self.start_cmd_delay
             #Reset arming timeout counter
             self.missed_arm_cycles[detector]=0
         else: # stop
@@ -249,8 +260,8 @@ def control_detector(self, command, detector, force=False):
                 return 0
 
         else:
-            self.logger.debug('Can\'t send %s to %s, timeout at %i/%i' % (
-                command, detector, dt, self.timeouts[command]))
+            self.logger.debug(f'Can\'t send {command} to {detector}, '
+                              f'timeout at {int(dt)}/{self.timeouts[command]}')
             return 1
 
         return 0
@@ -291,8 +302,8 @@ def check_timeouts(self, detector, command=None):
             local_timeouts['stop'] = self.timeouts['stop']*(self.error_stop_count[detector]+1)
 
         if dt < local_timeouts[command]:
-            self.logger.debug('%i is within the %i second timeout for a %s command' %
-                    (dt, local_timeouts[command], command))
+            self.logger.debug(f'{int(dt)} is within the {local_timeouts[command]} '
+                              f'second timeout for a {command} command')
         else:
             # timing out, maybe send stop?
             if command == 'stop':
@@ -320,13 +331,12 @@ def check_timeouts(self, detector, command=None):
                     self.error_stop_count[detector] += 1
                 else:
                     self.mongo.log_error(
-                        ('%s took more than %i seconds to %s, indicating a possible timeout or error' %
-                            (detector, self.timeouts[command], command)),
+                        (f'{detector} took more than {self.timeouts[command]} seconds '
+                         f'to {command}, indicating a possible timeout or error'),
                         'ERROR',
-                        '%s_TIMEOUT' % command.upper())
+                        f'{command.upper()}_TIMEOUT')
                     #Keep track of how often the arming sequence times out
-                    if self.control_detector(detector=detector, command='stop') == 0:
-                        # only increment the counter if we actually issued a STOP
+                    if self.control_detector(detector=detector, command='stop') == 0: # only increment the counter if we actually issued a STOP
                         self.missed_arm_cycles[detector] += 1
                         self.logger.info(f'{detector} missed {self.missed_arm_cycles[detector]} arm cycles')
             else:
@@ -358,8 +368,9 @@ def check_run_turnover(self, detector):
         time_now = now()
         run_length = int(self.goal_state[detector]['stop_after'])*60
         run_duration = (time_now - start_time).total_seconds()
-        self.logger.debug('Checking run turnover for %s: %i/%i' % (detector, run_duration, run_length))
+        self.logger.debug(f'Checking run turnover for {detector}: '
+                          f'{run_duration}/{run_length}')
         if run_duration > run_length:
-            self.logger.info('Stopping run for %s' % detector)
+            self.logger.info(f'Stopping run for {detector}')
             self.control_detector(detector=detector, command='stop')
 
diff --git a/dispatcher/MongoConnect.py b/dispatcher/MongoConnect.py
index 6b5589b1..4aa44488 100644
--- a/dispatcher/MongoConnect.py
+++ b/dispatcher/MongoConnect.py
@@ -53,7 +53,7 @@ class MongoConnect(object):
 
     """
 
-    def __init__(self, config, daq_config, logger, control_mc, runs_mc, hypervisor, testing=False):
+    def __init__(self, config, daq_config, logger, control_mc, runs_mc, hypervisor, logic_timer, gps_start_detectors, gps_period, testing=False):
         # Define DB connectivity. Log is separate to make it easier to split off if needed
         dbn = config['ControlDatabaseName']
@@ -75,6 +75,7 @@ def __init__(self, config, daq_config, logger, control_mc, runs_mc, hypervisor,
             'log': self.dax_db['log'],
             'options': self.dax_db['options'],
             'run': self.runs_db[config['RunsDatabaseCollection']],
+            'gps': self.runs_db['gps_sync'],
         }
 
         self.error_sent = {}
@@ -147,6 +148,10 @@ def __init__(self, config, daq_config, logger, control_mc, runs_mc, hypervisor,
         self.command_thread = threading.Thread(target=self.process_commands)
         self.command_thread.start()
 
+        self.gps_start_detectors = gps_start_detectors
+        self.gps_period = gps_period
+        self.logic_timer = logic_timer
+
     def quit(self):
         self.run = False
         try:
@@ -197,14 +202,15 @@ def get_update(self, dc):
         each node we know about
         """
         try:
+            sort=[('_id', -1)]
             for detector in dc.keys():
                 for host in dc[detector]['readers'].keys():
                     doc = self.collections['node_status'].find_one({'host': host},
-                            sort=[('_id', -1)])
+                            sort=sort)
                     dc[detector]['readers'][host] = doc
                 for host in dc[detector]['controller'].keys():
                     doc = self.collections['node_status'].find_one({'host': host},
-                            sort=[('_id', -1)])
+                            sort=sort)
                     dc[detector]['controller'][host] = doc
         except Exception as e:
             self.logger.error(f'Got error while getting update: {type(e)}: {e}')
@@ -215,6 +221,22 @@ def get_update(self, dc):
         # Now compute aggregate status
         return self.latest_status if self.aggregate_status() is None else None
 
+    def time_to_next_gps(self):
+        """
+        In how many seconds can we expect the next GPS signal?
+        """
+        # how long ago was the most recent signal?
+        utc = self.get_gps_time()[2]
+        dt = now().timestamp() - utc
+        return self.gps_period - dt
+
+    def get_gps_time(self):
+        """
+        Return the most recent GPS timestamp, and its UTC equivalent (approx)
+        """
+        doc = self.collections['gps'].find_one({'channel': 0}, sort=[('_id', -1)])
+        return doc['gps_sec'], doc['gps_ns'], int(str(doc['_id'])[:8], 16)
+
     def clear_error_timeouts(self):
         self.error_sent = {}
 
@@ -487,14 +509,14 @@ def get_run_mode(self, mode):
             return None
         base_doc = self.collections['options'].find_one({'name': mode})
         if base_doc is None:
-            self.log_error("Mode '%s' doesn't exist" % mode, "info", "info")
+            self.log_error(f"Mode '{mode}' doesn't exist", "info", "info")
             return None
         if 'includes' not in base_doc or len(base_doc['includes']) == 0:
             return base_doc
         try:
             if self.collections['options'].count_documents({'name': {'$in': base_doc['includes']}}) != len(base_doc['includes']):
-                self.log_error("At least one subconfig for mode '%s' doesn't exist" % mode, "WARNING", "WARNING")
+                self.log_error(f"At least one subconfig for mode '{mode}' doesn't exist", "WARNING", "WARNING")
                 return None
             return list(self.collections["options"].aggregate([
                 {'$match': {'name': mode}},
@@ -507,7 +529,7 @@ def get_run_mode(self, mode):
                 {'$project': {'_id': 0, 'description': 0, 'includes': 0, 'subconfig': 0}},
             ]))[0]
         except Exception as e:
-            self.logger.error("Got a %s exception in doc pulling: %s" % (type(e), e))
+            self.logger.error(f"Got a {type(e)} exception in doc pulling: {e}")
             return None
 
     def get_hosts_for_mode(self, mode, detector=None):
@@ -733,7 +755,7 @@ def log_error(self, message, priority, etype):
         if ( (etype in self.error_sent and self.error_sent[etype] is not None) and
                 (etype in self.error_timeouts and self.error_timeouts[etype] is not None) and
                 (nowtime-self.error_sent[etype]).total_seconds() <= self.error_timeouts[etype]):
-            self.logger.debug("Could log error, but still in timeout for type %s"%etype)
+            self.logger.debug(f"Could log error, but still in timeout for type {etype}")
             return
         self.error_sent[etype] = nowtime
         try:
@@ -744,7 +766,7 @@ def log_error(self, message, priority, etype):
             })
         except Exception as e:
             self.logger.error(f'Database error, can\'t issue error message: {type(e)}, {e}')
-        self.logger.info("Error message from dispatcher: %s" % (message))
+        self.logger.info(f"Error message from dispatcher: {message}")
         return
 
     def get_run_start(self, number):
@@ -777,10 +799,11 @@ def insert_run_doc(self, detector):
             'user': self.goal_state[detector]['user'],
             'mode': self.goal_state[detector]['mode'],
             'bootstrax': {'state': None},
-            'end': None
+            'end': None,
+            'gps_start': [0,0],
         }
 
-        # If there's a source add the source. Also add the complete ini file.
+        # If there's a source add the source. Also add the complete config
         cfg = self.get_run_mode(self.goal_state[detector]['mode'])
         if cfg is not None and 'source' in cfg.keys():
             run_doc['source'] = str(cfg['source'])
@@ -802,13 +825,30 @@ def insert_run_doc(self, detector):
                 'location': cfg['strax_output_path']
             }]
 
-        # The cc needs some time to get started
-        time.sleep(self.cc_start_wait)
-        try:
-            start_time = self.get_ack_time(detector, 'start')
-        except Exception as e:
-            self.logger.error('Couldn\'t find start time ack')
-            start_time = None
+        if detector in self.gps_start_detectors:
+            # wait for GPS signal to come along
+            time.sleep(self.time_to_next_gps()+0.5)
+            try:
+                gps_sec, gps_ns, utc = self.get_gps_time()
+                if (now().timestamp() - utc) < self.logic_timer:
+                    # we got the freshest entry
+                    start_time = datetime.datetime.fromtimestamp(gps_sec + gps_ns/1e9,
+                                                                 tz=pytz.utc)
+                    run_doc['gps_start'] = [gps_sec, gps_ns]
+                else:
+                    self.logger.error(f'GPS timestamp too old? {gps_sec}, {gps_ns}, {utc}')
+                    start_time = None
+            except Exception as e:
+                self.logger.error('Couldn\'t find a start time')
+                start_time = None
+        else:
+            # The cc needs some time to get started
+            time.sleep(self.cc_start_wait)
+            try:
+                start_time = self.get_ack_time(detector, 'start')
+            except Exception as e:
+                self.logger.error('Couldn\'t find start time ack')
+                start_time = None
 
         if start_time is None:
             start_time = now()-datetime.timedelta(seconds=2)
diff --git a/dispatcher/config.ini b/dispatcher/config.ini
index 6471db91..662bdeb0 100644
--- a/dispatcher/config.ini
+++ b/dispatcher/config.ini
@@ -5,7 +5,8 @@ LogName = dispatcher
 
 # Poll frequency in seconds for main program loop. It makes
 # some sense to make other time-based options multiples of
-# this, though not required
+# this, though not required. Do not make this an even divisor
+# of the GPS period or you could deadlock
 PollFrequency = 4
 
 # How long since a client's last check-in until we consider
@@ -20,6 +21,11 @@ ControlDatabaseName = daq
 RunsDatabaseName = xenonnt
 RunsDatabaseCollection = runs
 
+# these detectors start in sync with the GPS
+StartWithGPS = tpc
+# period of GPS signals
+GPSPeriod = 10
+
 # Timeouts (seconds to wait until we assume it failed)
 # Give Arm command some time in case baselines noisy
 ArmCommandTimeout = 60
@@ -42,6 +48,7 @@ HypervisorNuclearTimeout = 1800
 # Time between reader and CC start (can be float)
 # THIS IS AN IMPORTANT VALUE, DON'T CHANGE IT UNLESS YOU KNOW WHAT YOU'RE DOING
 # 20210308 - Darryl
+# 20220414 - Only meaningful for detectors not starting with GPS signals
 StartCmdDelay = 1
 # Time between CC and reader stop (less important, also can be float)
 StopCmdDelay = 5
@@ -88,6 +95,8 @@ TimeoutActionThreshold = 10
 ControlDatabaseName = test
 RunsDatabaseName = test
 RunsDatabaseCollection = runs
+StartWithGPS=
+GPSPeriod=10
 ArmCommandTimeout = 60
 MaxArmCycles=3
 StartCommandTimeout = 20
diff --git a/dispatcher/dispatcher.py b/dispatcher/dispatcher.py
index b106a1c2..84bd0c1c 100755
--- a/dispatcher/dispatcher.py
+++ b/dispatcher/dispatcher.py
@@ -62,6 +62,7 @@ def setup():
 
 def main(config, control_mc, logger, daq_config, vme_config, SlackBot, runs_mc, args):
     sh = daqnt.SignalHandler()
+    sleep_period = int(config['PollFrequency'])
 
     # Declare necessary classes
     hv = Hypervisor(
@@ -73,14 +74,15 @@ def main(config, control_mc, logger, daq_config, vme_config, SlackBot, runs_mc,
         testing=args.test,
         slackbot=SlackBot,
     )
-    mc = MongoConnect(config, daq_config, logger, control_mc, runs_mc, hv, args.test)
-    dc = DAQController(config, daq_config, mc, logger, hv)
+    gps_start = config['StartWithGPS'].split()
+    gps_period = int(config['GPSPeriod'])
+    mc = MongoConnect(config, daq_config, logger, control_mc, runs_mc, hv, sleep_period, gps_start, gps_period, testing=args.test)
+    dc = DAQController(config, daq_config, mc, logger, hv, sleep_period, gps_start)
 
     # connect the triangle
     hv.mongo_connect = mc
    hv.daq_controller = dc
 
-    sleep_period = int(config['PollFrequency'])
     last_loop_dt = 0
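
For orientation, the changes above work in two stages: control_detector() only issues a start for a GPS-synced detector when MongoConnect.time_to_next_gps() reports enough margin before the next pulse (estimated from the newest document in the gps_sync collection), and insert_run_doc() then sleeps through that interval so the pulse timestamp can be recorded as gps_start. Below is a minimal standalone sketch of that phase arithmetic, assuming a 10 s GPS period; the function and its modulo guard against a stale record are illustrative only, not the MongoConnect implementation:

    import time

    GPS_PERIOD = 10.0  # seconds between GPS pulses, mirrors GPSPeriod in config.ini

    def time_to_next_gps(last_pulse_utc, gps_period=GPS_PERIOD):
        """Seconds until the next expected GPS pulse, given the UTC timestamp
        (seconds since epoch) of the most recently recorded pulse."""
        elapsed = time.time() - last_pulse_utc
        # the modulo keeps the result in [0, gps_period) even if the newest
        # recorded pulse is older than one period (e.g. a delayed DB write)
        return gps_period - (elapsed % gps_period)

    # example: last pulse recorded 13.2 s ago with a 10 s period
    # -> next pulse expected in roughly 6.8 s
    print(round(time_to_next_gps(time.time() - 13.2), 1))

This is presumably also why the config warns against making PollFrequency an even divisor of GPSPeriod: if the dispatcher loop samples the pulse train at a fixed phase, the "waiting for GPS" branch in control_detector() could be taken on every iteration and the start command would never be issued.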