burnin: implement basic confd testing
diff --git a/tools/burnin b/tools/burnin
index cd16644..91aae6b 100755
--- a/tools/burnin
+++ b/tools/burnin
@@ -36,6 +36,9 @@ from ganeti import constants
 from ganeti import cli
 from ganeti import errors
 from ganeti import utils
+from ganeti import ssconf
+
+from ganeti.confd import client as confd_client
 
 
 USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
@@ -164,6 +167,9 @@ OPTIONS = [
   cli.cli_option("--no-nics", dest="nics",
                  help="No network interfaces", action="store_const",
                  const=[], default=[{}]),
+  cli.cli_option("--no-confd", dest="do_confd_tests",
+                 help="Skip confd queries",
+                 action="store_false", default=True),
   cli.cli_option("--rename", dest="rename", default=None,
                  help=("Give one unused instance name which is taken"
                        " to start the renaming sequence"),
@@ -260,6 +266,7 @@ class Burner(object):
     self.hvp = self.bep = None
     self.ParseOptions()
     self.cl = cli.GetClient()
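+    # ssconf gives access to the master candidate IPs for the confd tests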
+    self.ss = ssconf.SimpleStore()
     self.GetState()
 
   def ClearFeedbackBuf(self):
@@ -385,18 +392,19 @@ class Burner(object):
 
     """
     self.ClearFeedbackBuf()
-    job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs]
-    Log("Submitted job ID(s) %s", utils.CommaJoin(job_ids), indent=1)
-    results = []
-    for jid, (_, iname) in zip(job_ids, jobs):
-      Log("waiting for job %s for %s", jid, iname, indent=2)
-      try:
-        results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
-      except Exception, err: # pylint: disable-msg=W0703
-        Log("Job for %s failed: %s", iname, err)
-    if len(results) != len(jobs):
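+    # Queue all jobs through one JobExecutor and wait for them as a group;
+    # any failed job raises BurninFailure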
+    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
+    for ops, name in jobs:
+      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
+    try:
+      results = jex.GetResults()
+    except Exception, err: # pylint: disable-msg=W0703
+      Log("Jobs failed: %s", err)
       raise BurninFailure()
-    return results
+
+    if utils.any(results, lambda x: not x[0]):
+      raise BurninFailure()
+
+    return [i[1] for i in results]
 
   def ParseOptions(self):
     """Parses the command line options.
@@ -455,7 +463,7 @@ class Burner(object):
     socket.setdefaulttimeout(options.net_timeout)
 
   def GetState(self):
-    """Read the cluster state from the config."""
+    """Read the cluster state from the master daemon."""
     if self.opts.nodes:
       names = self.opts.nodes.split(",")
     else:
@@ -485,6 +493,14 @@ class Burner(object):
     if not found:
       Err("OS '%s' not found" % self.opts.os)
 
+    cluster_info = self.cl.QueryClusterInfo()
+    self.cluster_info = cluster_info
+    if not self.cluster_info:
+      Err("Can't get cluster info")
+
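+    # Remember the default NIC parameters; they decide later whether the
+    # NIC add/remove tests can be run (bridged mode only)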
+    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
+    self.cluster_default_nicparams = default_nic_params
+
   @_DoCheckInstances
   @_DoBatch(False)
   def BurnCreateInstances(self):
@@ -830,6 +846,67 @@ class Burner(object):
       Log("removing last NIC", indent=2)
       self.ExecOrQueue(instance, op_add, op_rem)
 
+  def ConfdCallback(self, reply):
+    """Callback for confd queries"""
+    if reply.type == confd_client.UPCALL_REPLY:
+      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
+        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
+                                                    reply.server_reply.status,
+                                                    reply.server_reply))
+      if reply.orig_request.type == constants.CONFD_REQ_PING:
+        Log("Ping: OK", indent=1)
+      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
+        if reply.server_reply.answer == self.cluster_info["master"]:
+          Log("Master: OK", indent=1)
+        else:
+          Err("Master: wrong: %s" % reply.server_reply.answer)
+      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
+        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
+          Log("Node role for master: OK", indent=1)
+        else:
+          Err("Node role for master: wrong: %s" % reply.server_reply.answer)
+
+  def DoConfdRequestReply(self, req):
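+    """Send a confd request and wait for all expected replies."""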
+    self.confd_counting_callback.RegisterQuery(req.rsalt)
+    self.confd_client.SendRequest(req, async=False)
+    while not self.confd_counting_callback.AllAnswered():
+      if not self.confd_client.ReceiveReply():
+        Err("Did not receive all expected confd replies")
+        break
+
+  def BurnConfd(self):
+    """Run confd queries for our instances.
+
+    The following confd queries are tested:
+    - CONFD_REQ_PING: simple ping
+    - CONFD_REQ_CLUSTER_MASTER: cluster master
+    - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master
+
+    """
+    Log("Checking confd results")
+
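+    # The confd client needs the cluster HMAC key and the IP addresses of
+    # the master candidates, both read from files on the local node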
+    hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
+    mc_file = self.ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
+    mc_list = utils.ReadFile(mc_file).splitlines()
+    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
+    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
+    self.confd_counting_callback = counting_callback
+
+    self.confd_client = confd_client.ConfdClient(hmac_key, mc_list,
+                                                 counting_callback)
+
+    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
+    self.DoConfdRequestReply(req)
+
+    req = confd_client.ConfdClientRequest(
+        type=constants.CONFD_REQ_CLUSTER_MASTER)
+    self.DoConfdRequestReply(req)
+
+    req = confd_client.ConfdClientRequest(
+        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
+        query=self.cluster_info["master"])
+    self.DoConfdRequestReply(req)
+
   def _CheckInstanceAlive(self, instance):
     """Check if an instance is alive by doing http checks.
 
@@ -912,8 +989,14 @@ class Burner(object):
       if opts.do_addremove_disks:
         self.BurnAddRemoveDisks()
 
+      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
+      # Don't add/remove NICs in routed mode, as we would need an IP address
+      # to add them with
       if opts.do_addremove_nics:
-        self.BurnAddRemoveNICs()
+        if default_nic_mode == constants.NIC_MODE_BRIDGED:
+          self.BurnAddRemoveNICs()
+        else:
+          Log("Skipping nic add/remove as the cluster is not in bridged mode")
 
       if opts.do_activate_disks:
         self.BurnActivateDisks()
@@ -921,6 +1004,9 @@ class Burner(object):
       if opts.rename:
         self.BurnRename()
 
+      if opts.do_confd_tests:
+        self.BurnConfd()
+
       if opts.do_startstop:
         self.BurnStopStart()