Change method dispatch in ClientOps to enforce luxi.REQ_ALL
[ganeti-local] / lib / server / masterd.py
index 67ea5ca..b0e8708 100644 (file)
@@ -58,6 +58,8 @@ from ganeti import netutils
 from ganeti import objects
 from ganeti import query
 from ganeti import runtime
+from ganeti import pathutils
+from ganeti import ht
 
 
 CLIENT_REQUEST_WORKERS = 16
@@ -66,6 +68,19 @@ EXIT_NOTMASTER = constants.EXIT_NOTMASTER
 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
+def _LogNewJob(status, info, ops):
+  """Log information about a recently submitted job.
+
+  """
+  op_summary = utils.CommaJoin(op.Summary() for op in ops)
+
+  if status:
+    logging.info("New job with id %s, summary: %s", info, op_summary)
+  else:
+    logging.info("Failed to submit job, reason: '%s', summary: %s",
+                 info, op_summary)
+
+
 class ClientRequestWorker(workerpool.BaseWorker):
   # pylint: disable=W0221
   def RunTask(self, server, message, client):
@@ -264,27 +279,42 @@ class ClientOps:
       logging.info("Received invalid arguments of type '%s'", type(args))
       raise ValueError("Invalid arguments type '%s'" % type(args))
 
+    if method not in luxi.REQ_ALL:
+      logging.info("Received invalid request '%s'", method)
+      raise ValueError("Invalid operation '%s'" % method)
+
     # TODO: Rewrite to not exit in each 'if/elif' branch
 
     if method == luxi.REQ_SUBMIT_JOB:
-      logging.info("Received new job")
+      logging.info("Receiving new job")
       (job_def, ) = args
       ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
-      return queue.SubmitJob(ops)
+      job_id = queue.SubmitJob(ops)
+      _LogNewJob(True, job_id, ops)
+      return job_id
 
-    if method == luxi.REQ_SUBMIT_MANY_JOBS:
-      logging.info("Received multiple jobs")
+    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
+      logging.info("Receiving multiple jobs")
       (job_defs, ) = args
       jobs = []
       for ops in job_defs:
         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
-      return queue.SubmitManyJobs(jobs)
+      job_ids = queue.SubmitManyJobs(jobs)
+      for ((status, job_id), ops) in zip(job_ids, jobs):
+        _LogNewJob(status, job_id, ops)
+      return job_ids
 
     elif method == luxi.REQ_CANCEL_JOB:
       (job_id, ) = args
       logging.info("Received job cancel request for %s", job_id)
       return queue.CancelJob(job_id)
 
+    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
+      (job_id, priority) = args
+      logging.info("Received request to change priority for job %s to %s",
+                   job_id, priority)
+      return queue.ChangeJobPriority(job_id, priority)
+
     elif method == luxi.REQ_ARCHIVE_JOB:
       (job_id, ) = args
       logging.info("Received job archive request for %s", job_id)
@@ -310,7 +340,8 @@ class ClientOps:
                                              qfilter=qfilter))
       elif what == constants.QR_LOCK:
         if qfilter is not None:
-          raise errors.OpPrereqError("Lock queries can't be filtered")
+          raise errors.OpPrereqError("Lock queries can't be filtered",
+                                     errors.ECODE_INVAL)
         return context.glm.QueryLocks(fields)
       elif what == constants.QR_JOB:
         return queue.QueryJobs(fields, qfilter)
@@ -372,6 +403,15 @@ class ClientOps:
       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
       return self._Query(op)
 
+    elif method == luxi.REQ_QUERY_NETWORKS:
+      (names, fields, use_locking) = args
+      logging.info("Received network query request for %s", names)
+      if use_locking:
+        raise errors.OpPrereqError("Sync queries are not allowed",
+                                   errors.ECODE_INVAL)
+      op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
+      return self._Query(op)
+
     elif method == luxi.REQ_QUERY_EXPORTS:
       (nodes, use_locking) = args
       if use_locking:
@@ -407,22 +447,12 @@ class ClientOps:
     elif method == luxi.REQ_SET_WATCHER_PAUSE:
       (until, ) = args
 
-      if until is None:
-        logging.info("Received request to no longer pause the watcher")
-      else:
-        if not isinstance(until, (int, float)):
-          raise TypeError("Duration must be an integer or float")
-
-        if until < time.time():
-          raise errors.GenericError("Unable to set pause end time in the past")
-
-        logging.info("Received request to pause the watcher until %s", until)
-
-      return _SetWatcherPause(until)
+      return _SetWatcherPause(context, until)
 
     else:
-      logging.info("Received invalid request '%s'", method)
-      raise ValueError("Invalid operation '%s'" % method)
+      logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
+      raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
+                                   " but not implemented" % method)
 
   def _Query(self, op):
     """Runs the specified opcode and returns the result.
@@ -460,9 +490,10 @@ class GanetiContext(object):
 
     # Locking manager
     self.glm = locking.GanetiLockManager(
-                self.cfg.GetNodeList(),
-                self.cfg.GetNodeGroupList(),
-                self.cfg.GetInstanceList())
+      self.cfg.GetNodeList(),
+      self.cfg.GetNodeGroupList(),
+      self.cfg.GetInstanceList(),
+      self.cfg.GetNetworkList())
 
     self.cfg.SetContext(self)
 
@@ -518,18 +549,36 @@ class GanetiContext(object):
     self.glm.remove(locking.LEVEL_NODE_RES, name)
 
 
-def _SetWatcherPause(until):
+def _SetWatcherPause(context, until):
   """Creates or removes the watcher pause file.
 
+  @type context: L{GanetiContext}
+  @param context: Global Ganeti context
   @type until: None or int
   @param until: Unix timestamp saying until when the watcher shouldn't run
 
   """
+  node_names = context.cfg.GetNodeList()
+
   if until is None:
-    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
+    logging.info("Received request to no longer pause watcher")
   else:
-    utils.WriteFile(constants.WATCHER_PAUSEFILE,
-                    data="%d\n" % (until, ))
+    if not ht.TNumber(until):
+      raise TypeError("Duration must be numeric")
+
+    if until < time.time():
+      raise errors.GenericError("Unable to set pause end time in the past")
+
+    logging.info("Received request to pause watcher until %s", until)
+
+  result = context.rpc.call_set_watcher_pause(node_names, until)
+
+  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
+                           for (node_name, nres) in result.items()
+                           if nres.fail_msg and not nres.offline)
+  if errmsg:
+    raise errors.OpExecError("Watcher pause was set where possible, but failed"
+                             " on the following node(s): %s" % errmsg)
 
   return until
 
@@ -688,10 +737,10 @@ def PrepMasterd(options, _):
   """
   # This is safe to do as the pid file guarantees against
   # concurrent execution.
-  utils.RemoveFile(constants.MASTER_SOCKET)
+  utils.RemoveFile(pathutils.MASTER_SOCKET)
 
   mainloop = daemon.Mainloop()
-  master = MasterServer(constants.MASTER_SOCKET, options.uid, options.gid)
+  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
   return (mainloop, master)
 
 
@@ -711,7 +760,7 @@ def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
     finally:
       rpc.Shutdown()
   finally:
-    utils.RemoveFile(constants.MASTER_SOCKET)
+    utils.RemoveFile(pathutils.MASTER_SOCKET)
 
   logging.info("Clean master daemon shutdown")