4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling."""
31 from ganeti import constants
32 from ganeti import serializer
33 from ganeti import workerpool
34 from ganeti import opcodes
35 from ganeti import errors
36 from ganeti import mcpu
37 from ganeti import utils
38 from ganeti import jstore
39 from ganeti import rpc
45 class _QueuedOpCode(object):
# NOTE(review): this is an elided listing — the leading integers are the
# original source line numbers; where they jump, statements (returns,
# try/finally bodies, attribute assignments) are missing from view.
46 """Encapsulates an opcode object.
48 Access is synchronized by the '_lock' attribute.
50 The 'log' attribute holds the execution log and consists of tuples
51 of the form (timestamp, level, message).
# The log lock is created in __new__ (not __init__) so it also exists on
# instances built via Restore(), which bypasses __init__.
54 def __new__(cls, *args, **kwargs):
55 obj = object.__new__(cls, *args, **kwargs)
56 # Create a special lock for logging
57 obj._log_lock = threading.Lock()
# ("return obj" presumably follows on an elided line)
60 def __init__(self, op):
# Fresh opcodes start out queued; 'op' is presumably stored as self.input
# on an elided line — confirm against the full source.
62 self.status = constants.OP_STATUS_QUEUED
# Alternate constructor: rebuild from the dict produced by Serialize().
# Presumably decorated with @classmethod on an elided line.
67 def Restore(cls, state):
68 obj = _QueuedOpCode.__new__(cls)
69 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
70 obj.status = state["status"]
71 obj.result = state["result"]
72 obj.log = state["log"]
# Serialize (header elided): snapshot the fields under the log lock so a
# concurrent Log() call cannot interleave with the copy.
76 self._log_lock.acquire()
79 "input": self.input.__getstate__(),
80 "status": self.status,
81 "result": self.result,
# release presumably sits in an elided finally: clause
85 self._log_lock.release()
88 """Append a log entry.
# Default log type when the caller supplied only a message
94 log_type = constants.ELOG_MESSAGE
97 log_type, log_msg = args
99 self._log_lock.acquire()
101 # The time is split to make serialization easier and not lose more
# precision than necessary (utils.SplitTime presumably returns a
# (seconds, microseconds) pair — confirm against utils).
103 self.log.append((utils.SplitTime(time.time()), log_type, log_msg))
105 self._log_lock.release()
107 def RetrieveLog(self, start_at=0):
108 """Retrieve (a part of) the execution log.
# Slice under the lock; the release below presumably runs in an elided
# finally: clause so the return value is computed while still locked.
111 self._log_lock.acquire()
113 return self.log[start_at:]
115 self._log_lock.release()
118 class _QueuedJob(object):
119 """In-memory job representation.
121 This is what we use to track the user-submitted jobs.
# The 'change' condition is created in __new__ so instances built via
# Restore() (which skips __init__) also get one; waiters use it to be
# notified of job state changes.
124 def __new__(cls, *args, **kwargs):
125 obj = object.__new__(cls, *args, **kwargs)
126 # Condition to wait for changes
127 obj.change = threading.Condition()
# ("return obj" presumably follows on an elided line)
130 def __init__(self, queue, job_id, ops):
# Refuse to create a job without opcodes.
# NOTE(review): a bare Exception is raised here; a project-specific
# errors.* type would be more consistent — flagging only, code unchanged.
133 raise Exception("No opcodes")
137 self.ops = [_QueuedOpCode(op) for op in ops]
# -1 means no opcode has started executing yet
138 self.run_op_index = -1
# Alternate constructor from serialized state (presumably @classmethod,
# decorator elided); inverse of Serialize below.
141 def Restore(cls, queue, state):
142 obj = _QueuedJob.__new__(cls)
145 obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
146 obj.run_op_index = state["run_op_index"]
# Serialize (header elided): dict with per-opcode serialized state.
152 "ops": [op.Serialize() for op in self.ops],
153 "run_op_index": self.run_op_index,
# Derive the overall job status from the statuses of its opcodes; the
# loop over self.ops and the success bookkeeping are elided from view.
156 def CalcStatus(self):
157 status = constants.JOB_STATUS_QUEUED
161 if op.status == constants.OP_STATUS_SUCCESS:
166 if op.status == constants.OP_STATUS_QUEUED:
168 elif op.status == constants.OP_STATUS_RUNNING:
169 status = constants.JOB_STATUS_RUNNING
170 elif op.status == constants.OP_STATUS_ERROR:
171 status = constants.JOB_STATUS_ERROR
172 # The whole job fails if one opcode failed
# NOTE(review): the next line assigns the opcode-level constant
# OP_STATUS_CANCELED to the job-level status; JOB_STATUS_CANCELED looks
# intended (every other branch uses a JOB_STATUS_* value). Confirm
# against constants before changing — not fixed here because the
# surrounding control flow is elided.
174 elif op.status == constants.OP_STATUS_CANCELED:
175 status = constants.OP_STATUS_CANCELED
# All opcodes succeeded -> the job as a whole succeeded.
179 status = constants.JOB_STATUS_SUCCESS
184 class _JobQueueWorker(workerpool.BaseWorker):
# Worker that executes one queued job: runs its opcodes in order, writing
# status/result back to disk via the queue after each transition.
185 def RunTask(self, job):
188 This function processes a job.
191 logging.debug("Worker %s processing job %s",
192 self.worker_id, job.id)
# One mcpu.Processor per job execution; 'queue' used below is presumably
# bound to self.pool.queue on an elided line.
193 proc = mcpu.Processor(self.pool.queue.context)
198 for idx, op in enumerate(job.ops):
# 'count' is presumably len(job.ops), assigned on an elided line
200 logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
# Mark which opcode is running and persist before executing, so a crash
# leaves an accurate on-disk record.
204 job.run_op_index = idx
205 op.status = constants.OP_STATUS_RUNNING
207 queue.UpdateJobUnlocked(job)
209 input_opcode = op.input
# Wake any WaitForJobChanges() callers; presumably done while holding
# job.change (acquire elided).
218 job.change.notifyAll()
# _Log is presumably a local callback (elided) that appends to op.log
222 result = proc.ExecOpCode(input_opcode, _Log)
226 op.status = constants.OP_STATUS_SUCCESS
228 queue.UpdateJobUnlocked(job)
232 logging.debug("Op %s/%s: Successfully finished %s",
# Python 2 except syntax; failure of one opcode marks it as errored and
# is persisted before re-raising/aborting (elided lines).
234 except Exception, err:
238 op.status = constants.OP_STATUS_ERROR
240 logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
242 queue.UpdateJobUnlocked(job)
# Outer handlers: Ganeti errors vs anything unexpected, both logged.
247 except errors.GenericError, err:
248 logging.exception("Ganeti exception")
250 logging.exception("Unhandled exception")
255 status = job.CalcStatus()
258 logging.debug("Worker %s finished job %s, status = %s",
# NOTE(review): 'job_id' here vs 'job.id' above — presumably bound
# (job_id = job.id) on an elided line; verify in the full source.
259 self.worker_id, job_id, status)
262 class _JobQueueWorkerPool(workerpool.WorkerPool):
# Pool of _JobQueueWorker threads serving one JobQueue; JOBQUEUE_THREADS
# is a module-level constant defined outside the visible lines.
263 def __init__(self, queue):
264 super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
269 class JobQueue(object):
# Disk-backed, replicated job queue. Jobs live as JSON files under
# QUEUE_DIR, are mirrored to all cluster nodes via rpc, and cached in
# self._memcache. NOTE(review): listing is elided — leading integers are
# original line numbers; gaps hide statements (try/finally, returns,
# decorators), so several notes below are hedged.
270 _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
272 def _RequireOpenQueue(fn):
273 """Decorator for "public" functions.
275 This function should be used for all "public" functions. That is, functions
276 usually called from other classes.
278 Important: Use this decorator only after utils.LockedMethod!
# Asserts the queue file lock is held/open before running the wrapped
# method.
287 def wrapper(self, *args, **kwargs):
288 assert self._queue_lock is not None, "Queue should be open"
289 return fn(self, *args, **kwargs)
292 def __init__(self, context):
293 self.context = context
295 self._my_hostname = utils.HostInfo().name
# Expose acquire/release so utils.LockedMethod can lock this object
# directly.
298 self._lock = threading.Lock()
299 self.acquire = self._lock.acquire
300 self.release = self._lock.release
# Exclusive on-disk queue lock; _RequireOpenQueue checks it is not None.
303 self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
306 self._last_serial = jstore.ReadSerial()
307 assert self._last_serial is not None, ("Serial file was modified between"
308 " check in jstore and here")
310 # Get initial list of nodes
311 self._nodes = set(self.context.cfg.GetNodeList())
# Replication targets exclude the local node (it writes directly).
315 self._nodes.remove(self._my_hostname)
319 # TODO: Check consistency across nodes
322 self._wpool = _JobQueueWorkerPool(self)
324 # We need to lock here because WorkerPool.AddTask() may start a job while
325 # we're still doing our work.
# Recovery pass over all jobs found on disk at startup.
328 for job in self._GetJobsUnlocked(None):
329 status = job.CalcStatus()
331 if status in (constants.JOB_STATUS_QUEUED, ):
332 self._wpool.AddTask(job)
334 elif status in (constants.JOB_STATUS_RUNNING, ):
# A job still marked running means the previous master daemon died;
# its opcodes are failed (loop header elided) and the job persisted.
335 logging.warning("Unfinished job %s found: %s", job.id, job)
338 op.status = constants.OP_STATUS_ERROR
339 op.result = "Unclean master daemon shutdown"
341 self.UpdateJobUnlocked(job)
# Bring a newly-added node up to date with the queue contents.
347 def AddNode(self, node_name):
348 assert node_name != self._my_hostname
350 # Clean queue directory on added node
351 rpc.call_jobqueue_purge(node_name)
353 # Upload the whole queue excluding archived jobs
354 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
356 # Upload current serial file
357 files.append(constants.JOB_QUEUE_SERIAL_FILE)
359 for file_name in files:
# 'content' is presumably fd.read() with a close in an elided
# try/finally; confirm against the full source.
361 fd = open(file_name, "r")
367 result = rpc.call_jobqueue_update([node_name], file_name, content)
368 if not result[node_name]:
# Best-effort: upload failures are logged, not fatal.
369 logging.error("Failed to upload %s to %s", file_name, node_name)
# Only start replicating to the node once it has been synced.
371 self._nodes.add(node_name)
375 def RemoveNode(self, node_name):
377 # The queue is removed by the "leave node" RPC call.
378 self._nodes.remove(node_name)
382 def _WriteAndReplicateFileUnlocked(self, file_name, data):
383 """Writes a file locally and then replicates it to all nodes.
386 utils.WriteFile(file_name, data=data)
389 result = rpc.call_jobqueue_update(self._nodes, file_name, data)
390 for node in self._nodes:
# Per-node result check (condition line elided); failures only logged.
393 logging.error("Copy of job queue file to node %s failed", node)
395 # TODO: check failed_nodes
# Rename on all nodes (local rename presumably on an elided line).
397 def _RenameFileUnlocked(self, old, new):
400 result = rpc.call_jobqueue_rename(self._nodes, old, new)
401 for node in self._nodes:
403 logging.error("Moving %s to %s failed on %s", old, new, node)
405 # TODO: check failed nodes
# Validate a numeric serial and format it as the canonical job-ID string;
# the return statement is elided from this view.
407 def _FormatJobID(self, job_id):
408 if not isinstance(job_id, (int, long)):
409 raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
411 raise errors.ProgrammerError("Job ID %s is negative" % job_id)
415 def _NewSerialUnlocked(self):
416 """Generates a new job identifier.
418 Job identifiers are unique during the lifetime of a cluster.
420 Returns: A string representing the job identifier.
424 serial = self._last_serial + 1
# New serial is written to disk and replicated before being trusted.
427 self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
430 # Keep it only if we were able to write the file
431 self._last_serial = serial
433 return self._FormatJobID(serial)
# Path helpers; presumably @staticmethod (decorators elided).
436 def _GetJobPath(job_id):
437 return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
440 def _GetArchivedJobPath(job_id):
441 return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
# Extract the numeric ID from a job file name; presumably @classmethod.
444 def _ExtractJobID(cls, name):
445 m = cls._RE_JOB_FILE.match(name)
451 def _GetJobIDsUnlocked(self, archived=False):
452 """Return all known job IDs.
454 If the parameter archived is True, archived job IDs will be
455 included. Currently this argument is unused.
457 The method only looks at disk because it's a requirement that all
458 jobs are present on disk (so in the _memcache we don't have any
462 jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
466 def _ListJobFiles(self):
467 return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
468 if self._RE_JOB_FILE.match(name)]
# Load a job: memcache first, then the on-disk JSON file.
470 def _LoadJobUnlocked(self, job_id):
471 if job_id in self._memcache:
472 logging.debug("Found job %s in memcache", job_id)
473 return self._memcache[job_id]
475 filepath = self._GetJobPath(job_id)
476 logging.debug("Loading job from %s", filepath)
# Surrounding try/except IOError presumably elided; ENOENT is treated
# as "job does not exist" rather than an error.
478 fd = open(filepath, "r")
480 if err.errno in (errno.ENOENT, ):
484 data = serializer.LoadJson(fd.read())
488 job = _QueuedJob.Restore(self, data)
489 self._memcache[job_id] = job
490 logging.debug("Added job %s to the cache", job_id)
# job_ids=None means "all jobs" (the None check itself is elided).
493 def _GetJobsUnlocked(self, job_ids):
495 job_ids = self._GetJobIDsUnlocked()
497 return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
501 def SubmitJob(self, ops):
502 """Create and store a new job.
504 This enters the job into our job queue and also puts it on the new
505 queue, in order for it to be picked up by the queue processors.
508 @param ops: The list of OpCodes that will become the new job.
# New serial -> new job object -> persist -> cache -> hand to workers.
512 job_id = self._NewSerialUnlocked()
513 job = _QueuedJob(self, job_id, ops)
516 self.UpdateJobUnlocked(job)
518 logging.debug("Added new job %s to the cache", job_id)
519 self._memcache[job_id] = job
522 self._wpool.AddTask(job)
# Persist a job to disk (replicated), trim the cache, wake waiters.
527 def UpdateJobUnlocked(self, job):
528 filename = self._GetJobPath(job.id)
529 data = serializer.DumpJson(job.Serialize(), indent=False)
530 logging.debug("Writing job %s to %s", job.id, filename)
531 self._WriteAndReplicateFileUnlocked(filename, data)
532 self._CleanCacheUnlocked([job.id])
534 # Notify waiters about potential changes
# Presumably done while holding job.change (acquire elided).
537 job.change.notifyAll()
541 def _CleanCacheUnlocked(self, exclude):
542 """Clean the memory cache.
544 The 'exclude' argument contains job IDs that should not be
548 assert isinstance(exclude, list)
# Finished jobs (neither queued nor running) are evicted; excluded IDs
# are skipped (continue line elided).
550 for job in self._memcache.values():
551 if job.id in exclude:
553 if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
554 constants.JOB_STATUS_RUNNING):
555 logging.debug("Cleaning job %s from the cache", job.id)
557 del self._memcache[job.id]
# Block until the watched fields of a job differ from 'previous'; the
# wait loop on job.change is elided from this view.
562 def WaitForJobChanges(self, job_id, fields, previous):
563 logging.debug("Waiting for changes in job %s", job_id)
568 job = self._LoadJobUnlocked(job_id)
570 logging.debug("Job %s not found", job_id)
574 new_state = self._GetJobInfoUnlocked(job, fields)
578 # Serializing and deserializing data can cause type changes (e.g. from
579 # tuple to list) or precision loss. We're doing it here so that we get
580 # the same modifications as the data received from the client. Without
581 # this, the comparison afterwards might fail without the data being
582 # significantly different.
583 new_state = serializer.LoadJson(serializer.DumpJson(new_state))
585 if previous != new_state:
594 logging.debug("Job %s changed", job_id)
600 def CancelJob(self, job_id):
604 @param job_id: Job ID of job to be cancelled.
607 logging.debug("Cancelling job %s", job_id)
609 job = self._LoadJobUnlocked(job_id)
611 logging.debug("Job %s not found", job_id)
# Only still-queued jobs can be cancelled; running ones cannot.
614 if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
615 logging.debug("Job %s is no longer in the queue", job.id)
# Mark every opcode failed with a cancel message (loop header elided).
620 op.status = constants.OP_STATUS_ERROR
621 op.result = "Job cancelled by request"
623 self.UpdateJobUnlocked(job)
627 def ArchiveJob(self, job_id):
631 @param job_id: Job ID of job to be archived.
634 logging.debug("Archiving job %s", job_id)
636 job = self._LoadJobUnlocked(job_id)
638 logging.debug("Job %s not found", job_id)
# Only finished jobs (cancelled/success/error) may be archived.
641 if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
642 constants.JOB_STATUS_SUCCESS,
643 constants.JOB_STATUS_ERROR):
644 logging.debug("Job %s is not yet done", job.id)
648 old = self._GetJobPath(job.id)
649 new = self._GetArchivedJobPath(job.id)
651 self._RenameFileUnlocked(old, new)
653 logging.debug("Successfully archived job %s", job.id)
655 # Cleaning the cache because we don't know what os.rename actually did
656 # and to be on the safe side.
657 self._CleanCacheUnlocked([])
# Build one result row per requested field name (loop header and the
# "id" branch are elided from this view).
659 def _GetJobInfoUnlocked(self, job, fields):
664 elif fname == "status":
665 row.append(job.CalcStatus())
667 row.append([op.input.__getstate__() for op in job.ops])
668 elif fname == "opresult":
669 row.append([op.result for op in job.ops])
670 elif fname == "opstatus":
671 row.append([op.status for op in job.ops])
672 elif fname == "ticker":
673 ji = job.run_op_index
# start_at=-1 yields only the most recent log entry of the running op
677 lmsg = job.ops[ji].RetrieveLog(-1)
678 # message might be empty here
685 raise errors.OpExecError("Invalid job query field '%s'" % fname)
690 def QueryJobs(self, job_ids, fields):
691 """Returns a list of jobs in queue.
694 - job_ids: Sequence of job identifiers or None for all
695 - fields: Names of fields to return
# 'jobs' accumulator is presumably initialized on an elided line
700 for job in self._GetJobsUnlocked(job_ids):
704 jobs.append(self._GetJobInfoUnlocked(job, fields))
# Shutdown (method header elided): stop workers, then release and
# invalidate the on-disk queue lock so _RequireOpenQueue starts failing.
711 """Stops the job queue.
714 self._wpool.TerminateWorkers()
716 self._queue_lock.Close()
717 self._queue_lock = None