# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for

import os
import logging
import errno
import re
import time
import weakref

# pylint: disable-msg=E0611
from pyinotify import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import netutils
from ganeti import compat


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
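

# Illustrative note (not part of the original module): timestamps are kept as
# (seconds, microseconds) tuples rather than floats so they serialize to JSON
# without precision loss. Assuming utils.SplitTime splits the integer and
# fractional parts of a float, e.g.:
#
#   utils.SplitTime(1234567890.25) == (1234567890, 250000)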


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               ]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj
137 """Serializes this _QueuedOpCode.
140 @return: the dictionary holding the serialized state
144 "input": self.input.__getstate__(),
145 "status": self.status,
146 "result": self.result,
148 "start_timestamp": self.start_timestamp,
149 "exec_timestamp": self.exec_timestamp,
150 "end_timestamp": self.end_timestamp,


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: int
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    return obj
237 """Serialize the _JobQueue instance.
240 @return: the serialized state
245 "ops": [op.Serialize() for op in self.ops],
246 "start_timestamp": self.start_timestamp,
247 "end_timestamp": self.end_timestamp,
248 "received_timestamp": self.received_timestamp,

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
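
  # Example of the reduction above (illustrative, not from the original
  # source): opcode statuses [success, running, queued] yield the job status
  # "running", while [success, error, queued] yields "error" as soon as the
  # failed opcode is seen, regardless of what follows it.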

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row
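
  # Usage sketch (illustrative): job.GetInfo(["id", "status", "summary"])
  # returns one element per field, e.g. [42, "running", ["OP_SUMMARY(...)"]];
  # the exact summary strings are an assumption, not actual output.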

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
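
  # Calling convention sketch (illustrative): Feedback("msg") logs a plain
  # constants.ELOG_MESSAGE entry, while Feedback(log_type, "msg") supplies an
  # explicit log type taken from the constants.ELOG_* family.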

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: list or None
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: int or None
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client
  when the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
    waiter = _JobChangesWaiter(filename)
    try:
      return utils.Retry(compat.partial(self._CheckForChanges,
                                        job_load_fn, check_fn),
                         utils.RETRY_REMAINING_TIME, timeout,
                         wait_fn=waiter.Wait)
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
    finally:
      waiter.Close()


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
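

# Hedged usage sketch (not from the original source): any non-Ganeti
# exception is first wrapped in errors.OpExecError, so clients always receive
# an encoded Ganeti error, e.g.:
#
#   _EncodeOpError(ValueError("bad input"))
#   # behaves like errors.EncodeException(errors.OpExecError("bad input"))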


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This functions processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    self.SetTaskName("Job%s" % job.id)

    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire(shared=1)
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                logging.debug("Canceling opcode")
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              logging.debug("Opcode %s/%s waiting for locks",
                            idx + 1, count)
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire(shared=1)
            try:
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              if idx == count - 1:
                job.end_timestamp = TimeStampNow()

                # Consistency check
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops)
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire(shared=1)
            try:
              try:
                logging.debug("Opcode %s/%s failed", idx + 1, count)
                op.status = constants.OP_STATUS_ERROR
                op.result = _EncodeOpError(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)

                to_encode = errors.OpExecError("Preceding opcode failed")
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                      _EncodeOpError(to_encode))

                # Consistency check
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops[:idx])
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
                                  errors.GetEncodedError(i.result)
                                  for i in job.ops[idx:])
              finally:
                job.end_timestamp = TimeStampNow()
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire(shared=1)
        try:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      status = job.CalcStatus()
      logging.info("Finished job %s, status = %s", job.id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.HostInfo().name

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. To skip acquiring it altogether,
    # concurrency must be guaranteed with all code acquiring it in shared
    # mode and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask((job, ))

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                  "Unclean master daemon shutdown")
            self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    utils.WriteFile(file_name, data=data)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicate the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
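
  # Example (illustrative): with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job
  # "123456" lands in archive directory "12", since 123456 / 10000 == 12
  # under Python 2 integer division.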

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of formatted job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
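
  # Worked example (illustrative): if self._last_serial == 10, then
  # _NewSerialsUnlocked(2) persists "12" to the serial file and returns
  # ["11", "12"]; the next call continues from 12.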

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked to be drained.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    tasks = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
        status = True
        data = job_id
      except errors.GenericError, err:
        status = False
        data = str(err)
      results.append((status, data))
    self._wpool.AddManyTasks(tasks)

    return results
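
  # Return-shape sketch (illustrative): SubmitManyJobs([[op1], [op2]]) yields
  # e.g. [(True, "17"), (False, "Job queue is drained, refusing job")], one
  # (status, job id or error message) pair per requested job.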

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
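
  # Polling sketch (illustrative, not from the original source): a LUXI
  # client would loop along these lines, feeding back the last answer:
  #
  #   prev_info, prev_serial = None, -1
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 60.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue  # timeout expired, job unchanged; poll again
  #     if result is None:
  #       break     # job lost
  #     prev_info, log_entries = result
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]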

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
      msg = "Job %s canceled" % job.id

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      msg = "Job %s will be canceled" % job.id

    self.UpdateJobUnlocked(job)

    return (True, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
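
  # Age check sketch (illustrative): timestamps are (seconds, microseconds)
  # tuples, so job_age[0] is the wall-clock second; with age == 3600 a job
  # whose relevant timestamp is more than an hour old becomes a candidate.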

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None