Revision c3f0a12f lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
35 | 35 |
from ganeti import errors |
36 | 36 |
from ganeti import mcpu |
37 | 37 |
from ganeti import utils |
38 |
from ganeti import rpc |
|
38 | 39 |
|
39 | 40 |
|
40 | 41 |
JOBQUEUE_THREADS = 5 |
... | ... | |
269 | 270 |
def __init__(self): |
270 | 271 |
self._lock = threading.Lock() |
271 | 272 |
self._memcache = {} |
273 |
self._my_hostname = utils.HostInfo().name |
|
272 | 274 |
|
273 | 275 |
# Make sure our directory exists |
274 | 276 |
try: |
... | ... | |
350 | 352 |
utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE, |
351 | 353 |
data="%s\n" % 0) |
352 | 354 |
|
353 |
def _NewSerialUnlocked(self): |
|
355 |
def _NewSerialUnlocked(self, nodes):
|
|
354 | 356 |
"""Generates a new job identifier. |
355 | 357 |
|
356 | 358 |
Job identifiers are unique during the lifetime of a cluster. |
... | ... | |
370 | 372 |
# Keep it only if we were able to write the file |
371 | 373 |
self._last_serial = serial |
372 | 374 |
|
375 |
# Distribute the serial to the other nodes |
|
376 |
try: |
|
377 |
nodes.remove(self._my_hostname) |
|
378 |
except ValueError: |
|
379 |
pass |
|
380 |
|
|
381 |
result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE) |
|
382 |
for node in nodes: |
|
383 |
if not result[node]: |
|
384 |
logging.error("copy of job queue file to node %s failed", node) |
|
385 |
|
|
373 | 386 |
return serial |
374 | 387 |
|
375 | 388 |
def _GetJobPath(self, job_id): |
... | ... | |
434 | 447 |
return self._GetJobsUnlocked(job_ids) |
435 | 448 |
|
436 | 449 |
@utils.LockedMethod |
437 |
def AddJob(self, ops): |
|
450 |
def AddJob(self, ops, nodes): |
|
451 |
"""Create and store on disk a new job. |
|
452 |
|
|
453 |
@type ops: list |
|
454 |
@param ops: The list of OpCodes that will become the new job. |
|
455 |
@type nodes: list |
|
456 |
@param nodes: The list of nodes to which the new job serial will be |
|
457 |
distributed. |
|
458 |
|
|
459 |
""" |
|
438 | 460 |
assert self.lock_fd, "Queue should be open" |
439 | 461 |
|
440 | 462 |
# Get job identifier |
441 |
job_id = self._NewSerialUnlocked() |
|
463 |
job_id = self._NewSerialUnlocked(nodes)
|
|
442 | 464 |
job = _QueuedJob(self, job_id, ops) |
443 | 465 |
|
444 | 466 |
# Write to disk |
... | ... | |
504 | 526 |
job.SetUnclean("Unclean master daemon shutdown") |
505 | 527 |
|
506 | 528 |
@utils.LockedMethod |
507 |
def SubmitJob(self, ops): |
|
529 |
def SubmitJob(self, ops, nodes):
|
|
508 | 530 |
"""Add a new job to the queue. |
509 | 531 |
|
510 | 532 |
This enters the job into our job queue and also puts it on the new |
511 | 533 |
queue, in order for it to be picked up by the queue processors. |
512 | 534 |
|
513 |
Args: |
|
514 |
- ops: Sequence of opcodes |
|
535 |
@type ops: list |
|
536 |
@param ops: the sequence of opcodes that will become the new job |
|
537 |
@type nodes: list |
|
538 |
@param nodes: the list of nodes to which the queue should be |
|
539 |
distributed |
|
515 | 540 |
|
516 | 541 |
""" |
517 |
job = self._jobs.AddJob(ops) |
|
542 |
job = self._jobs.AddJob(ops, nodes)
|
|
518 | 543 |
|
519 | 544 |
# Add to worker pool |
520 | 545 |
self._wpool.AddTask(job) |
Also available in: Unified diff