X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/6c5a7090314116ff0381763a813ba7c84d888ffb..72737a7f7641e4277401fe154e3d75b61623d013:/daemons/ganeti-masterd

diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd
index d6ef907..06886f3 100755
--- a/daemons/ganeti-masterd
+++ b/daemons/ganeti-masterd
@@ -216,10 +216,14 @@ class ClientOps:
       job_id = args
       return queue.ArchiveJob(job_id)
 
+    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
+      age = args
+      return queue.AutoArchiveJobs(age)
+
     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
-      (job_id, fields, prev_job_info, prev_log_serial) = args
+      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
-                                     prev_log_serial)
+                                     prev_log_serial, timeout)
 
     elif method == luxi.REQ_QUERY_JOBS:
       (job_ids, fields) = args
@@ -240,6 +244,11 @@ class ClientOps:
       op = opcodes.OpQueryExports(nodes=nodes)
       return self._Query(op)
 
+    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
+      fields = args
+      op = opcodes.OpQueryConfigValues(output_fields=fields)
+      return self._Query(op)
+
     else:
       raise ValueError("Invalid operation")
 
@@ -252,7 +261,7 @@ class ClientOps:
     """
     proc = mcpu.Processor(self.server.context)
     # TODO: Where should log messages go?
-    return proc.ExecOpCode(op, self._DummyLog)
+    return proc.ExecOpCode(op, self._DummyLog, None)
 
 
 class GanetiContext(object):
@@ -349,6 +358,70 @@ def ParseOptions():
   return options, args
 
 
+def CheckAgreement():
+  """Check the agreement on who is the master.
+
+  The function uses a very simple algorithm: we must get more positive
+  than negative answers. Since in most of the cases we are the master,
+  we'll use our own config file for getting the node list. In the
+  future we could collect the current node list from our (possibly
+  obsolete) known nodes.
+
+  @rtype: boolean
+  @return: True if there are no other nodes to ask or if more nodes voted
+      for us than against us; False otherwise (including rpc failure)
+
+  """
+  myself = utils.HostInfo().name
+  #temp instantiation of a config writer, used only to get the node list
+  cfg = config.ConfigWriter()
+  node_list = cfg.GetNodeList()
+  del cfg
+  try:
+    node_list.remove(myself)
+  except (KeyError, ValueError):
+    # list.remove() raises ValueError (not KeyError) when we are not listed
+    pass
+  if not node_list:
+    # either single node cluster, or a misconfiguration, but I won't
+    # break any other node, so I can proceed
+    return True
+  results = rpc.RpcRunner.call_master_info(node_list)
+  if not isinstance(results, dict):
+    # this should not happen (unless internal error in rpc)
+    logging.critical("Can't complete rpc call, aborting master startup")
+    return False
+  positive = negative = 0
+  other_masters = {}
+  for node in results:
+    if not isinstance(results[node], (tuple, list)) or len(results[node]) < 3:
+      logging.warning("Can't contact node %s", node)
+      continue
+    # the third field of the answer is tallied as that node's master vote
+    master_node = results[node][2]
+    if master_node == myself:
+      positive += 1
+    else:
+      negative += 1
+      if master_node not in other_masters:
+        other_masters[master_node] = 0
+      other_masters[master_node] += 1
+  if positive <= negative:
+    # bad!
+    logging.critical("It seems we are not the master (%d votes for,"
+                     " %d votes against)", positive, negative)
+    if len(other_masters) > 1:
+      logging.critical("The other nodes do not agree on a single master")
+    elif other_masters:
+      # TODO: resync my files from the master
+      logging.critical("It seems the real master is %s",
+                       other_masters.keys()[0])
+    else:
+      logging.critical("Can't contact any node for data, aborting startup")
+    return False
+  return True
+
+
 def main():
   """Main function"""
 
@@ -358,6 +431,10 @@ def main():
 
   ssconf.CheckMaster(options.debug)
 
+  # we believe we are the master, let's ask the other nodes...
+  if not CheckAgreement():
+    return
+
   master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
 
   # become a daemon
@@ -373,8 +450,8 @@ def main():
   logging.info("ganeti master daemon startup")
 
   # activate ip
-  master_node = ssconf.SimpleStore().GetMasterNode()
-  if not rpc.call_node_start_master(master_node, False):
+  master_node = ssconf.SimpleConfigReader().GetMasterNode()
+  if not rpc.RpcRunner.call_node_start_master(master_node, False):
     logging.error("Can't activate master IP address")
 
   master.setup_queue()