4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling.
25 There's a single, large lock in the JobQueue class. It's used by all other
26 classes in this module.
38 from ganeti import constants
39 from ganeti import serializer
40 from ganeti import workerpool
41 from ganeti import opcodes
42 from ganeti import errors
43 from ganeti import mcpu
44 from ganeti import utils
45 from ganeti import jstore
46 from ganeti import rpc
53 return utils.SplitTime(time.time())
# NOTE(review): this is an elided listing — the leading numbers are the original
# file's line numbers; gaps mark lines missing from this excerpt.
56 class _QueuedOpCode(object):
57 """Encapsulates an opcode object.
59 The 'log' attribute holds the execution log and consists of tuples
60 of the form (log_serial, timestamp, level, message).
# Opcodes start life in the QUEUED state; the timestamps are filled in
# later by the worker that executes them.
63 def __init__(self, op):
65 self.status = constants.OP_STATUS_QUEUED
68 self.start_timestamp = None
69 self.end_timestamp = None
# Alternate constructor (classmethod decorator elided above): rebuild a
# _QueuedOpCode from its serialized dict. __new__ is used to bypass
# __init__, which expects a live opcode object rather than saved state.
72 def Restore(cls, state):
73 obj = _QueuedOpCode.__new__(cls)
74 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
75 obj.status = state["status"]
76 obj.result = state["result"]
77 obj.log = state["log"]
# .get() with a default — presumably for compatibility with opcodes
# serialized before the timestamp fields existed; TODO confirm.
78 obj.start_timestamp = state.get("start_timestamp", None)
79 obj.end_timestamp = state.get("end_timestamp", None)
# Serialization counterpart of Restore; the enclosing method header and
# return statement are elided from this excerpt.
84 "input": self.input.__getstate__(),
85 "status": self.status,
86 "result": self.result,
88 "start_timestamp": self.start_timestamp,
89 "end_timestamp": self.end_timestamp,
93 class _QueuedJob(object):
94 """In-memory job representation.
96 This is what we use to track the user-submitted jobs. Locking must be taken
97 care of by users of this class.
# Constructor: wraps each user opcode in a _QueuedOpCode; rejects an empty
# opcode list (validation line partially elided here).
100 def __init__(self, queue, job_id, ops):
103 raise Exception("No opcodes")
107 self.ops = [_QueuedOpCode(op) for op in ops]
# -1 means "no opcode has run yet".
108 self.run_op_index = -1
110 self.received_timestamp = TimeStampNow()
111 self.start_timestamp = None
112 self.end_timestamp = None
114 # Condition to wait for changes
# The condition shares the queue's lock so waiters release it while waiting.
115 self.change = threading.Condition(self.queue._lock)
# Alternate constructor (classmethod decorator elided): restore a job from
# its serialized state; __new__ bypasses __init__.
118 def Restore(cls, queue, state):
119 obj = _QueuedJob.__new__(cls)
122 obj.run_op_index = state["run_op_index"]
# Missing timestamp keys default to None — presumably for jobs saved by an
# older version; TODO confirm.
123 obj.received_timestamp = state.get("received_timestamp", None)
124 obj.start_timestamp = state.get("start_timestamp", None)
125 obj.end_timestamp = state.get("end_timestamp", None)
# Recompute the highest log serial seen across all restored opcodes, so new
# log entries continue the sequence (log_serial initialization elided).
129 for op_state in state["ops"]:
130 op = _QueuedOpCode.Restore(op_state)
131 for log_entry in op.log:
132 obj.log_serial = max(obj.log_serial, log_entry[0])
135 # Condition to wait for changes
136 obj.change = threading.Condition(obj.queue._lock)
# Serialization of the job (method header and return elided).
143 "ops": [op.Serialize() for op in self.ops],
144 "run_op_index": self.run_op_index,
145 "start_timestamp": self.start_timestamp,
146 "end_timestamp": self.end_timestamp,
147 "received_timestamp": self.received_timestamp,
# Derive the overall job status from the per-opcode statuses. Several
# branches and early exits are elided in this excerpt.
150 def CalcStatus(self):
151 status = constants.JOB_STATUS_QUEUED
155 if op.status == constants.OP_STATUS_SUCCESS:
160 if op.status == constants.OP_STATUS_QUEUED:
162 elif op.status == constants.OP_STATUS_RUNNING:
163 status = constants.JOB_STATUS_RUNNING
164 elif op.status == constants.OP_STATUS_ERROR:
165 status = constants.JOB_STATUS_ERROR
166 # The whole job fails if one opcode failed
168 elif op.status == constants.OP_STATUS_CANCELED:
# NOTE(review): OP_STATUS_CANCELED is assigned where every sibling branch
# assigns a JOB_STATUS_* constant — likely a bug (should presumably be
# JOB_STATUS_CANCELED); confirm the constants' values before relying on it.
169 status = constants.OP_STATUS_CANCELED
173 status = constants.JOB_STATUS_SUCCESS
# Return log entries with serial strictly greater than newer_than (the
# None-handling line after the check is elided).
177 def GetLogEntries(self, newer_than):
178 if newer_than is None:
185 entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))
190 class _JobQueueWorker(workerpool.BaseWorker):
191 def RunTask(self, job):
194 This function processes a job. It is closely tied to the _QueuedJob and
195 _QueuedOpCode classes.
198 logging.debug("Worker %s processing job %s",
199 self.worker_id, job.id)
200 proc = mcpu.Processor(self.pool.queue.context)
# Run the job's opcodes in order. `count` and `queue` are bound on lines
# elided from this excerpt (presumably len(job.ops) and self.pool.queue).
205 for idx, op in enumerate(job.ops):
207 logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
# Record which opcode is running and persist before executing it.
211 job.run_op_index = idx
212 op.status = constants.OP_STATUS_RUNNING
214 op.start_timestamp = TimeStampNow()
215 if idx == 0: # first opcode
216 job.start_timestamp = op.start_timestamp
217 queue.UpdateJobUnlocked(job)
219 input_opcode = op.input
# Callback passed to the opcode processor; appends to op.log and wakes
# any WaitForJobChanges waiters (the def line itself is elided).
224 """Append a log entry.
230 log_type = constants.ELOG_MESSAGE
233 log_type, log_msg = args
235 # The time is split to make serialization easier and not lose
237 timestamp = utils.SplitTime(time.time())
242 op.log.append((job.log_serial, timestamp, log_type, log_msg))
244 job.change.notifyAll()
248 # Make sure not to hold lock while _Log is called
249 result = proc.ExecOpCode(input_opcode, _Log)
250 op.status = constants.OP_STATUS_SUCCESS
# (Inner-number 250 above is a review comment, not source.) Success path:
# mark the opcode done and persist.
253 op.status = constants.OP_STATUS_SUCCESS
255 op.end_timestamp = TimeStampNow()
256 queue.UpdateJobUnlocked(job)
260 logging.debug("Op %s/%s: Successfully finished %s",
# Python 2 except syntax ("except Exception, err"); failure path marks the
# opcode as errored and persists the job.
262 except Exception, err:
266 op.status = constants.OP_STATUS_ERROR
268 op.end_timestamp = TimeStampNow()
269 logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
271 queue.UpdateJobUnlocked(job)
# Outer handlers: Ganeti errors vs. anything else are logged differently.
276 except errors.GenericError, err:
277 logging.exception("Ganeti exception")
279 logging.exception("Unhandled exception")
# Finalize: stamp the job's end time and write it out one last time.
285 job.end_timestamp = TimeStampNow()
286 queue.UpdateJobUnlocked(job)
289 status = job.CalcStatus()
292 logging.debug("Worker %s finished job %s, status = %s",
293 self.worker_id, job_id, status)
# Worker pool specialized for job-queue workers; the remaining __init__
# arguments (after JOBQUEUE_THREADS) are elided from this excerpt.
296 class _JobQueueWorkerPool(workerpool.WorkerPool):
297 def __init__(self, queue):
298 super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
303 class JobQueue(object):
304 _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
# Decorator asserting the queue is still open (Shutdown sets _queue_lock to
# None, after which decorated methods must not be called).
306 def _RequireOpenQueue(fn):
307 """Decorator for "public" functions.
309 This function should be used for all "public" functions. That is, functions
310 usually called from other classes.
312 Important: Use this decorator only after utils.LockedMethod!
321 def wrapper(self, *args, **kwargs):
322 assert self._queue_lock is not None, "Queue should be open"
323 return fn(self, *args, **kwargs)
326 def __init__(self, context):
327 self.context = context
# Job cache with weak values: finished jobs can be garbage-collected once
# nothing else references them, while disk remains the source of truth.
328 self._memcache = weakref.WeakValueDictionary()
329 self._my_hostname = utils.HostInfo().name
# The big queue lock; acquire/release are exposed for utils.LockedMethod.
332 self._lock = threading.Lock()
333 self.acquire = self._lock.acquire
334 self.release = self._lock.release
# Take the on-disk queue lock; must_lock=True means failure to lock raises.
337 self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
340 self._last_serial = jstore.ReadSerial()
341 assert self._last_serial is not None, ("Serial file was modified between"
342 " check in jstore and here")
344 # Get initial list of nodes
345 self._nodes = set(self.context.cfg.GetNodeList())
# The queue never replicates to itself.
349 self._nodes.remove(self._my_hostname)
353 # TODO: Check consistency across nodes
356 self._wpool = _JobQueueWorkerPool(self)
358 # We need to lock here because WorkerPool.AddTask() may start a job while
359 # we're still doing our work.
# Recovery after (re)start: requeue jobs that were still queued; jobs found
# RUNNING imply an unclean master shutdown, so their ops are marked errored
# (the loop over ops is elided here) and the job is rewritten to disk.
362 for job in self._GetJobsUnlocked(None):
363 status = job.CalcStatus()
365 if status in (constants.JOB_STATUS_QUEUED, ):
366 self._wpool.AddTask(job)
368 elif status in (constants.JOB_STATUS_RUNNING, ):
369 logging.warning("Unfinished job %s found: %s", job.id, job)
372 op.status = constants.OP_STATUS_ERROR
373 op.result = "Unclean master daemon shutdown"
375 self.UpdateJobUnlocked(job)
# Bring a newly-added node up to date: wipe its queue directory, then push
# every non-archived job file plus the serial file.
381 def AddNode(self, node_name):
382 assert node_name != self._my_hostname
384 # Clean queue directory on added node
385 rpc.call_jobqueue_purge(node_name)
387 # Upload the whole queue excluding archived jobs
388 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
390 # Upload current serial file
391 files.append(constants.JOB_QUEUE_SERIAL_FILE)
393 for file_name in files:
# File is read on an elided line between open and the RPC; per-node upload
# failure is only logged, not raised (best-effort replication).
395 fd = open(file_name, "r")
401 result = rpc.call_jobqueue_update([node_name], file_name, content)
402 if not result[node_name]:
403 logging.error("Failed to upload %s to %s", file_name, node_name)
405 self._nodes.add(node_name)
409 def RemoveNode(self, node_name):
411 # The queue is removed by the "leave node" RPC call.
412 self._nodes.remove(node_name)
# Inspect a multi-node RPC result: log which nodes failed; the success/
# failed partitioning lines are elided. The +1 counts the master itself.
416 def _CheckRpcResult(self, result, nodes, failmsg):
427 logging.error("%s failed on %s", failmsg, ", ".join(failed))
429 # +1 for the master node
430 if (len(success) + 1) < len(failed):
431 # TODO: Handle failing nodes
432 logging.error("More than half of the nodes failed")
434 def _WriteAndReplicateFileUnlocked(self, file_name, data):
435 """Writes a file locally and then replicates it to all nodes.
438 utils.WriteFile(file_name, data=data)
440 result = rpc.call_jobqueue_update(self._nodes, file_name, data)
441 self._CheckRpcResult(result, self._nodes,
442 "Updating %s" % file_name)
# Rename on all remote nodes; the local rename is presumably on an elided
# line — TODO confirm.
444 def _RenameFileUnlocked(self, old, new):
447 result = rpc.call_jobqueue_rename(self._nodes, old, new)
448 self._CheckRpcResult(result, self._nodes,
449 "Moving %s to %s" % (old, new))
# Validate and format a numeric job ID (the formatting/return line is
# elided).
451 def _FormatJobID(self, job_id):
452 if not isinstance(job_id, (int, long)):
453 raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
455 raise errors.ProgrammerError("Job ID %s is negative" % job_id)
459 def _NewSerialUnlocked(self):
460 """Generates a new job identifier.
462 Job identifiers are unique during the lifetime of a cluster.
464 Returns: A string representing the job identifier.
468 serial = self._last_serial + 1
# Replicate the serial file first; _last_serial is only advanced after the
# write succeeds, so a failed write does not consume the serial.
471 self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
474 # Keep it only if we were able to write the file
475 self._last_serial = serial
477 return self._FormatJobID(serial)
# Path helpers (staticmethod/classmethod decorators elided above them).
480 def _GetJobPath(job_id):
481 return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
484 def _GetArchivedJobPath(job_id):
485 return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
# Extract the numeric ID from a job file name, using the class regexp.
488 def _ExtractJobID(cls, name):
489 m = cls._RE_JOB_FILE.match(name)
495 def _GetJobIDsUnlocked(self, archived=False):
496 """Return all known job IDs.
498 If the parameter archived is True, archived jobs IDs will be
499 included. Currently this argument is unused.
501 The method only looks at disk because it's a requirement that all
502 jobs are present on disk (so in the _memcache we don't have any
506 jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
507 jlist = utils.NiceSort(jlist)
510 def _ListJobFiles(self):
511 return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
512 if self._RE_JOB_FILE.match(name)]
# Load a job, preferring the in-memory cache; on ENOENT the job presumably
# does not exist and None is returned (return lines elided) — TODO confirm.
514 def _LoadJobUnlocked(self, job_id):
515 job = self._memcache.get(job_id, None)
517 logging.debug("Found job %s in memcache", job_id)
520 filepath = self._GetJobPath(job_id)
521 logging.debug("Loading job from %s", filepath)
523 fd = open(filepath, "r")
525 if err.errno in (errno.ENOENT, ):
529 data = serializer.LoadJson(fd.read())
533 job = _QueuedJob.Restore(self, data)
534 self._memcache[job_id] = job
535 logging.debug("Added job %s to the cache", job_id)
# None job_ids means "all jobs on disk".
538 def _GetJobsUnlocked(self, job_ids):
540 job_ids = self._GetJobIDsUnlocked()
542 return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
546 def SubmitJob(self, ops):
547 """Create and store a new job.
549 This enters the job into our job queue and also puts it on the new
550 queue, in order for it to be picked up by the queue processors.
553 @param ops: The list of OpCodes that will become the new job.
# Allocate a serial (also replicated to other nodes), build the job,
# persist it, cache it, and hand it to the worker pool for execution.
557 job_id = self._NewSerialUnlocked()
558 job = _QueuedJob(self, job_id, ops)
561 self.UpdateJobUnlocked(job)
563 logging.debug("Adding new job %s to the cache", job_id)
564 self._memcache[job_id] = job
567 self._wpool.AddTask(job)
# Serialize the job to its file and replicate to all nodes, then wake any
# threads blocked in WaitForJobChanges on this job.
572 def UpdateJobUnlocked(self, job):
573 filename = self._GetJobPath(job.id)
574 data = serializer.DumpJson(job.Serialize(), indent=False)
575 logging.debug("Writing job %s to %s", job.id, filename)
576 self._WriteAndReplicateFileUnlocked(filename, data)
578 # Notify waiters about potential changes
579 job.change.notifyAll()
# Block until the job's info or log changes, the job finishes, or the
# timeout expires; several loop/return lines are elided in this excerpt.
583 def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
585 """Waits for changes in a job.
588 @param job_id: Job identifier
589 @type fields: list of strings
590 @param fields: Which fields to check for changes
591 @type prev_job_info: list or None
592 @param prev_job_info: Last job information returned
593 @type prev_log_serial: int
594 @param prev_log_serial: Last job message serial number
596 @param timeout: maximum time to wait
599 logging.debug("Waiting for changes in job %s", job_id)
600 end_time = time.time() + timeout
# Remaining time for this wait iteration; on expiry the sentinel
# JOB_NOTCHANGED is returned instead of data.
602 delta_time = end_time - time.time()
604 return constants.JOB_NOTCHANGED
606 job = self._LoadJobUnlocked(job_id)
608 logging.debug("Job %s not found", job_id)
611 status = job.CalcStatus()
612 job_info = self._GetJobInfoUnlocked(job, fields)
613 log_entries = job.GetLogEntries(prev_log_serial)
615 # Serializing and deserializing data can cause type changes (e.g. from
616 # tuple to list) or precision loss. We're doing it here so that we get
617 # the same modifications as the data received from the client. Without
618 # this, the comparison afterwards might fail without the data being
619 # significantly different.
620 job_info = serializer.LoadJson(serializer.DumpJson(job_info))
621 log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
623 if status not in (constants.JOB_STATUS_QUEUED,
624 constants.JOB_STATUS_RUNNING):
625 # Don't even try to wait if the job is no longer running, there will be
# Something changed since the caller's last snapshot: stop waiting.
629 if (prev_job_info != job_info or
630 (log_entries and prev_log_serial != log_entries[0][0])):
633 logging.debug("Waiting again")
635 # Release the queue lock while waiting
# job.change shares the queue lock, so wait() releases it for the duration.
636 job.change.wait(delta_time)
638 logging.debug("Job %s changed", job_id)
640 return (job_info, log_entries)
# Cancel a job; only possible while it is still queued (a running job
# cannot be cancelled). The per-op loop header is elided before line 664.
644 def CancelJob(self, job_id):
648 @param job_id: Job ID of job to be cancelled.
651 logging.debug("Cancelling job %s", job_id)
653 job = self._LoadJobUnlocked(job_id)
655 logging.debug("Job %s not found", job_id)
658 if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
659 logging.debug("Job %s is no longer in the queue", job.id)
# Mark every opcode as errored with a cancellation message, then persist.
664 op.status = constants.OP_STATUS_ERROR
665 op.result = "Job cancelled by request"
667 self.UpdateJobUnlocked(job)
# Move a finished job's file into the archive directory (on all nodes, via
# _RenameFileUnlocked); refuses jobs that are not yet done.
671 def ArchiveJob(self, job_id):
675 @param job_id: Job ID of job to be archived.
678 logging.debug("Archiving job %s", job_id)
680 job = self._LoadJobUnlocked(job_id)
682 logging.debug("Job %s not found", job_id)
685 if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
686 constants.JOB_STATUS_SUCCESS,
687 constants.JOB_STATUS_ERROR):
688 logging.debug("Job %s is not yet done", job.id)
691 old = self._GetJobPath(job.id)
692 new = self._GetArchivedJobPath(job.id)
694 self._RenameFileUnlocked(old, new)
696 logging.debug("Successfully archived job %s", job.id)
# Build one query row for a job: dispatch on each requested field name
# (the loop header and "id" branch are elided before line 703).
698 def _GetJobInfoUnlocked(self, job, fields):
703 elif fname == "status":
704 row.append(job.CalcStatus())
706 row.append([op.input.__getstate__() for op in job.ops])
707 elif fname == "opresult":
708 row.append([op.result for op in job.ops])
709 elif fname == "opstatus":
710 row.append([op.status for op in job.ops])
711 elif fname == "oplog":
712 row.append([op.log for op in job.ops])
713 elif fname == "opstart":
714 row.append([op.start_timestamp for op in job.ops])
715 elif fname == "opend":
716 row.append([op.end_timestamp for op in job.ops])
717 elif fname == "received_ts":
718 row.append(job.received_timestamp)
719 elif fname == "start_ts":
720 row.append(job.start_timestamp)
721 elif fname == "end_ts":
722 row.append(job.end_timestamp)
723 elif fname == "summary":
724 row.append([op.input.Summary() for op in job.ops])
# Unknown field names are a caller error.
726 raise errors.OpExecError("Invalid job query field '%s'" % fname)
731 def QueryJobs(self, job_ids, fields):
732 """Returns a list of jobs in queue.
735 - job_ids: Sequence of job identifiers or None for all
736 - fields: Names of fields to return
741 for job in self._GetJobsUnlocked(job_ids):
745 jobs.append(self._GetJobInfoUnlocked(job, fields))
# Shutdown (def line elided): stop the workers, then release and clear the
# on-disk queue lock — after this, _RequireOpenQueue assertions will fail.
752 """Stops the job queue.
755 self._wpool.TerminateWorkers()
757 self._queue_lock.Close()
758 self._queue_lock = None