4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling."""
31 from ganeti import constants
32 from ganeti import serializer
33 from ganeti import workerpool
34 from ganeti import opcodes
35 from ganeti import errors
36 from ganeti import mcpu
37 from ganeti import utils
38 from ganeti import rpc
44 class _QueuedOpCode(object):
45   """Encapsulates an opcode object.
47   Access is synchronized by the '_lock' attribute.
49   The 'log' attribute holds the execution log and consists of tuples
50   of the form (timestamp, level, message).
# NOTE(review): the embedded numbers are original-file line numbers; the gaps
# (46, 48, 51-52, 57-58, 60, 62-65, ...) show that statements such as
# `return obj`, attribute initialisation, the `@classmethod` decorator and the
# try/finally scaffolding around the lock acquire/release pairs are elided
# from this view. Code below is kept byte-identical; comments describe only
# what is visible.
# __new__ creates the instance and attaches the logging lock BEFORE __init__
# runs, so that Restore() (which bypasses __init__) still gets a lock.
53   def __new__(cls, *args, **kwargs):
54     obj = object.__new__(cls, *args, **kwargs)
55     # Create a special lock for logging
56     obj._log_lock = threading.Lock()
# (the `return obj` expected after line 56 sits on an elided line)
# Constructor: wraps the given opcode; opcode starts out in the QUEUED state.
# The elided lines presumably also set self.input/self.result/self.log —
# TODO confirm against the full file.
59   def __init__(self, op):
61     self.status = constants.OP_STATUS_QUEUED
# Alternate constructor (decorator elided — presumably @classmethod):
# rebuild a _QueuedOpCode from its serialized `state` dict, the inverse of
# the Serialize method below.
66   def Restore(cls, state):
67     obj = _QueuedOpCode.__new__(cls)
68     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
69     obj.status = state["status"]
70     obj.result = state["result"]
71     obj.log = state["log"]
# Serialize (its `def` line is elided): builds a plain, JSON-able dict while
# holding _log_lock so the log cannot mutate mid-copy; the release on line 84
# is most likely reached via an elided try/finally — TODO confirm.
75     self._log_lock.acquire()
78       "input": self.input.__getstate__(),
79       "status": self.status,
80       "result": self.result,
84     self._log_lock.release()
# Log (its `def` line is elided): append one (timestamp, type, message)
# entry to the execution log under _log_lock.
87     """Append a log entry.
# When the caller supplied only a message, default the entry type.
93       log_type = constants.ELOG_MESSAGE
96       log_type, log_msg = args
98     self._log_lock.acquire()
100       self.log.append((time.time(), log_type, log_msg))
102       self._log_lock.release()
104   def RetrieveLog(self, start_at=0):
105     """Retrieve (a part of) the execution log.
# Returns the log entries from index `start_at` onwards; the `return` on
# line 110 sits inside an (elided) try whose finally releases _log_lock.
108     self._log_lock.acquire()
110       return self.log[start_at:]
112     self._log_lock.release()
115 class _QueuedJob(object):
116   """In-memory job representation.
118   This is what we use to track the user-submitted jobs.
# NOTE(review): elided-extraction chunk — gaps in the embedded original line
# numbers (117, 119-120, 122-123, ...) mean guard conditions, attribute
# assignments, the `@classmethod` decorator and the Serialize `def`/`return`
# lines are not visible here. Code kept byte-identical.
# Constructor: `queue` is the owning JobQueue, `job_id` its identifier,
# `ops` the list of raw opcodes (must be non-empty, see the guard below).
121   def __init__(self, queue, job_id, ops):
# NOTE(review): raises bare Exception instead of an errors.* class — flagging
# only, not changing, since the surrounding (elided) context is unknown.
124       raise Exception("No opcodes")
# Wrap each raw opcode; run_op_index == -1 means "no opcode started yet".
128     self.ops = [_QueuedOpCode(op) for op in ops]
129     self.run_op_index = -1
# Alternate constructor (decorator elided — presumably @classmethod):
# rebuild a job from serialized `state`, restoring each opcode in turn.
132   def Restore(cls, queue, state):
133     obj = _QueuedJob.__new__(cls)
136     obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
137     obj.run_op_index = state["run_op_index"]
# Serialize (its `def` line is elided): dict form mirroring Restore's input.
143     "ops": [op.Serialize() for op in self.ops],
144     "run_op_index": self.run_op_index,
147   def CalcStatus(self):
# Derive the overall job status from the per-opcode statuses: any RUNNING
# opcode makes the job RUNNING, any ERROR/CANCELED terminates the scan
# (per the comment on line 163), all-SUCCESS makes it SUCCESS.
148     status = constants.JOB_STATUS_QUEUED
152       if op.status == constants.OP_STATUS_SUCCESS:
157       if op.status == constants.OP_STATUS_QUEUED:
159       elif op.status == constants.OP_STATUS_RUNNING:
160         status = constants.JOB_STATUS_RUNNING
161       elif op.status == constants.OP_STATUS_ERROR:
162         status = constants.JOB_STATUS_ERROR
163         # The whole job fails if one opcode failed
165       elif op.status == constants.OP_STATUS_CANCELED:
# NOTE(review): likely bug — assigns OP_STATUS_CANCELED (an opcode-status
# constant) where every other branch assigns a JOB_STATUS_* constant.
# Flagging only; cannot fix safely without the elided context.
166         status = constants.OP_STATUS_CANCELED
170       status = constants.JOB_STATUS_SUCCESS
175 class _JobQueueWorker(workerpool.BaseWorker):
# Worker-pool callback: executes one queued job, opcode by opcode, via an
# mcpu.Processor, persisting the job state after every transition.
# NOTE(review): elided extraction — the try/except/finally scaffolding, the
# `count = len(job.ops)` style setup, and the `queue = ...` binding used
# below are on elided lines. Code kept byte-identical.
176   def RunTask(self, job):
179     This function processes a job.
182     logging.debug("Worker %s processing job %s",
183     self.worker_id, job.id)
184     proc = mcpu.Processor(self.pool.context)
# Run each opcode in order; idx/count are 0-based / total for logging.
189     for idx, op in enumerate(job.ops):
191     logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
# Mark which opcode is running and persist before execution, so a crash
# leaves an accurate on-disk record.
195     job.run_op_index = idx
196     op.status = constants.OP_STATUS_RUNNING
198     queue.UpdateJobUnlocked(job)
200     input_opcode = op.input
# Execute the opcode; op.Log is passed as the feedback callback.
204     result = proc.ExecOpCode(input_opcode, op.Log)
208     op.status = constants.OP_STATUS_SUCCESS
210     queue.UpdateJobUnlocked(job)
214     logging.debug("Op %s/%s: Successfully finished %s",
# Python 2 except syntax: any failure marks THIS opcode as errored and is
# persisted; subsequent handling is on elided lines.
216     except Exception, err:
220     op.status = constants.OP_STATUS_ERROR
222     logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
224     queue.UpdateJobUnlocked(job)
229     except errors.GenericError, err:
230     logging.exception("Ganeti exception")
232     logging.exception("Unhandled exception")
# Final status recomputed for logging; `job_id` here is presumably bound on
# an elided line (e.g. job_id = job.id) — TODO confirm.
237     status = job.CalcStatus()
240     logging.debug("Worker %s finished job %s, status = %s",
241     self.worker_id, job_id, status)
244 class _JobQueueWorkerPool(workerpool.WorkerPool):
# Worker pool sized by JOBQUEUE_THREADS (defined on an elided line); the
# second super() argument — presumably _JobQueueWorker — is elided too.
# `context` is kept so workers can build an mcpu.Processor from it.
245   def __init__(self, context):
246     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
248     self.context = context
251 class JobQueue(object):
# The master-side job queue: persists jobs as JSON files under QUEUE_DIR,
# caches loaded jobs in self._memcache, and feeds runnable jobs to a
# _JobQueueWorkerPool.
# NOTE(review): elided extraction — gaps in the embedded original line
# numbers mean try/except/finally scaffolding, decorators (@staticmethod /
# @classmethod / locking decorators), `return` statements and several
# assignments are not visible. Code kept byte-identical throughout.
# Pattern matching on-disk job files ("job-<serial>").
252   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
254   def __init__(self, context):
256     self._my_hostname = utils.HostInfo().name
# The queue lock; acquire/release are re-exported so callers can lock the
# queue directly.
259     self._lock = threading.Lock()
260     self.acquire = self._lock.acquire
261     self.release = self._lock.release
263     # Make sure our directories exist
264     for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
# Ignore "already exists"; re-raise anything else (the raise is elided).
268     if err.errno not in (errno.EEXIST, ):
# Hold an exclusive lock file for the queue's whole lifetime.
272     self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
274     utils.LockFile(self.lock_fd)
# Read the on-disk queue version; a missing file means a fresh queue, which
# is initialised and then re-opened (lines 287/290).
281     version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
283     if err.errno not in (errno.ENOENT, ):
287     self._InitQueueUnlocked()
290     version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
293     # Try to read version
294     version = int(version_fd.read(128))
297     if version != constants.JOB_QUEUE_VERSION:
# NOTE(review): bug — the "%s" placeholders are passed as extra constructor
# arguments and never %-formatted; should be a single formatted string.
# Flagging only, since exception text is runtime-visible behavior.
298     raise errors.JobQueueError("Found version %s, expected %s",
299     version, constants.JOB_QUEUE_VERSION)
# Serial bookkeeping: _last_serial is the highest job id handed out so far.
303     self._last_serial = self._ReadSerial()
304     if self._last_serial is None:
305     raise errors.ConfigurationError("Can't read/parse the job queue serial"
309     self._wpool = _JobQueueWorkerPool(context)
311     # We need to lock here because WorkerPool.AddTask() may start a job while
312     # we're still doing our work.
# Recovery scan: requeue QUEUED jobs; mark opcodes of jobs left RUNNING by
# an unclean shutdown as errored and persist them.
315     for job in self._GetJobsUnlocked(None):
316     status = job.CalcStatus()
318     if status in (constants.JOB_STATUS_QUEUED, ):
319     self._wpool.AddTask(job)
321     elif status in (constants.JOB_STATUS_RUNNING, ):
322     logging.warning("Unfinished job %s found: %s", job.id, job)
325     op.status = constants.OP_STATUS_ERROR
326     op.result = "Unclean master daemon shutdown"
328     self.UpdateJobUnlocked(job)
# _ReadSerial (its `def` line is elided):
334     """Try to read the job serial file.
337     @return: If the serial can be read, then it is returned. Otherwise None
342     serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
345     serial = int(serial_fd.read(1024).strip())
# A corrupt or missing serial file yields None (handler body elided).
348     except (ValueError, EnvironmentError):
# Write a fresh version file, and a serial file only if none is readable.
353   def _InitQueueUnlocked(self):
354     assert self.lock_fd, "Queue should be open"
356     utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
357     data="%s\n" % constants.JOB_QUEUE_VERSION)
358     if self._ReadSerial() is None:
359     utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
# Validate a numeric job id; the string-formatting return is elided.
362   def _FormatJobID(self, job_id):
363     if not isinstance(job_id, (int, long)):
364     raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
366     raise errors.ProgrammerError("Job ID %s is negative" % job_id)
370   def _NewSerialUnlocked(self, nodes):
371     """Generates a new job identifier.
373     Job identifiers are unique during the lifetime of a cluster.
375     Returns: A string representing the job identifier.
378     assert self.lock_fd, "Queue should be open"
381     serial = self._last_serial + 1
# Persist the new serial before adopting it, so a failed write does not
# advance the in-memory counter (see the comment on line 387).
384     utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
385     data="%s\n" % serial)
387     # Keep it only if we were able to write the file
388     self._last_serial = serial
390     # Distribute the serial to the other nodes
392     nodes.remove(self._my_hostname)
# Best-effort replication: failures are logged per node, not raised.
396     result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
399     logging.error("copy of job queue file to node %s failed", node)
401     return self._FormatJobID(serial)
# Path helpers (decorators elided — presumably @staticmethod).
404   def _GetJobPath(job_id):
405     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
408   def _GetArchivedJobPath(job_id):
409     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
# Extract the numeric id from a "job-<id>" file name (decorator elided —
# presumably @classmethod); the return is on an elided line.
412   def _ExtractJobID(cls, name):
413     m = cls._RE_JOB_FILE.match(name)
419   def _GetJobIDsUnlocked(self, archived=False):
420     """Return all known job IDs.
422     If the parameter archived is True, archived job IDs will be
423     included. Currently this argument is unused.
425     The method only looks at disk because it's a requirement that all
426     jobs are present on disk (so in the _memcache we don't have any
430     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
# List on-disk job files matching the job-file pattern.
434   def _ListJobFiles(self):
435     assert self.lock_fd, "Queue should be open"
437     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
438     if self._RE_JOB_FILE.match(name)]
440   def _LoadJobUnlocked(self, job_id):
441     assert self.lock_fd, "Queue should be open"
# Fast path: serve from the in-memory cache when possible.
443     if job_id in self._memcache:
444     logging.debug("Found job %s in memcache", job_id)
445     return self._memcache[job_id]
447     filepath = self._GetJobPath(job_id)
448     logging.debug("Loading job from %s", filepath)
# ENOENT means "no such job" (the None-return is elided); other errors
# presumably propagate.
450     fd = open(filepath, "r")
452     if err.errno in (errno.ENOENT, ):
456     data = serializer.LoadJson(fd.read())
# Rebuild the job object and remember it in the cache.
460     job = _QueuedJob.Restore(self, data)
461     self._memcache[job_id] = job
462     logging.debug("Added job %s to the cache", job_id)
# Load many jobs; job_ids=None (guard elided) means "all known jobs".
465   def _GetJobsUnlocked(self, job_ids):
467     job_ids = self._GetJobIDsUnlocked()
469     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
472   def SubmitJob(self, ops, nodes):
473     """Create and store a new job.
475     This enters the job into our job queue and also puts it on the new
476     queue, in order for it to be picked up by the queue processors.
479     @param ops: The list of OpCodes that will become the new job.
481     @param nodes: The list of nodes to which the new job serial will be
485     assert self.lock_fd, "Queue should be open"
# Allocate an id, build the job, persist it, cache it, then hand it to the
# worker pool for execution.
488     job_id = self._NewSerialUnlocked(nodes)
489     job = _QueuedJob(self, job_id, ops)
492     self.UpdateJobUnlocked(job)
494     logging.debug("Added new job %s to the cache", job_id)
495     self._memcache[job_id] = job
498     self._wpool.AddTask(job)
# Write the job's current state to disk, then prune finished cache entries
# (keeping this job).
502   def UpdateJobUnlocked(self, job):
503     assert self.lock_fd, "Queue should be open"
505     filename = self._GetJobPath(job.id)
506     logging.debug("Writing job %s to %s", job.id, filename)
507     utils.WriteFile(filename,
508     data=serializer.DumpJson(job.Serialize(), indent=False))
509     self._CleanCacheUnlocked([job.id])
511   def _CleanCacheUnlocked(self, exclude):
512     """Clean the memory cache.
514     The exceptions argument contains job IDs that should not be
518     assert isinstance(exclude, list)
# Drop every cached job that is neither excluded nor still active.
# NOTE(review): iterates .values() but deletes by job.id — safe in Python 2
# because values() returns a list copy.
520     for job in self._memcache.values():
521     if job.id in exclude:
523     if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
524     constants.JOB_STATUS_RUNNING):
525     logging.debug("Cleaning job %s from the cache", job.id)
527     del self._memcache[job.id]
532   def CancelJob(self, job_id):
536     @param job_id: Job ID of job to be cancelled.
539     logging.debug("Cancelling job %s", job_id)
541     job = self._LoadJobUnlocked(job_id)
543     logging.debug("Job %s not found", job_id)
# Only still-queued jobs can be cancelled; running/finished jobs are left
# alone (the early return is elided).
546     if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
547     logging.debug("Job %s is no longer in the queue", job.id)
# Mark every opcode as errored with a cancel message and persist.
552     op.status = constants.OP_STATUS_ERROR
553     op.result = "Job cancelled by request"
555     self.UpdateJobUnlocked(job)
558   def ArchiveJob(self, job_id):
562     @param job_id: Job ID of job to be archived.
565     logging.debug("Archiving job %s", job_id)
567     job = self._LoadJobUnlocked(job_id)
569     logging.debug("Job %s not found", job_id)
# Only terminal jobs (canceled/success/error) may be archived.
572     if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
573     constants.JOB_STATUS_SUCCESS,
574     constants.JOB_STATUS_ERROR):
575     logging.debug("Job %s is not yet done", job.id)
# Move the job file into the archive directory (the os.rename call itself
# is on an elided line).
579     old = self._GetJobPath(job.id)
580     new = self._GetArchivedJobPath(job.id)
584     logging.debug("Successfully archived job %s", job.id)
586     # Cleaning the cache because we don't know what os.rename actually did
587     # and to be on the safe side.
588     self._CleanCacheUnlocked([])
# Build one result row for `job`, with one entry per requested field name.
590   def _GetJobInfoUnlocked(self, job, fields):
595     elif fname == "status":
596     row.append(job.CalcStatus())
598     row.append([op.input.__getstate__() for op in job.ops])
599     elif fname == "opresult":
600     row.append([op.result for op in job.ops])
601     elif fname == "opstatus":
602     row.append([op.status for op in job.ops])
603     elif fname == "ticker":
# "ticker": the latest log entry of the currently running opcode.
604     ji = job.run_op_index
608     lmsg = job.ops[ji].RetrieveLog(-1)
609     # message might be empty here
616     raise errors.OpExecError("Invalid job query field '%s'" % fname)
620   def QueryJobs(self, job_ids, fields):
621     """Returns a list of jobs in queue.
624     - job_ids: Sequence of job identifiers or None for all
625     - fields: Names of fields to return
630     for job in self._GetJobsUnlocked(job_ids):
634     jobs.append(self._GetJobInfoUnlocked(job, fields))
# Shutdown (its `def` line is elided): stop all queue workers.
640     """Stops the job queue.
643     assert self.lock_fd, "Queue should be open"
645     self._wpool.TerminateWorkers()