4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling."""
import errno
import logging
import os
import re
import threading
import time

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
45 class _QueuedOpCode(object):
46 """Encasulates an opcode object.
48 Access is synchronized by the '_lock' attribute.
50 The 'log' attribute holds the execution log and consists of tuples
51 of the form (timestamp, level, message).
54 def __new__(cls, *args, **kwargs):
55 obj = object.__new__(cls, *args, **kwargs)
56 # Create a special lock for logging
57 obj._log_lock = threading.Lock()
60 def __init__(self, op):
62 self.status = constants.OP_STATUS_QUEUED
67 def Restore(cls, state):
68 obj = _QueuedOpCode.__new__(cls)
69 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
70 obj.status = state["status"]
71 obj.result = state["result"]
72 obj.log = state["log"]
76 self._log_lock.acquire()
79 "input": self.input.__getstate__(),
80 "status": self.status,
81 "result": self.result,
85 self._log_lock.release()
88 """Append a log entry.
94 log_type = constants.ELOG_MESSAGE
97 log_type, log_msg = args
99 self._log_lock.acquire()
101 self.log.append((time.time(), log_type, log_msg))
103 self._log_lock.release()
105 def RetrieveLog(self, start_at=0):
106 """Retrieve (a part of) the execution log.
109 self._log_lock.acquire()
111 return self.log[start_at:]
113 self._log_lock.release()
116 class _QueuedJob(object):
117 """In-memory job representation.
119 This is what we use to track the user-submitted jobs.
122 def __init__(self, queue, job_id, ops):
125 raise Exception("No opcodes")
129 self.ops = [_QueuedOpCode(op) for op in ops]
130 self.run_op_index = -1
133 def Restore(cls, queue, state):
134 obj = _QueuedJob.__new__(cls)
137 obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
138 obj.run_op_index = state["run_op_index"]
144 "ops": [op.Serialize() for op in self.ops],
145 "run_op_index": self.run_op_index,
148 def CalcStatus(self):
149 status = constants.JOB_STATUS_QUEUED
153 if op.status == constants.OP_STATUS_SUCCESS:
158 if op.status == constants.OP_STATUS_QUEUED:
160 elif op.status == constants.OP_STATUS_RUNNING:
161 status = constants.JOB_STATUS_RUNNING
162 elif op.status == constants.OP_STATUS_ERROR:
163 status = constants.JOB_STATUS_ERROR
164 # The whole job fails if one opcode failed
166 elif op.status == constants.OP_STATUS_CANCELED:
167 status = constants.OP_STATUS_CANCELED
171 status = constants.JOB_STATUS_SUCCESS
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker thread executing queued jobs.

  NOTE(review): this copy of the file is incomplete -- the try/finally
  pairs, the queue.acquire()/release() calls and several assignments
  ("queue", "count", "job_id", "op.result") referenced below are not
  visible here.  Comments mark the gaps; do not trust the printed
  control flow without comparing against a complete copy.
  """
  def RunTask(self, job):
    """Job executor.

    This function processes a job.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    # Processor executing the individual opcodes; the master context is
    # reached through the owning worker pool's queue
    proc = mcpu.Processor(self.pool.queue.context)
    # NOTE(review): assignments such as "queue = job.queue" and
    # "count = len(job.ops)" are missing from this copy
    for idx, op in enumerate(job.ops):
      logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
      # Record which opcode is running and persist the state change
      # before execution starts
      job.run_op_index = idx
      op.status = constants.OP_STATUS_RUNNING
      queue.UpdateJobUnlocked(job)

      input_opcode = op.input
      # Execute the opcode; op.Log is passed as the feedback callback
      result = proc.ExecOpCode(input_opcode, op.Log)

      # Success: persist the new status (presumably together with
      # "op.result = result", which is not visible here)
      op.status = constants.OP_STATUS_SUCCESS
      queue.UpdateJobUnlocked(job)
      # NOTE(review): the continuation arguments of this call are
      # missing in this copy
      logging.debug("Op %s/%s: Successfully finished %s",
    except Exception, err:
      # Any failure marks the opcode (and therefore the job) as failed
      op.status = constants.OP_STATUS_ERROR
      logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
      queue.UpdateJobUnlocked(job)
    except errors.GenericError, err:
      # Expected Ganeti errors: log with traceback and continue
      logging.exception("Ganeti exception")
      # NOTE(review): this second line presumably belongs to a separate
      # bare "except:" clause for non-Ganeti errors -- not visible here
      logging.exception("Unhandled exception")
    # The final job status is derived from the per-opcode statuses
    status = job.CalcStatus()
    logging.debug("Worker %s finished job %s, status = %s",
                  self.worker_id, job_id, status)
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple worker pool running job-queue workers.

  """
  def __init__(self, queue):
    """Initialize the pool.

    @param queue: the owning JobQueue; exposed as self.queue so that
        workers can reach it via self.pool.queue

    """
    # The original call was truncated: the worker class argument was
    # missing, leaving the super() call incomplete.
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    # Workers read self.pool.queue (e.g. for the master context), so
    # the queue must be stored on the pool.
    self.queue = queue
