# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

22 """Module implementing the job queue handling.
25 There's a single, large lock in the JobQueue class. It's used by all other
26 classes in this module.
import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc

from ganeti.rpc import RpcRunner


JOBQUEUE_THREADS = 25


def TimeStampNow():
  """Returns the current timestamp.

  """
  return utils.SplitTime(time.time())

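
# Illustrative sketch, not part of the original module: timestamps are
# kept as (seconds, microseconds) tuples so that they survive JSON
# serialization without floating point precision loss. The value in the
# comment below is made up.
def _DemoTimeStamp():
  # e.g. (1215429059, 421423) -> 1215429059.421423
  sec, usec = TimeStampNow()
  return sec + usec / 1e6
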

class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (log_serial, timestamp, level, message).

  """
  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # This must be the job-level, not the opcode-level, constant
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

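  # Illustrative examples of the aggregation above (opcode statuses on
  # the left, resulting job status on the right):
  #
  #   [SUCCESS, SUCCESS, SUCCESS] -> JOB_STATUS_SUCCESS
  #   [SUCCESS, RUNNING, QUEUED]  -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR, QUEUED]    -> JOB_STATUS_ERROR (one failure fails all)
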
  def GetLogEntries(self, newer_than):
    if newer_than is None:
      newer_than = -1

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))

    return entries


class _JobQueueWorker(workerpool.BaseWorker):
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    """
    logging.debug("Worker %s processing job %s", self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)
              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3
              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args
              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())
              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              op.status = constants.OP_STATUS_ERROR
              op.result = str(err)
              op.end_timestamp = TimeStampNow()
              logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        job.end_timestamp = TimeStampNow()
        queue.UpdateJobUnlocked(job)
        job_id = job.id
        status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)

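
# Illustrative sketch, not part of the original module: the shape of a
# single entry appended by the _Log closure above. All values are
# made up.
def _DemoLogEntry():
  entry = (3, (1215429059, 421423), constants.ELOG_MESSAGE, "step done")
  serial, (sec, usec), log_type, msg = entry
  return "%s [%s.%06d] %s: %s" % (serial, sec, usec, log_type, msg)
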

class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This decorator should be used for all "public" functions. That is,
    functions usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

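  # Illustrative usage sketch (hypothetical method name): "public"
  # methods stack the locking decorator outside the open-queue check,
  # per the note above:
  #
  #   @utils.LockedMethod
  #   @_RequireOpenQueue
  #   def ExampleMethod(self):
  #     ...
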
  def __init__(self, context):
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node
    try:
      self._nodes.remove(self._my_hostname)
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        # a failure in loading the job can cause 'None' to be returned
        if job is None:
          continue

        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)
        elif status in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    RpcRunner.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = RpcRunner.call_jobqueue_update([node_name], file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    # The queue is removed by the "leave node" RPC call.
    self._nodes.remove(node_name)

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    """
    failed = [node for node in nodes if not result[node]]
    success = [node for node in nodes if result[node]]

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    result = RpcRunner.call_jobqueue_update(self._nodes, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
    os.rename(old, new)

    result = RpcRunner.call_jobqueue_rename(self._nodes, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

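  # Illustrative examples (the directories come from the constants
  # module): _GetJobPath("1234") yields "<QUEUE_DIR>/job-1234", while
  # _ExtractJobID("job-1234") yields "1234" and _ExtractJobID("foo")
  # yields None.
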
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFileUnlocked(filepath, new_path)
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

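  # Illustrative usage: draining stops SubmitJob() from accepting new
  # jobs (it raises errors.JobQueueDrainError) while already queued and
  # running jobs proceed normally:
  #
  #   JobQueue.SetDrainFlag(True)   # refuse new submissions
  #   JobQueue.SetDrainFlag(False)  # accept submissions again
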
  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

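  # Illustrative submission sketch (requires an initialized masterd
  # context; opcodes.OpTestDelay is used purely for demonstration):
  #
  #   queue = JobQueue(context)
  #   job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=10)])
  #   queue.QueryJobs([job_id], ["status"])
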
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will
        # be no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)

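  # Illustrative client-side polling sketch (hypothetical caller): feed
  # back the previously returned values and stop once the job reaches a
  # final status:
  #
  #   job_info, log_serial = None, 0
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], job_info,
  #                                      log_serial, timeout=30)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue
  #     job_info, log_entries = result
  #     if log_entries:
  #       log_serial = log_entries[-1][0]
  #     # ... break when job_info reports a finished status
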
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    return self._ArchiveJobUnlocked(job_id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      # CalcStatus returns job-level constants, so compare against those
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue

      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)

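  # Illustrative example: with age=3600, a job whose end_timestamp is
  # (now - 7200, 0) gets archived, while a job that finished ten
  # minutes ago is kept; age=-1 archives every finished job.
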
  def _GetJobInfoUnlocked(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

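  # Illustrative query (the returned values are made up):
  #
  #   self._GetJobInfoUnlocked(job, ["id", "status", "summary"])
  #   -> ["1234", constants.JOB_STATUS_RUNNING, ["TEST_DELAY"]]
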
  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

862 """Stops the job queue.
865 self._wpool.TerminateWorkers()
867 self._queue_lock.Close()
868 self._queue_lock = None