#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for

import os
import logging
import errno
import re
import time
import weakref

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """
76 """Returns the current timestamp.
79 @return: the current time in the (seconds, microseconds) format
82 return utils.SplitTime(time.time())
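
# An illustrative sketch (not executed; the exact microsecond value below is
# made up): utils.SplitTime turns a float UNIX timestamp into an integer
# (seconds, microseconds) pair, which survives JSON round-trips without
# float precision loss:
#
#   TimeStampNow()              # e.g. (1288797522, 175229)
#   utils.SplitTime(1234.5678)  # (1234, 567800)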


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj
142 """Serializes this _QueuedOpCode.
145 @return: the dictionary holding the serialized state
149 "input": self.input.__getstate__(),
150 "status": self.status,
151 "result": self.result,
153 "start_timestamp": self.start_timestamp,
154 "exec_timestamp": self.exec_timestamp,
155 "end_timestamp": self.end_timestamp,
156 "priority": self.priority,


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: string
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    return obj
243 """Serialize the _JobQueue instance.
246 @return: the serialized state
251 "ops": [op.Serialize() for op in self.ops],
252 "start_timestamp": self.start_timestamp,
253 "end_timestamp": self.end_timestamp,
254 "received_timestamp": self.received_timestamp,

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with error,
        the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
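
  # A few illustrative mappings of opcode statuses to the job status:
  #
  #   [SUCCESS, RUNNING, QUEUED]  -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR, QUEUED]    -> JOB_STATUS_ERROR (one failure fails all)
  #   [SUCCESS, SUCCESS, SUCCESS] -> JOB_STATUS_SUCCESS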

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
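
  # Note: opcode priorities follow the nice-value convention (a lower number
  # means a higher priority), so min() picks the highest-priority unfinished
  # opcode. E.g. (sketch) unfinished opcodes at priorities [0, -20, 19]
  # yield a job priority of -20.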

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
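
  # Example (illustrative): if the opcodes' logs hold entries with serials
  # 1 through 4, then:
  #
  #   job.GetLogEntries(None)  # -> all four entries
  #   job.GetLogEntries(2)     # -> only the entries with serials 3 and 4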

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
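
  # Feedback accepts either a bare message or an explicit (type, message)
  # pair; both become numbered log entries. A sketch, with "cb" standing for
  # the _OpExecCallbacks instance handed to the LU processor:
  #
  #   cb.Feedback("checking disk status")
  #   cb.Feedback(constants.ELOG_MESSAGE, "checking disk status")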

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)
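
  # The contract (a sketch): __call__ returns a (job_info, log_entries)
  # tuple when the job differs from what the client last saw, and falls
  # through to an implicit None otherwise; _WaitForJobChangesHelper
  # translates None into utils.RetryAgain to keep waiting.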


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events
603 """Closes underlying notifier and its file descriptor.
606 self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True
640 """Closes underlying waiter.
644 self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
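
# A sketch of the encoding: errors.EncodeException stores the exception
# class name together with its arguments, so clients can reconstruct and
# re-raise the error:
#
#   _EncodeOpError(errors.OpPrereqError("unknown instance"))
#   # -> ("OpPrereqError", ("unknown instance",))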


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    self.SetTaskName("Job%s" % job.id)

    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire(shared=1)
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                logging.debug("Canceling opcode")
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              logging.debug("Opcode %s/%s waiting for locks",
                            idx + 1, count)
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire(shared=1)
            try:
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              if idx == count - 1:
                job.end_timestamp = TimeStampNow()

                # Consistency check
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops)

              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire(shared=1)
            try:
              try:
                logging.debug("Opcode %s/%s failed", idx + 1, count)
                op.status = constants.OP_STATUS_ERROR
                op.result = _EncodeOpError(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)

                to_encode = errors.OpExecError("Preceding opcode failed")
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                      _EncodeOpError(to_encode))

                # Consistency check
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops[:idx])
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
                                  errors.GetEncodedError(i.result)
                                  for i in job.ops[idx:])
              finally:
                job.end_timestamp = TimeStampNow()
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire(shared=1)
        try:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      status = job.CalcStatus()
      logging.info("Finished job %s, status = %s", job.id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' argument.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status in (constants.JOB_STATUS_QUEUED,
                    constants.JOB_STATUS_WAITLOCK):
        self._wpool.AddTask((job, ))

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              "Unclean master daemon shutdown")
        self.UpdateJobUnlocked(job)

    logging.info("Job queue inspection finished")

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicate the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
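
  # For example (illustrative), with JOBS_PER_ARCHIVE_DIRECTORY = 10000:
  #
  #   JobQueue._GetArchiveDirectory("123456")  # -> "12"
  #   JobQueue._GetArchiveDirectory("42")      # -> "0"
  #
  # i.e. at most 10000 job files end up in any single archive directory.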

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    getents = runtime.GetEnts()

    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
                      uid=getents.masterd_uid, gid=getents.masterd_gid)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    tasks = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))
    self._wpool.AddManyTasks(tasks)

    return results
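
  # The result format (illustrative): one (success, data) pair per requested
  # job, where data is the new job ID on success and the error message
  # otherwise, e.g.:
  #
  #   [(True, "1234"), (False, "Job queue is drained, refusing job")]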

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be canceled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
      msg = "Job %s canceled" % job.id

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      msg = "Job %s will be canceled" % job.id

    self.UpdateJobUnlocked(job)

    return (True, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time to spend archiving, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None