class JobQueue(object):
  """Disk-backed job queue, replicated to all cluster nodes.

  NOTE(review): many lines are missing from this copy of the file
  (decorators, guards, try/finally pairs, return statements).  The
  comments below flag the visible gaps; compare with a complete copy
  before editing.
  """
  # Matches valid job file names ("job-<id>")
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all "public" functions. That is, functions
    usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    """
    def wrapper(self, *args, **kwargs):
      # Fail loudly if the queue was never opened or already shut down
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    # NOTE(review): the "return wrapper" line is not visible in this copy

  def __init__(self, context):
    """Open the queue, read the serial file and resume unfinished jobs.

    @param context: the master context, used to reach the configuration

    """
    self.context = context
    # NOTE(review): the initialization of the job cache (self._memcache,
    # read by _LoadJobUnlocked below) is not visible in this copy
    self._my_hostname = utils.HostInfo().name

    # Lock protecting the queue state; acquire/release are exported so
    # decorated methods and helpers can use them directly
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Exclusive on-disk queue lock; doubles as the "queue is open" marker
    # checked by _RequireOpenQueue
    self._queue_lock = jstore.InitAndVerifyQueue(exclusive=True)

    # Last used job serial, needed to generate new job identifiers
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # The queue is replicated to all nodes except ourselves
    self._nodes.remove(self._my_hostname)

    # TODO: Check consistency across nodes

    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    # NOTE(review): the acquire()/try/finally/release() wrapper implied
    # by the comment above is not visible in this copy
    for job in self._GetJobsUnlocked(None):
      status = job.CalcStatus()

      if status in (constants.JOB_STATUS_QUEUED, ):
        # Still-queued jobs are simply re-submitted to the worker pool
        self._wpool.AddTask(job)

      elif status in (constants.JOB_STATUS_RUNNING, ):
        # A running job cannot survive a master restart; mark its
        # opcodes as failed (presumably inside "for op in job.ops:",
        # the loop line is missing here)
        logging.warning("Unfinished job %s found: %s", job.id, job)
        op.status = constants.OP_STATUS_ERROR
        op.result = "Unclean master daemon shutdown"
        self.UpdateJobUnlocked(job)

  def AddNode(self, node_name):
    """Register a new node and copy the queue contents to it.

    NOTE(review): presumably decorated with utils.LockedMethod /
    _RequireOpenQueue upstream; the decorators are not visible here.
    """
    assert node_name != self._my_hostname

    # TODO: Clean queue directory on added node

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      result = rpc.call_upload_file([node_name], file_name)
      if not result[node_name]:
        # Best-effort: a failed upload is logged, not fatal
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  def RemoveNode(self, node_name):
    """Stop replicating the queue to the given node.

    """
    # The queue is removed by the "leave node" RPC call.
    self._nodes.remove(node_name)

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    result = rpc.call_upload_file(self._nodes, file_name)
    for node in self._nodes:
      # NOTE(review): the "if not result[node]:" guard (and the
      # failed-node counting) is missing from this copy
      logging.error("Copy of job queue file to node %s failed", node)

    # TODO: check failed_nodes

  def _FormatJobID(self, job_id):
    """Convert a numeric serial into the job identifier string.

    @raise errors.ProgrammerError: if job_id is not numeric or negative

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    # NOTE(review): the "if job_id < 0:" guard before this raise, and
    # the final "return" of the formatted ID, are missing from this copy
    raise errors.ProgrammerError("Job ID %s is negative" % job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # NOTE(review): the continuation argument (the serialized payload)
    # of this call is missing from this copy
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  def _GetJobPath(job_id):
    """Return the path of a (non-archived) job file.

    NOTE(review): the missing "self" suggests a @staticmethod decorator
    that is not visible in this copy.
    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  def _GetArchivedJobPath(job_id):
    """Return the path a job file has after archiving (staticmethod,
    see note on _GetJobPath)."""
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  def _ExtractJobID(cls, name):
    """Extract the job ID from a job file name (presumably a
    classmethod; the decorator is not visible here)."""
    m = cls._RE_JOB_FILE.match(name)
    # NOTE(review): the lines returning the matched group (or None)
    # are missing from this copy

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    # NOTE(review): the sorting/return of jlist is missing from this copy

  def _ListJobFiles(self):
    """List file names in the queue directory that look like job files."""
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Load a job from the cache or from disk; caller holds the lock.

    """
    if job_id in self._memcache:
      logging.debug("Found job %s in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    fd = open(filepath, "r")
    # NOTE(review): the try/except IOError wrapper that defines "err",
    # the ENOENT "return None" branch, the re-raise, and the finally
    # block closing fd are all missing from this copy
    if err.errno in (errno.ENOENT, ):
    data = serializer.LoadJson(fd.read())

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    # NOTE(review): "return job" is missing from this copy

  def _GetJobsUnlocked(self, job_ids):
    """Return the loaded jobs for job_ids (all jobs when empty/None).

    """
    # NOTE(review): presumably guarded by "if not job_ids:" -- the
    # guard line is missing from this copy
    job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @param ops: The list of OpCodes that will become the new job.

    """
    # NOTE(review): upstream this is wrapped in locking/queue-open
    # decorators; none are visible in this copy
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write the job to disk and replicate it to all nodes
    self.UpdateJobUnlocked(job)

    logging.debug("Added new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Hand the job to the worker pool for execution
    self._wpool.AddTask(job)
    # NOTE(review): the return of the new job id is missing from this copy

  def UpdateJobUnlocked(self, job):
    """Serialize a job to disk and replicate it to all nodes.

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)
    # Writing a job file is a good opportunity to prune the cache
    self._CleanCacheUnlocked([job.id])

  def _CleanCacheUnlocked(self, exclude):
    """Clean the memory cache.

    The exceptions argument contains job IDs that should not be
    cleaned.

    """
    assert isinstance(exclude, list)

    for job in self._memcache.values():
      if job.id in exclude:
        # NOTE(review): the "continue" for excluded jobs is missing
        # from this copy
      # Only finished jobs may be dropped; queued/running jobs must
      # stay cached so their in-memory state is not lost
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
                                  constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %s from the cache", job.id)
        del self._memcache[job.id]

  def CancelJob(self, job_id):
    """Cancels a job.

    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    # NOTE(review): the "if not job:" guard (with an early return)
    # around this message is missing from this copy
    logging.debug("Job %s not found", job_id)

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      # Running or finished jobs can no longer be cancelled
      logging.debug("Job %s is no longer in the queue", job.id)

    # Presumably inside "for op in job.ops:" -- the loop line is
    # missing from this copy
    op.status = constants.OP_STATUS_ERROR
    op.result = "Job cancelled by request"

    self.UpdateJobUnlocked(job)

  def ArchiveJob(self, job_id):
    """Archives a job.

    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    # NOTE(review): the "if not job:" guard (with an early return)
    # around this message is missing from this copy
    logging.debug("Job %s not found", job_id)

    # Only jobs in a final state may be archived
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)
    # NOTE(review): the rename of old -> new is missing from this copy

    logging.debug("Successfully archived job %s", job.id)

    # Cleaning the cache because we don't know what os.rename actually did
    # and to be on the safe side.
    self._CleanCacheUnlocked([])

  def _GetJobInfoUnlocked(self, job, fields):
    """Build one QueryJobs result row for the given job.

    """
    # NOTE(review): the creation of "row", the loop over fields and the
    # leading branches (e.g. "id", "ops") are missing from this copy;
    # the orphan append below presumably belongs to the "ops" branch
    elif fname == "status":
      row.append(job.CalcStatus())
      row.append([op.input.__getstate__() for op in job.ops])
    elif fname == "opresult":
      row.append([op.result for op in job.ops])
    elif fname == "opstatus":
      row.append([op.status for op in job.ops])
    elif fname == "ticker":
      # Index of the opcode currently being executed
      ji = job.run_op_index
      # Last log entry of the running opcode, if any
      lmsg = job.ops[ji].RetrieveLog(-1)
      # message might be empty here
      # NOTE(review): the else-branch raising for unknown fields is
      # partially elided; only the raise survives
      raise errors.OpExecError("Invalid job query field '%s'" % fname)
    # NOTE(review): "return row" is missing from this copy

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    # NOTE(review): the initialization of "jobs" and the locking
    # decorators are missing from this copy
    for job in self._GetJobsUnlocked(job_ids):
      # NOTE(review): presumably a "if job is None:" branch appending
      # None precedes this -- missing here
      jobs.append(self._GetJobInfoUnlocked(job, fields))
    # NOTE(review): "return jobs" is missing from this copy

  # NOTE(review): the "def Shutdown(self):" line is missing before this
  # docstring fragment in this copy
    """Stops the job queue.

    """
    # Stop all workers before giving up the on-disk queue lock
    self._wpool.TerminateWorkers()

    # Closing the lock also marks the queue as no longer open, which
    # makes _RequireOpenQueue-decorated methods fail from now on
    self._queue_lock.Close()
    self._queue_lock = None