if method == luxi.REQ_SUBMIT_JOB:
ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
- return queue.SubmitJob(ops)
+ # we need to compute the node list here, since from now on all
+ # operations require locks on the queue or the storage, and we
+ # shouldn't get another lock
+ node_list = self.server.context.cfg.GetNodeList()
+ return queue.SubmitJob(ops, node_list)
elif method == luxi.REQ_CANCEL_JOB:
(job_id, ) = args
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
+from ganeti import rpc
JOBQUEUE_THREADS = 5
def __init__(self):
self._lock = threading.Lock()
self._memcache = {}
+ self._my_hostname = utils.HostInfo().name
# Make sure our directory exists
try:
utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
data="%s\n" % 0)
- def _NewSerialUnlocked(self):
+ def _NewSerialUnlocked(self, nodes):
"""Generates a new job identifier.
Job identifiers are unique during the lifetime of a cluster.
# Keep it only if we were able to write the file
self._last_serial = serial
+ # Distribute the serial to the other nodes
+ try:
+ nodes.remove(self._my_hostname)
+ except ValueError:
+ pass
+
+ result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
+ for node in nodes:
+ if not result[node]:
+ logging.error("copy of job queue file to node %s failed", node)
+
return serial
def _GetJobPath(self, job_id):
return self._GetJobsUnlocked(job_ids)
@utils.LockedMethod
- def AddJob(self, ops):
+ def AddJob(self, ops, nodes):
+ """Create and store on disk a new job.
+
+ @type ops: list
+ @param ops: The list of OpCodes that will become the new job.
+ @type nodes: list
+ @param nodes: The list of nodes to which the new job serial will be
+ distributed.
+
+ """
assert self.lock_fd, "Queue should be open"
# Get job identifier
- job_id = self._NewSerialUnlocked()
+ job_id = self._NewSerialUnlocked(nodes)
job = _QueuedJob(self, job_id, ops)
# Write to disk
job.SetUnclean("Unclean master daemon shutdown")
@utils.LockedMethod
- def SubmitJob(self, ops):
+ def SubmitJob(self, ops, nodes):
"""Add a new job to the queue.
This enters the job into our job queue and also puts it on the new
queue, in order for it to be picked up by the queue processors.
- Args:
- - ops: Sequence of opcodes
+ @type ops: list
+ @param ops: the sequence of opcodes that will become the new job
+ @type nodes: list
+ @param nodes: the list of nodes to which the queue should be
+ distributed
"""
- job = self._jobs.AddJob(ops)
+ job = self._jobs.AddJob(ops, nodes)
# Add to worker pool
self._wpool.AddTask(job)