# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

22 """Module implementing the job queue handling.
25 There's a single, large lock in the JobQueue class. It's used by all other
26 classes in this module.
import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc

from ganeti.rpc import RpcRunner


JOBQUEUE_THREADS = 25


def TimeStampNow():
  """Returns the current timestamp.

  """
  return utils.SplitTime(time.time())

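# Illustrative sketch (hypothetical values, not part of the original module):
# utils.SplitTime() splits a float POSIX timestamp into whole seconds and
# microseconds, e.g.
#
#   >>> TimeStampNow()      # at 2008-01-01 00:00:00.123456 UTC
#   (1199145600, 123456)
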

class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (log_serial, timestamp, level, message).

  """
  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
    if not ops:
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

144 "ops": [op.Serialize() for op in self.ops],
145 "run_op_index": self.run_op_index,
146 "start_timestamp": self.start_timestamp,
147 "end_timestamp": self.end_timestamp,
148 "received_timestamp": self.received_timestamp,
  def CalcStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

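  # Illustrative mapping (hypothetical opcode status lists -> job status):
  #   all OP_STATUS_SUCCESS      -> JOB_STATUS_SUCCESS
  #   [SUCCESS, RUNNING, QUEUED] -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR, QUEUED]   -> JOB_STATUS_ERROR
  # (a single failed or canceled opcode decides the whole job)
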
  def GetLogEntries(self, newer_than):
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      # Keep only entries with a serial number above the requested one
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

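  # Example (hypothetical log entries): if the opcode logs hold serials
  # 1, 2 and 3, GetLogEntries(1) returns only the entries with serials 2
  # and 3, while GetLogEntries(None) returns all of them.
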

class _JobQueueWorker(workerpool.BaseWorker):
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              op.status = constants.OP_STATUS_ERROR
              op.result = str(err)
              op.end_timestamp = TimeStampNow()
              logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        job.end_timestamp = TimeStampNow()
        queue.UpdateJobUnlocked(job)

        job_id = job.id
        status = job.CalcStatus()
      finally:
        queue.release()

      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all "public" functions. That is,
    functions usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

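  # Usage sketch (illustrative): the method-level lock must be taken before
  # the open-queue assertion runs, i.e.
  #
  #   @utils.LockedMethod
  #   @_RequireOpenQueue
  #   def Example(self):
  #     pass
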
  def __init__(self, context):
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue and acquire the file lock
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values())

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        # a failure in loading the job can cause 'None' to be returned
        if job is None:
          continue

        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    RpcRunner.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = RpcRunner.call_jobqueue_update([node_name], [node.primary_ip],
                                              file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    # The queue is removed by the "leave node" RPC call.
    del self._nodes[node_name]

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

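  # Example (hypothetical counts): with 2 successful nodes and 4 failed
  # ones, len(success) + 1 == 3 < 4, so the "more than half" error above
  # is logged.
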
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

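  # Example (hypothetical node map): with self._nodes equal to
  # {"node1.example.com": "192.0.2.1"}, this returns
  # (["node1.example.com"], ["192.0.2.1"]).
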
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
    os.rename(old, new)

    names, addrs = self._GetNodeIp()
    result = RpcRunner.call_jobqueue_rename(names, addrs, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

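  # Behaviour sketch (illustrative, matching the checks above):
  #   _FormatJobID(42)   -> "42"
  #   _FormatJobID("42") -> errors.ProgrammerError (not numeric)
  #   _FormatJobID(-1)   -> errors.ProgrammerError (negative)
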
  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

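  # Example layout (illustrative): job 42 lives in <QUEUE_DIR>/job-42 while
  # active and is moved to <JOB_QUEUE_ARCHIVE_DIR>/job-42 once archived.
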
  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFileUnlocked(filepath, new_path)
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked to be drained.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    return True

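  # Usage sketch (illustrative): SetDrainFlag(True) touches the drain file,
  # after which SubmitJob raises errors.JobQueueDrainError;
  # SetDrainFlag(False) removes the file and re-enables submissions.
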
  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

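  # Submission sketch (illustrative; assumes a JobQueue instance named
  # "queue" and uses OpTestDelay merely as a convenient opcode):
  #
  #   job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=10)])
  #   queue.QueryJobs([job_id], ["status"])
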
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will
        # be no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)

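  # Polling sketch (illustrative client loop): callers feed the previously
  # returned values back in and treat JOB_NOTCHANGED as a timeout:
  #
  #   prev_info, prev_serial = None, 0
  #   while True:
  #     rv = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                  prev_serial, timeout=60)
  #     if rv == constants.JOB_NOTCHANGED:
  #       continue
  #     prev_info, log_entries = rv
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]
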
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    return self._ArchiveJobUnlocked(job_id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      # CalcStatus returns job-level statuses, so compare against those
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue

      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)

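  # Aging sketch (hypothetical timestamps): with age=3600 and a job whose
  # end_timestamp is (now - 7200, 0), now - job_age[0] is 7200 > 3600 and
  # the job is archived; age == -1 archives every finished job.
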
  def _GetJobInfoUnlocked(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

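  # Example (hypothetical job): _GetJobInfoUnlocked(job, ["id", "status"])
  # returns a row such as ["42", constants.JOB_STATUS_RUNNING].
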
  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

881 """Stops the job queue.
884 self._wpool.TerminateWorkers()
886 self._queue_lock.Close()
887 self._queue_lock = None