# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the job queue handling.
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
27 @var JOBQUEUE_THREADS: the number of worker threads we start for

import os
import logging
import errno
import re
import time
import weakref

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import netutils
from ganeti import compat


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """
75 """Returns the current timestamp.
78 @return: the current time in the (seconds, microseconds) format
81 return utils.SplitTime(time.time())
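

# Illustrative note (not part of the original code): utils.SplitTime turns a
# float timestamp into a (seconds, microseconds) tuple, e.g.
#   utils.SplitTime(1234567890.123456) -> (1234567890, 123456)
# which is the format stored in all the *_timestamp fields below.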


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj
137 """Serializes this _QueuedOpCode.
140 @return: the dictionary holding the serialized state
144 "input": self.input.__getstate__(),
145 "status": self.status,
146 "result": self.result,
148 "start_timestamp": self.start_timestamp,
149 "exec_timestamp": self.exec_timestamp,
150 "end_timestamp": self.end_timestamp,


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job ID
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    return obj
237 """Serialize the _JobQueue instance.
240 @return: the serialized state
245 "ops": [op.Serialize() for op in self.ops],
246 "start_timestamp": self.start_timestamp,
247 "end_timestamp": self.end_timestamp,
248 "received_timestamp": self.received_timestamp,

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
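
  # Illustrative examples (hypothetical opcode status lists, not from the
  # original sources): a job whose opcodes have the statuses
  #   [SUCCESS, RUNNING, QUEUED]  -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR, QUEUED]    -> JOB_STATUS_ERROR (one failure fails all)
  #   [SUCCESS, SUCCESS, SUCCESS] -> JOB_STATUS_SUCCESS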

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
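
  # Illustrative example (hypothetical log contents): if an opcode's log is
  #   [(1, ts1, ELOG_MESSAGE, "step 1"), (2, ts2, ELOG_MESSAGE, "step 2")]
  # then GetLogEntries(1) returns only the serial-2 entry, while
  # GetLogEntries(None) returns both.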

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        is passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    try:
      not_marked = True
      for op in self.ops:
        if op.status in constants.OPS_FINALIZED:
          assert not_marked, "Finalized opcodes found after non-finalized ones"
          continue
        op.status = status
        op.result = result
        not_marked = False
    finally:
      self.queue.UpdateJobUnlocked(self)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
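
  # Illustrative usage (hypothetical callback instance 'cb', not from the
  # original sources): an LU may call either form:
  #   cb.Feedback("a plain message")                      # type defaulted
  #   cb.Feedback(constants.ELOG_MESSAGE, "same effect")  # explicit type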

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None
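
  # Illustrative example of the JSON round-trip normalization used above:
  #   serializer.LoadJson(serializer.DumpJson({"a": (1, 2)}))
  # yields {"a": [1, 2]}, i.e. the tuple comes back as a list, matching what
  # a LUXI client would have received over the wire.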


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
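

# Illustrative example (hypothetical errors, not from the original sources):
#   _EncodeOpError(errors.OpPrereqError("msg"))  # encoded as-is
#   _EncodeOpError(ValueError("msg"))            # wrapped in OpExecError first
# so only Ganeti error types ever end up in the serialized job file.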


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    self.SetTaskName("Job%s" % job.id)

    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire(shared=1)
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                logging.debug("Canceling opcode")
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              logging.debug("Opcode %s/%s waiting for locks",
                            idx + 1, count)
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire(shared=1)
            try:
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              if idx == count - 1:
                job.end_timestamp = TimeStampNow()

                # Consistency check
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops)

              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire(shared=1)
            try:
              try:
                logging.debug("Opcode %s/%s failed", idx + 1, count)
                op.status = constants.OP_STATUS_ERROR
                op.result = _EncodeOpError(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)

                to_encode = errors.OpExecError("Preceding opcode failed")
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                      _EncodeOpError(to_encode))

                # Consistency check
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops[:idx])
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
                                  errors.GetEncodedError(i.result)
                                  for i in job.ops[idx:])
              finally:
                job.end_timestamp = TimeStampNow()
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire(shared=1)
        try:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      status = job.CalcStatus()
      logging.info("Finished job %s, status = %s", job.id, status)
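
  # Illustrative summary (not from the original sources) of the opcode status
  # transitions driven by RunTask() and _OpExecCallbacks:
  #   QUEUED -> WAITLOCK -> RUNNING -> SUCCESS
  # with ERROR or CANCELED as the terminal status when an opcode fails or the
  # job is canceled; a single failed opcode marks all following ones ERROR.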


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.HostInfo().name

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must be safe to run concurrently with all other code acquiring
    # it in shared mode, including itself. Code that does not acquire the
    # lock at all must be safe to run concurrently with any code acquiring it
    # in shared or exclusive mode.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask((job, ))

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                  "Unclean master daemon shutdown")

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    utils.WriteFile(file_name, data=data)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)
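
  # Illustrative example: _FormatJobID(123) returns "123", while passing the
  # string "123" or the value -1 raises errors.ProgrammerError.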

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
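
  # Illustrative example: with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job
  # "123456" is archived under directory "12" (123456 / 10000 with integer
  # division), so each archive directory holds at most 10000 job files.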

  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers.

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
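
  # Illustrative example: if the last serial was 42, _NewSerialsUnlocked(3)
  # writes "45\n" to the serial file and returns ["43", "44", "45"].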

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    tasks = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
        status = True
        data = job_id
      except errors.GenericError, err:
        status = False
        data = str(err)
      results.append((status, data))
    self._wpool.AddManyTasks(tasks)

    return results
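
  # Illustrative example of the result format (hypothetical values): for two
  # submitted jobs, one accepted and one refused, SubmitManyJobs returns
  #   [(True, "17"), (False, "Job queue is drained, refusing job")]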

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
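
  # Illustrative example: job_age is a (seconds, microseconds) tuple, so
  # job_age[0] is the timestamp in whole seconds. With age=3600, only jobs
  # whose relevant timestamp is more than an hour in the past are archived;
  # with age=-1, every finalized job qualifies.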

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a
        list with the requested fields

    """
    jobs = []
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      else:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None