# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA

22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for

import os
import logging
import errno
import re
import time
import weakref

# pylint: disable-msg=E0611
from pyinotify import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import netutils
from ganeti import compat


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """
75 """Returns the current timestamp.
78 @return: the current time in the (seconds, microseconds) format
81 return utils.SplitTime(time.time())
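
# Illustrative note (added; not in the original module): utils.SplitTime
# splits a float UNIX time into whole seconds and microseconds, so a call
# made at time 1234567890.123456 would give:
#
#   TimeStampNow()  # -> (1234567890, 123456)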


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               ]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               ]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a canceled opcode, or one finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
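
  # Illustrative examples (added; not in the original code) of how opcode
  # statuses map to a job status under the algorithm above:
  #   [SUCCESS, SUCCESS]          -> JOB_STATUS_SUCCESS
  #   [SUCCESS, RUNNING, QUEUED]  -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR, QUEUED]    -> JOB_STATUS_ERROR (one failure fails all)
  #   [SUCCESS, QUEUED, QUEUED]   -> JOB_STATUS_QUEUED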

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "lock_status":
        row.append(self.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row
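
  # Illustrative example (added; not in the original code): querying a job
  # returns one value per requested field, in order; the values shown here
  # are hypothetical:
  #
  #   job.GetInfo(["id", "status", "summary"])
  #   # -> ["42", "running", ["CLUSTER_VERIFY"]]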

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalized are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    try:
      not_marked = True
      for op in self.ops:
        if op.status in constants.OPS_FINALIZED:
          assert not_marked, "Finalized opcodes found after non-finalized ones"
          continue
        op.status = status
        op.result = result
        not_marked = False
    finally:
      self.queue.UpdateJobUnlocked(self)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # All locks are acquired by now
    self._job.lock_status = None

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")
    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
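
  # Illustrative example (added; not in the original code): the LU calls back
  # into Feedback() with either just a message or an explicit (type, message)
  # pair; both end up as (serial, timestamp, type, message) tuples in the
  # opcode log.
  #
  #   cb.Feedback("copying disk 0")
  #   cb.Feedback(constants.ELOG_MESSAGE, "copying disk 1")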

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg

    # Cancel here if we were asked to
    self._CheckCancel()


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None
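
# Illustrative sketch (added; not in the original code): a checker instance
# is the check_fn used by _WaitForJobChangesHelper below.
#
#   checker = _JobChangesChecker(["id", "status"], prev_job_info=None,
#                                prev_log_serial=0)
#   checker(job)  # -> (job_info, log_entries) on change, None otherwise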


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)
    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
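
# Illustrative usage sketch (added; not in the original code):
# JobQueue.WaitForJobChanges below wires this helper up roughly like this,
# with job_load_fn hiding the on-disk job format:
#
#   helper = _WaitForJobChangesHelper()
#   result = helper(filename, job_load_fn, ["id", "status"],
#                   prev_job_info=None, prev_log_serial=0, timeout=30.0)
#   # result is (job_info, log_entries), constants.JOB_NOTCHANGED on
#   # timeout, or None if the job file disappeared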


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire(shared=1)
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                logging.debug("Canceling opcode")
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              logging.debug("Opcode %s/%s waiting for locks",
                            idx + 1, count)
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire(shared=1)
            try:
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              if idx == count - 1:
                job.lock_status = None
                job.end_timestamp = TimeStampNow()

                # Consistency check
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops)
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire(shared=1)
            try:
              try:
                logging.debug("Opcode %s/%s failed", idx + 1, count)
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  to_encode = err
                else:
                  to_encode = errors.OpExecError(str(err))
                op.result = errors.EncodeException(to_encode)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)

                to_encode = errors.OpExecError("Preceding opcode failed")
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                      errors.EncodeException(to_encode))

                # Consistency check
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops[:idx])
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
                                  errors.GetEncodedError(i.result)
                                  for i in job.ops[idx:])
              finally:
                job.lock_status = None
                job.end_timestamp = TimeStampNow()
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire(shared=1)
        try:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      status = job.CalcStatus()
      logging.info("Finished job %s, status = %s", job.id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    finished).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask((job, ))

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                  "Unclean master daemon shutdown")

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    utils.WriteFile(file_name, data=data)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
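
  # Illustrative example (added; not in the original code): with
  # JOBS_PER_ARCHIVE_DIRECTORY = 10000, archived jobs are bucketed by integer
  # division on the job id:
  #
  #   JobQueue._GetArchiveDirectory("123456")  # -> "12"
  #   JobQueue._GetArchiveDirectory("42")      # -> "0"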

  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of formatted job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    tasks = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))
    self._wpool.AddManyTasks(tasks)

    return results

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
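
  # Illustrative example (added; not in the original code): a LUXI client
  # polling for changes calls this repeatedly, feeding back what it last saw:
  #
  #   result = queue.WaitForJobChanges(job_id, ["status"], prev_job_info=None,
  #                                    prev_log_serial=0, timeout=30.0)
  #   # -> (job_info, log_entries) or constants.JOB_NOTCHANGED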

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be canceled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
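
  # Illustrative example (added; not in the original code): archive every
  # finalized job older than a week, spending at most ~10 seconds on it; the
  # return value pairs the number archived with the number not yet considered:
  #
  #   (archived, remaining) = queue.AutoArchiveJobs(7 * 24 * 3600, 10)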

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs
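
  # Illustrative example (added; not in the original code): querying two jobs
  # by id when one cannot be loaded keeps positional correspondence by
  # inserting None for the missing job:
  #
  #   queue.QueryJobs(["1", "2"], ["id", "status"])
  #   # -> [["1", "success"], None]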

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None