4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling."""
31 from ganeti import constants
32 from ganeti import serializer
33 from ganeti import workerpool
34 from ganeti import opcodes
35 from ganeti import errors
36 from ganeti import mcpu
37 from ganeti import utils
38 from ganeti import rpc
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access is synchronized by the '_lock' attribute.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (timestamp, level, message).

  """
  def __init__(self, op):
    # A fresh opcode starts life queued, with no result and an empty log.
    self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])

  def __Setup(self, input_, status, result, log):
    # Shared initializer used by both __init__ and Restore.
    # NOTE(review): only the lock creation is visible here; the
    # assignments storing input_/status/result/log on self appear to be
    # elided from this view -- confirm against the full file.
    self._lock = threading.Lock()

  def Restore(cls, state):
    # Alternate constructor: rebuild an instance from the dictionary
    # produced by serialization.
    # NOTE(review): the @classmethod decorator and the trailing
    # "return obj" are not visible in this view.
    obj = object.__new__(cls)
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
                state["status"], state["result"], state["log"])

    # NOTE(review): the following mapping entries are the body of a
    # Serialize() method whose "def" line is not visible in this view.
    "input": self.input.__getstate__(),
    "status": self.status,
    "result": self.result,

    # NOTE(review): docstring of an accessor (returns self.input?)
    # whose "def" line is not visible in this view.
    """Returns the original opcode."""

  def SetStatus(self, status, result):
    """Update the opcode status and result.

    """
    # NOTE(review): the method body (presumably lock-protected
    # assignments of status/result) is not visible in this view.

    # NOTE(review): docstrings of two accessors (status/result getters)
    # whose "def" lines are not visible in this view.
    """Get the opcode status."""
    """Get the opcode result."""

  def Log(self, *args):
    """Append a log entry.

    """
    # Default the entry type to a plain message.
    # NOTE(review): the branching that distinguishes the one-argument
    # from the two-argument call form is not visible in this view; as
    # shown, the unconditional unpack below would fail for len(args) != 2.
    log_type = constants.ELOG_MESSAGE
    log_type, log_msg = args
    # Each entry is (timestamp, level, message), per the class docstring.
    self.log.append((time.time(), log_type, log_msg))

  def RetrieveLog(self, start_at=0):
    """Retrieve (a part of) the execution log.

    """
    # Entries before start_at are skipped; the default returns everything.
    return self.log[start_at:]
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
  def __init__(self, storage, job_id, ops):
    # NOTE(review): the "if not ops:" guard belonging to this raise is
    # not visible in this view; as shown the raise is unconditional.
    raise Exception("No opcodes")

    # Wrap every opcode in a _QueuedOpCode; -1 means no op has run yet.
    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)

  def __Setup(self, storage, job_id, ops, run_op_index):
    # Shared initializer used by both __init__ and Restore.
    # NOTE(review): the assignments of job_id and ops to attributes
    # (self.id / self._ops, used below) are not visible in this view.
    self._lock = threading.Lock()
    self.storage = storage
    self.run_op_index = run_op_index

  def Restore(cls, storage, state):
    # Alternate constructor: rebuild a job from its serialized state.
    # NOTE(review): the @classmethod decorator and the trailing
    # "return obj" are not visible in this view.
    obj = object.__new__(cls)
    op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
    obj.__Setup(storage, state["id"], op_list, state["run_op_index"])

    # NOTE(review): the following entries belong to a Serialize()
    # method whose "def" line is not visible in this view.
    "ops": [op.Serialize() for op in self._ops],
    "run_op_index": self.run_op_index,

  def SetUnclean(self, msg):
    # Mark every opcode as failed with the given message, then persist.
    # NOTE(review): the "for op in self._ops:" loop header is not
    # visible in this view.
      op.SetStatus(constants.OP_STATUS_ERROR, msg)
    self.storage.UpdateJob(self)

    # NOTE(review): the lines below compute the overall job status from
    # the per-opcode statuses; the enclosing GetStatus() "def" line and
    # parts of its control flow are not visible in this view.
    status = constants.JOB_STATUS_QUEUED
      op_status = op.GetStatus()
      if op_status == constants.OP_STATUS_SUCCESS:
      if op_status == constants.OP_STATUS_QUEUED:
      elif op_status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op_status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
      status = constants.JOB_STATUS_SUCCESS

  def GetRunOpIndex(self):
    # Index of the opcode currently/most recently run; -1 if none yet.
    return self.run_op_index

    # NOTE(review): everything below belongs to Run(self, proc); its
    # "def" line, docstring quotes, and the try/except nesting are only
    # partially visible in this view.
    This functions processes a this job in the context of given processor
      - proc: Ganeti Processor to run the job with
    count = len(self._ops)
    for idx, op in enumerate(self._ops):
        logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
          self.run_op_index = idx
        # Persist the RUNNING state before executing, so a crash leaves
        # an accurate on-disk record.
        op.SetStatus(constants.OP_STATUS_RUNNING, None)
        self.storage.UpdateJob(self)
        result = proc.ExecOpCode(op.input, op.Log)
        op.SetStatus(constants.OP_STATUS_SUCCESS, result)
        self.storage.UpdateJob(self)
        logging.debug("Op %s/%s: Successfully finished %s",
      # Python 2 except syntax throughout this file.
      except Exception, err:
          op.SetStatus(constants.OP_STATUS_ERROR, str(err))
          logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
          self.storage.UpdateJob(self)
    except errors.GenericError, err:
      # NOTE(review): "%s" has no matching positional argument, and
      # exc_info expects a boolean or exc-info tuple rather than the
      # exception object; this likely should be
      # logging.exception("ganeti exception") -- same for the two
      # calls below.
      logging.error("ganeti exception %s", exc_info=err)
    except Exception, err:
      logging.error("unhandled exception %s", exc_info=err)
      logging.error("unhandled unknown exception %s", exc_info=err)
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker thread that executes jobs handed to it by the pool."""

  def RunTask(self, job):
    # Process one queued job on this worker thread.
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    # TODO: feedback function
    proc = mcpu.Processor(self.pool.context)
    # NOTE(review): the statement that actually executes the job with
    # `proc` (and any try/finally around it) is not visible in this
    # view.
    logging.debug("Worker %s finished job %s, status = %s",
                  self.worker_id, job.id, job.GetStatus())
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool for the job queue; thread count comes from JOBQUEUE_THREADS."""

  def __init__(self, context):
    # NOTE(review): the continuation line carrying the remaining
    # argument(s) to the superclass constructor (presumably the worker
    # class) is not visible in this view; as shown the call is
    # syntactically incomplete.
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
    # Kept so workers can reach the masterd context via self.pool.context.
    self.context = context
class JobStorage(object):
  """On-disk storage for jobs, with an in-memory cache.

  Jobs live as serialized files under constants.QUEUE_DIR; loaded jobs
  are kept in self._memcache until cleaned out.
  """
  # Matches on-disk job file names and captures the numeric job id.
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  # NOTE(review): the following lines are the body of __init__; its
  # "def" line and several statements (including the try/except
  # headers matching the errno checks below) are not visible in this
  # view.
    self._lock = threading.Lock()
    self._my_hostname = utils.HostInfo().name

    # Make sure our directory exists
      os.mkdir(constants.QUEUE_DIR, 0700)
      # An already-existing queue directory is fine; anything else is not.
      if err.errno not in (errno.EEXIST, ):

    # Keep the queue lock file open (and locked via utils.LockFile) for
    # the object's whole lifetime; the asserts below rely on it.
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
      utils.LockFile(self.lock_fd)

      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
      # Only a missing version file is tolerated here.
      if err.errno not in (errno.ENOENT, ):
      # No version file: initialize a fresh queue, then re-open it.
      self._InitQueueUnlocked()
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")

    # Try to read version
    version = int(version_fd.read(128))

    if version != constants.JOB_QUEUE_VERSION:
      # NOTE(review): printf-style arguments are passed to the
      # exception constructor, but constructors do not %-format; this
      # likely should be "... % (version, constants.JOB_QUEUE_VERSION)".
      raise errors.JobQueueError("Found version %s, expected %s",
                                 version, constants.JOB_QUEUE_VERSION)

    self._last_serial = self._ReadSerial()
    if self._last_serial is None:
      # NOTE(review): the continuation of this message string is not
      # visible in this view.
      raise errors.ConfigurationError("Can't read/parse the job queue serial"

    # NOTE(review): the lines below are the body of _ReadSerial(); its
    # "def" line, the try/finally structure and the return statements
    # are not visible in this view.
    """Try to read the job serial file.

    @return: If the serial can be read, then it is returned. Otherwise None

    """
      serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
        serial = int(serial_fd.read(1024).strip())
    except (ValueError, EnvironmentError):

    # NOTE(review): this assert belongs to another method (Close()?)
    # whose "def" line is not visible in this view.
    assert self.lock_fd, "Queue should be open"

  def _InitQueueUnlocked(self):
    # Write the queue version file and, if no serial exists yet, an
    # initial serial file.
    assert self.lock_fd, "Queue should be open"
    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
    if self._ReadSerial() is None:
      # NOTE(review): the data= continuation of this call is not
      # visible in this view.
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,

  def _NewSerialUnlocked(self, nodes):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    assert self.lock_fd, "Queue should be open"

    # New number
    serial = self._last_serial + 1

    # Write to file
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    # Distribute the serial to the other nodes
    # NOTE(review): the try/except presumably guarding this remove (our
    # hostname may not be in the list) is not visible in this view.
    nodes.remove(self._my_hostname)

    result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
    # NOTE(review): the loop over failed nodes binding `node` and the
    # final return of the new identifier are not visible in this view.
      logging.error("copy of job queue file to node %s failed", node)

  def _GetJobPath(self, job_id):
    # Absolute path of the on-disk file for the given job id.
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jfiles = self._ListJobFiles()
    # Extract the numeric id captured by _RE_JOB_FILE from each name.
    jlist = [int(m.group(1)) for m in
             [self._RE_JOB_FILE.match(name) for name in jfiles]]
    # NOTE(review): the sorting/returning of jlist is not visible in
    # this view.

  def _ListJobFiles(self):
    # All visible queue-directory entries matching the job file pattern.
    assert self.lock_fd, "Queue should be open"

    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    assert self.lock_fd, "Queue should be open"

    # Serve from the in-memory cache when possible.
    if job_id in self._memcache:
      logging.debug("Found job %s in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    # NOTE(review): the try/except around this open (with the ENOENT
    # handling below) and the fd close are not visible in this view.
      fd = open(filepath, "r")
      if err.errno in (errno.ENOENT, ):
    data = serializer.LoadJson(fd.read())

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    # NOTE(review): the "return job" is not visible in this view.

  def _GetJobsUnlocked(self, job_ids):
    # Load all requested jobs; a falsy job_ids means "all known jobs".
    # NOTE(review): the "if not job_ids:" guard for the default below
    # is not visible in this view.
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  def GetJobs(self, job_ids):
    # NOTE(review): likely decorated with a locking decorator in the
    # full file (the line above this "def" is not visible).
    return self._GetJobsUnlocked(job_ids)

  def AddJob(self, ops, nodes):
    """Create and store on disk a new job.

    @param ops: The list of OpCodes that will become the new job.
    @param nodes: The list of nodes to which the new job serial will be
      distributed.

    """
    assert self.lock_fd, "Queue should be open"

    # Get job identifier
    job_id = self._NewSerialUnlocked(nodes)
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self._UpdateJobUnlocked(job)

    logging.debug("Added new job %s to the cache", job_id)
    self._memcache[job_id] = job
    # NOTE(review): the "return job" is not visible in this view.

  def _UpdateJobUnlocked(self, job):
    # Persist the job to its file, then prune finished cache entries.
    assert self.lock_fd, "Queue should be open"

    filename = self._GetJobPath(job.id)
    logging.debug("Writing job %s to %s", job.id, filename)
    utils.WriteFile(filename,
                    data=serializer.DumpJson(job.Serialize(), indent=False))
    self._CleanCacheUnlocked([job.id])

  def _CleanCacheUnlocked(self, exclude):
    """Clean the memory cache.

    The `exclude` argument contains job IDs that should not be removed
    from the cache.

    """
    assert isinstance(exclude, list)
    for job in self._memcache.values():
      if job.id in exclude:
        # NOTE(review): the "continue" (or similar) completing this
        # guard is not visible in this view.
      # Only jobs that can no longer change are safe to drop.
      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
                                 constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %s from the cache", job.id)
        # NOTE(review): a try/except around this del appears to be
        # elided from this view.
        del self._memcache[job.id]

  def UpdateJob(self, job):
    # Public wrapper around _UpdateJobUnlocked; presumably
    # lock-protected by a decorator not visible in this view.
    return self._UpdateJobUnlocked(job)

  def ArchiveJob(self, job_id):
    # Not implemented yet.
    raise NotImplementedError()
  def __init__(self, context):
    """Set up job storage and the worker pool, then requeue old jobs.

    NOTE(review): the enclosing class statement is not visible in this
    view.
    """
    self._lock = threading.Lock()
    self._jobs = JobStorage()
    self._wpool = _JobQueueWorkerPool(context)

    # Re-examine every stored job after a (re)start of the daemon.
    for job in self._jobs.GetJobs(None):
      status = job.GetStatus()
      if status in (constants.JOB_STATUS_QUEUED, ):
        # Still queued: hand it back to the workers.
        self._wpool.AddTask(job)

      elif status in (constants.JOB_STATUS_RUNNING, ):
        # A job marked running at startup means the previous daemon
        # instance died mid-execution; mark it unclean.
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.SetUnclean("Unclean master daemon shutdown")
  def SubmitJob(self, ops, nodes):
    """Add a new job to the queue.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @param ops: the sequence of opcodes that will become the new job
    @param nodes: the list of nodes to which the queue should be
      distributed

    """
    # Persist the job first, then hand it to the workers.
    job = self._jobs.AddJob(ops, nodes)

    # Add to worker pool
    self._wpool.AddTask(job)
    # NOTE(review): any trailing return (the job or its id?) is not
    # visible in this view.
  def ArchiveJob(self, job_id):
    """Archive a job (not implemented yet)."""
    raise NotImplementedError()
  def CancelJob(self, job_id):
    """Cancel a job (not implemented yet)."""
    raise NotImplementedError()
  def _GetJobInfo(self, job, fields):
    # Build one result row for this job, one entry per requested field.
    # NOTE(review): the initialization of `row`, the loop header over
    # `fields` binding `fname`, and the leading branch (an "id" field?)
    # are not visible in this view.
      elif fname == "status":
        row.append(job.GetStatus())
      # NOTE(review): the branch header for the opcode-input field is
      # not visible in this view.
        row.append([op.GetInput().__getstate__() for op in job._ops])
      elif fname == "opresult":
        row.append([op.GetResult() for op in job._ops])
      elif fname == "opstatus":
        row.append([op.GetStatus() for op in job._ops])
      elif fname == "ticker":
        ji = job.GetRunOpIndex()
        # NOTE(review): the handling of ji when no opcode has run yet
        # is not visible in this view.
        # Only the tail of the running opcode's log is fetched.
        lmsg = job._ops[ji].RetrieveLog(-1)
        # message might be empty here
      # NOTE(review): the "else:" introducing this raise for unknown
      # field names is not visible in this view.
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    # NOTE(review): the "return row" is not visible in this view.
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    # NOTE(review): the initialization of `jobs` (and any locking) is
    # not visible in this view.
    for job in self._jobs.GetJobs(job_ids):
      # NOTE(review): any handling of jobs that could not be loaded is
      # not visible in this view.
      jobs.append(self._GetJobInfo(job, fields))
    # NOTE(review): the "return jobs" is not visible in this view.
608 """Stops the job queue.
611 self._wpool.TerminateWorkers()