# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the job queue handling."""
import errno
import logging
import os
import re
import threading
import time

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import rpc


# Size of the job queue worker pool (the concrete value is assumed here)
JOBQUEUE_THREADS = 25

class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access is synchronized by the '_lock' attribute.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (timestamp, level, message).

  """

  def __init__(self, op):
    self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])

  def __Setup(self, input_, status, result, log):
    self._lock = threading.Lock()
    self.input = input_
    self.status = status
    self.result = result
    self.log = log

  @classmethod
  def Restore(cls, state):
    """Restore an opcode from its serialized form."""
    obj = object.__new__(cls)
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
                state["status"], state["result"], state["log"])
    return obj
73 "input": self.input.__getstate__(),
74 "status": self.status,
75 "result": self.result,
81 """Returns the original opcode.

  def SetStatus(self, status, result):
    """Update the opcode status and result.

    """
    self._lock.acquire()
    try:
      self.status = status
      self.result = result
    finally:
      self._lock.release()

  def GetStatus(self):
    """Get the opcode status.

    """
    return self.status

  def GetResult(self):
    """Get the opcode result.

    """
    return self.result

  def Log(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      log_type, log_msg = args
    self.log.append((time.time(), log_type, log_msg))
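
  # Usage sketch (illustrative): the one-argument form defaults the log
  # type to ELOG_MESSAGE, the two-argument form states it explicitly;
  # both append a (timestamp, type, message) tuple to self.log:
  #
  #   op.Log("creating disks")
  #   op.Log(constants.ELOG_MESSAGE, "creating disks")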

  def RetrieveLog(self, start_at=0):
    """Retrieve (a part of) the execution log.

    """
    return self.log[start_at:]
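
  # Serialization round-trip sketch (illustrative; any concrete opcode
  # class from ganeti.opcodes would do):
  #
  #   op = _QueuedOpCode(opcodes.OpTestDelay(duration=0))
  #   state = op.Serialize()
  #   copy = _QueuedOpCode.Restore(state)
  #   assert copy.GetStatus() == constants.OP_STATUS_QUEUED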


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """

  def __init__(self, storage, job_id, ops):
    if not ops:
      # TODO: use a more specific exception class
      raise Exception("No opcodes")

    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)

  def __Setup(self, storage, job_id, ops, run_op_index):
    self._lock = threading.Lock()
    self.storage = storage
    self.id = job_id
    self._ops = ops
    self.run_op_index = run_op_index

  @classmethod
  def Restore(cls, storage, state):
    obj = object.__new__(cls)
    op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
    obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
    return obj
160 "ops": [op.Serialize() for op in self._ops],
161 "run_op_index": self.run_op_index,

  def _SetStatus(self, status, msg):
    """Force a status and result on all our opcodes and save the job."""
    for op in self._ops:
      op.SetStatus(status, msg)

    self.storage.UpdateJob(self)

  def SetUnclean(self, msg):
    return self._SetStatus(constants.OP_STATUS_ERROR, msg)

  def SetCanceled(self, msg):
    return self._SetStatus(constants.OP_STATUS_CANCELED, msg)

  def GetStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self._ops:
      op_status = op.GetStatus()
      if op_status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op_status == constants.OP_STATUS_QUEUED:
        pass
      elif op_status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op_status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op_status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
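
  # Derivation example (illustrative): opcode statuses
  # (success, running, queued) yield JOB_STATUS_RUNNING, while
  # (success, error, queued) yield JOB_STATUS_ERROR, because a single
  # failed opcode fails the whole job.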

  def GetRunOpIndex(self):
    return self.run_op_index

  def Run(self, proc):
    """Job executor.

    This function processes this job in the context of the given
    processor instance.

    Args:
    - proc: Ganeti Processor to run the job with

    """
    try:
      count = len(self._ops)
      for idx, op in enumerate(self._ops):
        try:
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

          self._lock.acquire()
          try:
            self.run_op_index = idx
          finally:
            self._lock.release()

          op.SetStatus(constants.OP_STATUS_RUNNING, None)
          self.storage.UpdateJob(self)

          result = proc.ExecOpCode(op.input, op.Log)

          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
          self.storage.UpdateJob(self)
          logging.debug("Op %s/%s: Successfully finished %s",
                        idx + 1, count, op)
        except Exception, err:
          op.SetStatus(constants.OP_STATUS_ERROR, str(err))
          logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
          self.storage.UpdateJob(self)
          raise

    except errors.GenericError, err:
      logging.exception("ganeti exception %s", err)
    except Exception, err:
      logging.exception("unhandled exception %s", err)
    except:
      logging.exception("unhandled unknown exception")


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    """Runs one queued job in this worker."""
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    # TODO: feedback function
    proc = mcpu.Processor(self.pool.context)
    try:
      job.Run(proc)
    finally:
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, job.GetStatus())


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, context):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.context = context


class JobStorageBase(object):
  def __init__(self, id_prefix):
    self.id_prefix = id_prefix

    if id_prefix:
      prefix_pattern = re.escape("%s-" % id_prefix)
    else:
      prefix_pattern = ""

    # Apart from the prefix, all job IDs are numeric
    self._re_job_id = re.compile(r"^%s\d+$" % prefix_pattern)

  def OwnsJobId(self, job_id):
    return self._re_job_id.match(job_id)

  def FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    if self.id_prefix:
      prefix = "%s-" % self.id_prefix
    else:
      prefix = ""

    return "%s%010d" % (prefix, job_id)

  def _ShouldJobBeArchivedUnlocked(self, job):
    if job.GetStatus() not in (constants.JOB_STATUS_CANCELED,
                               constants.JOB_STATUS_SUCCESS,
                               constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return False
    return True


class DiskJobStorage(JobStorageBase):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, id_prefix):
    JobStorageBase.__init__(self, id_prefix)

    self._lock = threading.Lock()
    self._memcache = {}
    self._my_hostname = utils.HostInfo().name

    # Make sure our directories exist
    for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
      try:
        os.mkdir(path, 0700)
      except OSError, err:
        if err.errno not in (errno.EEXIST, ):
          raise

    # Get queue lock
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
    try:
      utils.LockFile(self.lock_fd)
    except:
      self.lock_fd.close()
      raise

    # Read the queue version, setting up a fresh queue if there is none
    try:
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
    except IOError, err:
      if err.errno not in (errno.ENOENT, ):
        raise
      self._InitQueueUnlocked()
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")

    try:
      # Try to read version
      version = int(version_fd.read(128))
      if version != constants.JOB_QUEUE_VERSION:
        raise errors.JobQueueError("Found version %s, expected %s" %
                                   (version, constants.JOB_QUEUE_VERSION))
    finally:
      version_fd.close()

    self._last_serial = self._ReadSerial()
    if self._last_serial is None:
      raise errors.ConfigurationError("Can't read/parse the job queue serial"
                                      " file")
370 """Try to read the job serial file.
373 @return: If the serial can be read, then it is returned. Otherwise None
378 serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
381 serial = int(serial_fd.read(1024).strip())
384 except (ValueError, EnvironmentError):

  def Close(self):
    assert self.lock_fd, "Queue should be open"

    self.lock_fd.close()
    self.lock_fd = None

  def _InitQueueUnlocked(self):
    assert self.lock_fd, "Queue should be open"

    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
    if self._ReadSerial() is None:
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                      data="%s\n" % 0)

  def _NewSerialUnlocked(self, nodes):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    assert self.lock_fd, "Queue should be open"

    # New number
    serial = self._last_serial + 1

    # Write to file
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    # Distribute the serial to the other nodes
    try:
      nodes.remove(self._my_hostname)
    except ValueError:
      pass

    result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
    for node in nodes:
      if not result[node]:
        logging.error("copy of job queue file to node %s failed", node)

    return self.FormatJobID(serial)
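
  # Sketch of the effect (illustrative): if the serial file held "41\n",
  # a call with nodes=["node1", "node2"] rewrites it to "42\n",
  # distributes the file to every node except this one via
  # rpc.call_upload_file, and returns "0000000042".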

  def _GetJobPath(self, job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  def _GetArchivedJobPath(self, job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  def _ExtractJobID(self, name):
    m = self._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    return jlist

  def _ListJobFiles(self):
    assert self.lock_fd, "Queue should be open"

    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    assert self.lock_fd, "Queue should be open"

    if job_id in self._memcache:
      logging.debug("Found job %s in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  def GetJobs(self, job_ids):
    return self._GetJobsUnlocked(job_ids)

  def AddJob(self, ops, nodes):
    """Create a new job and store it on disk.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @type nodes: list
    @param nodes: The list of nodes to which the new job serial will be
                  distributed.

    """
    assert self.lock_fd, "Queue should be open"

    # Get job identifier
    job_id = self._NewSerialUnlocked(nodes)
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self._UpdateJobUnlocked(job)

    logging.debug("Added new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job
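
  # Usage sketch (illustrative; assumes an open DiskJobStorage and a
  # hypothetical node name):
  #
  #   job = storage.AddJob([opcodes.OpTestDelay(duration=0)],
  #                        ["node1.example.com"])
  #   print job.id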

  def _UpdateJobUnlocked(self, job):
    assert self.lock_fd, "Queue should be open"

    filename = self._GetJobPath(job.id)
    logging.debug("Writing job %s to %s", job.id, filename)
    utils.WriteFile(filename,
                    data=serializer.DumpJson(job.Serialize(), indent=False))
    self._CleanCacheUnlocked([job.id])

  def _CleanCacheUnlocked(self, exclude):
    """Clean the memory cache.

    The exclude argument contains job IDs that should not be
    cleaned.

    """
    assert isinstance(exclude, list)

    for job in self._memcache.values():
      if job.id in exclude:
        continue
      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
                                 constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %s from the cache", job.id)
        try:
          del self._memcache[job.id]
        except KeyError:
          pass

  def UpdateJob(self, job):
    return self._UpdateJobUnlocked(job)

  # TODO: Figure out locking
  def CancelJob(self, job_id):
    """Cancel a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    self._lock.acquire()
    try:
      job = self._LoadJobUnlocked(job_id)
    finally:
      self._lock.release()

    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    job.SetCanceled("Job cancelled by request")

  def ArchiveJob(self, job_id):
    """Archive a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if not self._ShouldJobBeArchivedUnlocked(job):
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    os.rename(old, new)

    logging.debug("Successfully archived job %s", job.id)

    # Clean the cache because we don't know what os.rename actually did
    # and to be on the safe side.
    self._CleanCacheUnlocked([])
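
  # E.g. (illustrative): archiving job "0000000042" renames
  # QUEUE_DIR/job-0000000042 to JOB_QUEUE_ARCHIVE_DIR/job-0000000042.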


class JobQueue(object):
  """The job queue used by the master daemon."""

  def __init__(self, context):
    self._lock = threading.Lock()
    self._jobs = DiskJobStorage("")
    self._wpool = _JobQueueWorkerPool(context)

    for job in self._jobs.GetJobs(None):
      status = job.GetStatus()
      if status in (constants.JOB_STATUS_QUEUED, ):
        self._wpool.AddTask(job)
      elif status in (constants.JOB_STATUS_RUNNING, ):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.SetUnclean("Unclean master daemon shutdown")

  def SubmitJob(self, ops, nodes):
    """Add a new job to the queue.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: the sequence of opcodes that will become the new job
    @type nodes: list
    @param nodes: the list of nodes to which the queue should be
                  distributed

    """
    job = self._jobs.AddJob(ops, nodes)

    # Add it to the worker pool
    self._wpool.AddTask(job)

    return job.id
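
  # Usage sketch (illustrative; "queue" is a JobQueue built with a real
  # mcpu context and the node name is hypothetical):
  #
  #   job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=0)],
  #                            ["node1.example.com"])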

  def ArchiveJob(self, job_id):
    self._jobs.ArchiveJob(job_id)

  def CancelJob(self, job_id):
    self._jobs.CancelJob(job_id)

  def _GetJobInfo(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.GetStatus())
      elif fname == "ops":
        row.append([op.GetInput().__getstate__() for op in job._ops])
      elif fname == "opresult":
        row.append([op.GetResult() for op in job._ops])
      elif fname == "opstatus":
        row.append([op.GetStatus() for op in job._ops])
      elif fname == "ticker":
        ji = job.GetRunOpIndex()
        if ji < 0:
          lmsg = None
        else:
          lmsg = job._ops[ji].RetrieveLog(-1)
          # the log might be empty here
          if lmsg:
            lmsg = lmsg[0]
          else:
            lmsg = None
        row.append(lmsg)
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    self._lock.acquire()
    try:
      jobs = []

      for job in self._jobs.GetJobs(job_ids):
        if job is None:
          jobs.append(None)
        else:
          jobs.append(self._GetJobInfo(job, fields))

      return jobs
    finally:
      self._lock.release()
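
  # Query sketch (illustrative output):
  #
  #   queue.QueryJobs(None, ["id", "status"])
  #   -> [["0000000042", "success"], ["0000000043", "running"]]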
719 """Stops the job queue.
722 self._wpool.TerminateWorkers()