4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling."""
31 from ganeti import constants
32 from ganeti import serializer
33 from ganeti import workerpool
34 from ganeti import opcodes
35 from ganeti import errors
36 from ganeti import mcpu
37 from ganeti import utils
38 from ganeti import rpc
# NOTE(review): this block is a sparse excerpt of the original file -- the
# leading number on each line is the original file's line number, and the
# numbers are non-consecutive, so statements are missing throughout.
# Code lines are kept byte-identical; only comments are added.
44 class _QueuedOpCode(object):
45   """Encapsulates an opcode object.
47   Access is synchronized by the '_lock' attribute.
49   The 'log' attribute holds the execution log and consists of tuples
50   of the form (timestamp, level, message).
# Wrap a single opcode; a new opcode starts life in the QUEUED state with no
# result and an empty log.
53   def __init__(self, op):
54     self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])
# Shared initializer used by __init__ and the Restore() alternate
# constructor.  NOTE(review): the assignments of input_/status/result/log to
# instance attributes are not visible in this excerpt (original lines 58-62
# missing); later code reads self.input/.status/.result/.log, so presumably
# they are set here -- verify against the full file.
56   def __Setup(self, input_, status, result, log):
57     self._lock = threading.Lock()
# Alternate constructor: rebuild a _QueuedOpCode from its serialized dict
# form (counterpart of Serialize()).  NOTE(review): the @classmethod
# decorator and the trailing "return obj" are missing from this excerpt.
64   def Restore(cls, state):
65     obj = object.__new__(cls)
66     obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
67                 state["status"], state["result"], state["log"])
# Fragment of Serialize(): the enclosing "def Serialize(self)" and the
# dict/return syntax around these entries are missing from this excerpt.
73     "input": self.input.__getstate__(),
74     "status": self.status,
75     "result": self.result,
# Orphaned docstring: the accessor's "def" header (presumably GetInput) is
# missing from this excerpt.
81     """Returns the original opcode.
# Set the opcode status and result; NOTE(review): the body and locking are
# not visible in this excerpt.
87   def SetStatus(self, status, result):
88     """Update the opcode status and result.
# Orphaned docstring: the "def GetStatus(self)" header is missing here.
96     """Get the opcode status.
# Orphaned docstring: the "def GetResult(self)" header is missing here.
103     """Get the opcode result.
109   def Log(self, *args):
110     """Append a log entry.
# NOTE(review): the argument-count dispatch is incomplete in this excerpt --
# with one argument the type defaults to ELOG_MESSAGE, with two the caller
# supplies (log_type, log_msg); the surrounding if/elif lines are missing.
116       log_type = constants.ELOG_MESSAGE
119       log_type, log_msg = args
# Entries are (timestamp, level, message) tuples, per the class docstring.
120     self.log.append((time.time(), log_type, log_msg))
# Return the log entries from index start_at onward (0 = whole log).
123   def RetrieveLog(self, start_at=0):
124     """Retrieve (a part of) the execution log.
127     return self.log[start_at:]
# NOTE(review): sparse excerpt -- leading numbers are the original file's
# line numbers; gaps in the numbering mean missing statements.  Code lines
# are kept byte-identical; only comments are added.
130 class _QueuedJob(object):
131   """In-memory job representation.
133   This is what we use to track the user-submitted jobs.
# Rejects an empty opcode list.  NOTE(review): the guarding "if not ops:"
# line is missing from this excerpt, so the raise looks unconditional here.
136   def __init__(self, storage, job_id, ops):
139       raise Exception("No opcodes")
# Each input opcode is wrapped in a _QueuedOpCode; the trailing -1 is the
# initial run_op_index, meaning "no opcode has started running yet".
141     self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)
# Shared initializer for __init__ and Restore().  NOTE(review): the
# assignments of job_id and ops (self.id / self._ops, read elsewhere) are
# not visible in this excerpt.
143   def __Setup(self, storage, job_id, ops, run_op_index):
144     self._lock = threading.Lock()
145     self.storage = storage
148     self.run_op_index = run_op_index
# Alternate constructor from serialized state (counterpart of Serialize()).
# NOTE(review): the @classmethod decorator and "return obj" are missing
# from this excerpt.
151   def Restore(cls, storage, state):
152     obj = object.__new__(cls)
153     op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
154     obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
# Fragment of Serialize(): the enclosing def and dict syntax are missing
# from this excerpt.
160     "ops": [op.Serialize() for op in self._ops],
161     "run_op_index": self.run_op_index,
# Mark the job failed after an unclean shutdown: every opcode (the loop
# header over self._ops is missing here) is set to OP_STATUS_ERROR with the
# given message, then the job is persisted via its storage.
164   def SetUnclean(self, msg):
167       op.SetStatus(constants.OP_STATUS_ERROR, msg)
169     self.storage.UpdateJob(self)
# Fragment of GetStatus(): derives the overall job status from the
# per-opcode statuses -- one ERROR opcode makes the whole job ERROR, any
# RUNNING opcode makes it RUNNING, and only if all opcodes succeeded is it
# SUCCESS.  NOTE(review): the enclosing def and several branch bodies are
# missing from this excerpt.
172     status = constants.JOB_STATUS_QUEUED
176       op_status = op.GetStatus()
177       if op_status == constants.OP_STATUS_SUCCESS:
182       if op_status == constants.OP_STATUS_QUEUED:
184       elif op_status == constants.OP_STATUS_RUNNING:
185         status = constants.JOB_STATUS_RUNNING
186       elif op_status == constants.OP_STATUS_ERROR:
187         status = constants.JOB_STATUS_ERROR
188         # The whole job fails if one opcode failed
192       status = constants.JOB_STATUS_SUCCESS
# Index of the opcode currently (or last) being run; -1 before the first.
197   def GetRunOpIndex(self):
198     return self.run_op_index
# Fragment of Run(): executes the job's opcodes in order through the given
# mcpu Processor, persisting each status transition (QUEUED -> RUNNING ->
# SUCCESS/ERROR) to storage.  NOTE(review): the "def Run(self, proc)"
# header and the try/except scaffolding are missing from this excerpt.
203     This function processes this job in the context of the given processor
207       - proc: Ganeti Processor to run the job with
211     count = len(self._ops)
212     for idx, op in enumerate(self._ops):
214         logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
218           self.run_op_index = idx
222         op.SetStatus(constants.OP_STATUS_RUNNING, None)
# Persist before and after execution so on-disk state tracks progress.
223         self.storage.UpdateJob(self)
225         result = proc.ExecOpCode(op.input, op.Log)
227         op.SetStatus(constants.OP_STATUS_SUCCESS, result)
228         self.storage.UpdateJob(self)
229         logging.debug("Op %s/%s: Successfully finished %s",
# Python 2 except syntax; a failed opcode is recorded as ERROR with the
# stringified exception and the job persisted before re-raising (the
# re-raise is not visible in this excerpt).
231       except Exception, err:
233           op.SetStatus(constants.OP_STATUS_ERROR, str(err))
234           logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
236           self.storage.UpdateJob(self)
# NOTE(review): in the three logging.error calls below, the "%s" in the
# format string has no matching argument, and exc_info is passed an
# exception object rather than True / a sys.exc_info() tuple -- likely
# intended logging.error("...: %s", err) or exc_info=True.  Not fixed here
# because the surrounding try block is missing from this excerpt; verify
# against the full file.
239     except errors.GenericError, err:
240       logging.error("ganeti exception %s", exc_info=err)
241     except Exception, err:
242       logging.error("unhandled exception %s", exc_info=err)
244       logging.error("unhandled unknown exception %s", exc_info=err)
# Worker-pool thread body: takes a _QueuedJob handed to the pool and runs it
# through a Ganeti mcpu Processor.  NOTE(review): sparse excerpt -- leading
# numbers are original file line numbers; gaps mean missing statements.
247 class _JobQueueWorker(workerpool.BaseWorker):
248   def RunTask(self, job):
249     logging.debug("Worker %s processing job %s",
250                   self.worker_id, job.id)
251     # TODO: feedback function
252     proc = mcpu.Processor(self.pool.context)
# NOTE(review): the call that actually executes the job (presumably
# job.Run(proc), original lines 253-255) is missing from this excerpt.
256     logging.debug("Worker %s finished job %s, status = %s",
257                   self.worker_id, job.id, job.GetStatus())
# Pool of job-queue worker threads sharing the Ganeti context object.
260 class _JobQueueWorkerPool(workerpool.WorkerPool):
261   def __init__(self, context):
# NOTE(review): the continuation of this super() call (presumably the
# worker class argument, original line 263) is missing from this excerpt;
# JOBQUEUE_THREADS is a module-level constant defined outside this view.
262     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
# Kept so workers can build an mcpu.Processor (see _JobQueueWorker.RunTask).
264     self.context = context
# Base class for job storage backends: owns the job-id prefix and the
# job-id formatting/validation helpers.  NOTE(review): sparse excerpt --
# leading numbers are original file line numbers; gaps mean missing lines.
267 class JobStorageBase(object):
268   def __init__(self, id_prefix):
269     self.id_prefix = id_prefix
272     prefix_pattern = re.escape("%s-" % id_prefix)
# NOTE(review): original lines 273-275 are missing here; presumably they
# make the prefix part optional when id_prefix is empty -- verify against
# the full file.
276     # Apart from the prefix, all job IDs are numeric
277     self._re_job_id = re.compile(r"^%s\d+$" % prefix_pattern)
# Truthy (a match object) iff the given job id carries this storage's
# prefix and a numeric serial.
279   def OwnsJobId(self, job_id):
280     return self._re_job_id.match(job_id)
# Turn a numeric serial into the external job-id string.
# NOTE(review): the "if job_id < 0:" guard before the second raise and the
# else-branch handling an empty prefix (original lines 285, 290-292) are
# missing from this excerpt.
282   def FormatJobID(self, job_id):
283     if not isinstance(job_id, (int, long)):
284       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
286       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
289     prefix = "%s-" % self.id_prefix
# Serial zero-padded to 10 digits so job ids sort lexicographically.
293     return "%s%010d" % (prefix, job_id)
# On-disk job storage: one JSON file per job under QUEUE_DIR, a version
# file, a serial file for id allocation, plus an in-memory cache
# (self._memcache) of loaded jobs.  NOTE(review): sparse excerpt -- leading
# numbers are original file line numbers; gaps mean missing statements
# (notably try/except scaffolding and the _memcache initialisation are not
# visible).  Code lines kept byte-identical; only comments added.
296 class DiskJobStorage(JobStorageBase):
# Matches "job-<id>" queue file names; group(1) is the job id.
297   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
299   def __init__(self, id_prefix):
300     JobStorageBase.__init__(self, id_prefix)
302     self._lock = threading.Lock()
304     self._my_hostname = utils.HostInfo().name
306     # Make sure our directories exists
307     for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
# NOTE(review): the os.mkdir(...) call and the try/except around it are
# missing; only the EEXIST tolerance check survived in this excerpt.
311         if err.errno not in (errno.EEXIST, ):
# Queue-wide lock file held open for the storage's lifetime.
315     self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
317     utils.LockFile(self.lock_fd)
# Probe for the version file; a missing file (ENOENT) means a fresh queue
# that must be initialised first.  NOTE(review): the try/except lines
# around these opens are missing from this excerpt.
324       version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
326       if err.errno not in (errno.ENOENT, ):
330       self._InitQueueUnlocked()
333       version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
336     # Try to read version
337     version = int(version_fd.read(128))
340     if version != constants.JOB_QUEUE_VERSION:
# NOTE(review): the extra arguments here are passed to the exception
# constructor without %-interpolation, so the message likely renders with
# literal "%s" placeholders -- probably intended
# "Found version %s, expected %s" % (version, ...); verify upstream.
341       raise errors.JobQueueError("Found version %s, expected %s",
342                                  version, constants.JOB_QUEUE_VERSION)
346     self._last_serial = self._ReadSerial()
347     if self._last_serial is None:
348       raise errors.ConfigurationError("Can't read/parse the job queue serial"
# Fragment of _ReadSerial() (its def header is missing from this excerpt).
353     """Try to read the job serial file.
356     @return: If the serial can be read, then it is returned. Otherwise None
361       serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
364         serial = int(serial_fd.read(1024).strip())
# Any parse or I/O failure is treated as "no serial available".
367     except (ValueError, EnvironmentError):
# Orphaned line of another method (presumably Close/Shutdown): asserts the
# queue is open; its def header is missing from this excerpt.
373     assert self.lock_fd, "Queue should be open"
# Create the version file and, if absent, the serial file for a new queue.
378   def _InitQueueUnlocked(self):
379     assert self.lock_fd, "Queue should be open"
381     utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
382                     data="%s\n" % constants.JOB_QUEUE_VERSION)
383     if self._ReadSerial() is None:
# NOTE(review): the data= continuation of this WriteFile call (original
# lines 385-386) is missing from this excerpt.
384       utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
387   def _NewSerialUnlocked(self, nodes):
388     """Generates a new job identifier.
390     Job identifiers are unique during the lifetime of a cluster.
392     Returns: A string representing the job identifier.
395     assert self.lock_fd, "Queue should be open"
398     serial = self._last_serial + 1
# Write-through: the new serial is persisted before being adopted.
401     utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
402                     data="%s\n" % serial)
404     # Keep it only if we were able to write the file
405     self._last_serial = serial
407     # Distribute the serial to the other nodes
# NOTE(review): the guard around this remove (presumably
# "if self._my_hostname in nodes:" or a try/except) is missing here.
409       nodes.remove(self._my_hostname)
413     result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
# Replication failure is logged per node but does not abort the job
# submission.  NOTE(review): the loop over `result`/nodes is missing.
416         logging.error("copy of job queue file to node %s failed", node)
418     return self.FormatJobID(serial)
# Path of the live on-disk file for a job id.
420   def _GetJobPath(self, job_id):
421     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
# Path a job file is moved to when archived.
423   def _GetArchivedJobPath(self, job_id):
424     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
426   def _GetJobIDsUnlocked(self, archived=False):
427     """Return all known job IDs.
429     If the parameter archived is True, archived jobs IDs will be
430     included. Currently this argument is unused.
432     The method only looks at disk because it's a requirement that all
433     jobs are present on disk (so in the _memcache we don't have any
437     jfiles = self._ListJobFiles()
# NOTE(review): the "return jlist" (or similar) after this comprehension is
# missing from this excerpt.
438     jlist = [m.group(1) for m in
439              [self._RE_JOB_FILE.match(name) for name in jfiles]]
# List queue-directory entries that look like job files.
443   def _ListJobFiles(self):
444     assert self.lock_fd, "Queue should be open"
446     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
447             if self._RE_JOB_FILE.match(name)]
# Load one job, preferring the in-memory cache over disk.
449   def _LoadJobUnlocked(self, job_id):
450     assert self.lock_fd, "Queue should be open"
452     if job_id in self._memcache:
453       logging.debug("Found job %s in memcache", job_id)
454       return self._memcache[job_id]
456     filepath = self._GetJobPath(job_id)
457     logging.debug("Loading job from %s", filepath)
# NOTE(review): the try/except around this open, the missing-file handling
# (ENOENT -> presumably return None), the fd.close(), and the final
# "return job" are all missing from this excerpt.
459       fd = open(filepath, "r")
461       if err.errno in (errno.ENOENT, ):
465       data = serializer.LoadJson(fd.read())
469     job = _QueuedJob.Restore(self, data)
470     self._memcache[job_id] = job
471     logging.debug("Added job %s to the cache", job_id)
# job_ids=None (falsy) means "all jobs on disk".  NOTE(review): the
# "if not job_ids:" guard line is missing from this excerpt.
474   def _GetJobsUnlocked(self, job_ids):
476       job_ids = self._GetJobIDsUnlocked()
478     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
# Public wrapper; NOTE(review): the lock acquisition presumably wrapping
# this call is missing from this excerpt.
481   def GetJobs(self, job_ids):
482     return self._GetJobsUnlocked(job_ids)
485   def AddJob(self, ops, nodes):
486     """Create and store on disk a new job.
489     @param ops: The list of OpCodes that will become the new job.
491     @param nodes: The list of nodes to which the new job serial will be
495     assert self.lock_fd, "Queue should be open"
# Allocate the id (also replicates the serial to `nodes`), build the job,
# persist it, then cache it.  NOTE(review): the final "return job" is
# missing from this excerpt.
498     job_id = self._NewSerialUnlocked(nodes)
499     job = _QueuedJob(self, job_id, ops)
502     self._UpdateJobUnlocked(job)
504     logging.debug("Added new job %s to the cache", job_id)
505     self._memcache[job_id] = job
# Rewrite a job's on-disk file from its serialized form, then evict
# finished jobs from the cache (keeping this one).
509   def _UpdateJobUnlocked(self, job):
510     assert self.lock_fd, "Queue should be open"
512     filename = self._GetJobPath(job.id)
513     logging.debug("Writing job %s to %s", job.id, filename)
514     utils.WriteFile(filename,
515                     data=serializer.DumpJson(job.Serialize(), indent=False))
516     self._CleanCacheUnlocked([job.id])
518   def _CleanCacheUnlocked(self, exclude):
519     """Clean the memory cache.
521     The exceptions argument contains job IDs that should not be
525     assert isinstance(exclude, list)
# Python 2 .values() returns a list, so deleting entries while iterating
# is safe here.  Jobs neither QUEUED nor RUNNING (i.e. finished) and not in
# `exclude` are dropped.  NOTE(review): the "continue" after the exclude
# check (original line 528) is missing from this excerpt.
526     for job in self._memcache.values():
527       if job.id in exclude:
529       if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
530                                  constants.JOB_STATUS_RUNNING):
531         logging.debug("Cleaning job %s from the cache", job.id)
533         del self._memcache[job.id]
# Public wrapper; NOTE(review): lock acquisition presumably wrapping this
# call is missing from this excerpt.
538   def UpdateJob(self, job):
539     return self._UpdateJobUnlocked(job)
# Archiving is not implemented in this version.
541   def ArchiveJob(self, job_id):
542     raise NotImplementedError()
# JobQueue.__init__ (the enclosing "class JobQueue" header lies above this
# excerpt): set up disk storage and the worker pool, then deal with jobs
# left over from a previous daemon run.  NOTE(review): leading numbers are
# original file line numbers; gaps mean missing lines.
549   def __init__(self, context):
550     self._lock = threading.Lock()
551     self._jobs = DiskJobStorage("")
552     self._wpool = _JobQueueWorkerPool(context)
# GetJobs(None) returns all jobs: still-QUEUED jobs are resubmitted to the
# worker pool; jobs that were RUNNING when the daemon died are marked as
# failed ("unclean").
554     for job in self._jobs.GetJobs(None):
555       status = job.GetStatus()
556       if status in (constants.JOB_STATUS_QUEUED, ):
557         self._wpool.AddTask(job)
559       elif status in (constants.JOB_STATUS_RUNNING, ):
560         logging.warning("Unfinished job %s found: %s", job.id, job)
561         job.SetUnclean("Unclean master daemon shutdown")
564   def SubmitJob(self, ops, nodes):
565     """Add a new job to the queue.
567     This enters the job into our job queue and also puts it on the new
568     queue, in order for it to be picked up by the queue processors.
571     @param ops: the sequence of opcodes that will become the new job
573     @param nodes: the list of nodes to which the queue should be
# Persist first (AddJob also allocates/replicates the job id), then hand
# the job to the worker pool.  NOTE(review): the locking around these calls
# and the return of the job id (original lines 574-583, partly missing) are
# not visible in this excerpt.
577     job = self._jobs.AddJob(ops, nodes)
580     self._wpool.AddTask(job)
# Archiving a finished job is not implemented in this version.
584   def ArchiveJob(self, job_id):
585     raise NotImplementedError()
# Cancelling a queued job is not implemented in this version.
587   def CancelJob(self, job_id):
588     raise NotImplementedError()
# Build one query-result row for a job: for each requested field name,
# append the corresponding value.  NOTE(review): the row initialisation,
# the loop header over `fields`, and several branches (e.g. "id", "ops")
# are missing from this excerpt; note also the direct access to the job's
# private _ops list.
590   def _GetJobInfo(self, job, fields):
595       elif fname == "status":
596         row.append(job.GetStatus())
598         row.append([op.GetInput().__getstate__() for op in job._ops])
599       elif fname == "opresult":
600         row.append([op.GetResult() for op in job._ops])
601       elif fname == "opstatus":
602         row.append([op.GetStatus() for op in job._ops])
# "ticker": the latest log entry of the opcode currently running
# (RetrieveLog(-1) returns just the last entry).
603       elif fname == "ticker":
604         ji = job.GetRunOpIndex()
608           lmsg = job._ops[ji].RetrieveLog(-1)
609           # message might be empty here
# Unknown field names are rejected.  NOTE(review): the else: introducing
# this raise is missing from this excerpt.
616         raise errors.OpExecError("Invalid job query field '%s'" % fname)
619   def QueryJobs(self, job_ids, fields):
620     """Returns a list of jobs in queue.
623     - job_ids: Sequence of job identifiers or None for all
624     - fields: Names of fields to return
# One row per job, built by _GetJobInfo.  NOTE(review): the initialisation
# of `jobs`, any locking, and the final "return jobs" are missing from this
# excerpt.
631     for job in self._jobs.GetJobs(job_ids):
635       jobs.append(self._GetJobInfo(job, fields))
643 """Stops the job queue.
646 self._wpool.TerminateWorkers()