Cluster: add nicparams, and update them on upgrade
[ganeti-local] / daemons / ganeti-masterd
index 5b9989e..2305fc6 100755 (executable)
@@ -36,7 +36,6 @@ import collections
 import Queue
 import random
 import signal
-import simplejson
 import logging
 
 from cStringIO import StringIO
@@ -55,6 +54,7 @@ from ganeti import ssconf
 from ganeti import workerpool
 from ganeti import rpc
 from ganeti import bootstrap
+from ganeti import serializer
 
 
 CLIENT_REQUEST_WORKERS = 16
@@ -152,7 +152,7 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
         logging.debug("client closed connection")
         break
 
-      request = simplejson.loads(msg)
+      request = serializer.LoadJson(msg)
       logging.debug("request: %s", request)
       if not isinstance(request, dict):
         logging.error("wrong request received: %s", msg)
@@ -181,7 +181,7 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
         luxi.KEY_RESULT: result,
         }
       logging.debug("response: %s", response)
-      self.send_message(simplejson.dumps(response))
+      self.send_message(serializer.DumpJson(response))
 
   def read_message(self):
     while not self._msgs:
@@ -213,6 +213,13 @@ class ClientOps:
       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
       return queue.SubmitJob(ops)
 
+    if method == luxi.REQ_SUBMIT_MANY_JOBS:
+      logging.info("Received multiple jobs")
+      jobs = []
+      for ops in args:
+        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
+      return queue.SubmitManyJobs(jobs)
+
     elif method == luxi.REQ_CANCEL_JOB:
       job_id = args
       logging.info("Received job cancel request for %s", job_id)
@@ -247,6 +254,8 @@ class ClientOps:
     elif method == luxi.REQ_QUERY_INSTANCES:
       (names, fields, use_locking) = args
       logging.info("Received instance query request for %s", names)
+      if use_locking:
+        raise errors.OpPrereqError("Sync queries are not allowed")
       op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                     use_locking=use_locking)
       return self._Query(op)
@@ -254,12 +263,16 @@ class ClientOps:
     elif method == luxi.REQ_QUERY_NODES:
       (names, fields, use_locking) = args
       logging.info("Received node query request for %s", names)
+      if use_locking:
+        raise errors.OpPrereqError("Sync queries are not allowed")
       op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                 use_locking=use_locking)
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_EXPORTS:
       nodes, use_locking = args
+      if use_locking:
+        raise errors.OpPrereqError("Sync queries are not allowed")
       logging.info("Received exports query request")
       op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
       return self._Query(op)