Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ c8549bfd

History | View | Annotate | Download (6.5 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 498ae1cc Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 498ae1cc Iustin Pop
"""Module implementing the job queue handling."""
23 498ae1cc Iustin Pop
24 e2715f69 Michael Hanselmann
import logging
25 e2715f69 Michael Hanselmann
import threading
26 498ae1cc Iustin Pop
27 e2715f69 Michael Hanselmann
from ganeti import constants
28 e2715f69 Michael Hanselmann
from ganeti import workerpool
29 7a1ecaed Iustin Pop
from ganeti import errors
30 e2715f69 Michael Hanselmann
from ganeti import mcpu
31 e2715f69 Michael Hanselmann
32 e2715f69 Michael Hanselmann
33 e2715f69 Michael Hanselmann
JOBQUEUE_THREADS = 5
34 e2715f69 Michael Hanselmann
35 498ae1cc Iustin Pop
36 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
37 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
38 e2715f69 Michael Hanselmann

39 e2715f69 Michael Hanselmann
  Access must be synchronized by using an external lock.
40 e2715f69 Michael Hanselmann

41 e2715f69 Michael Hanselmann
  """
42 e2715f69 Michael Hanselmann
  def __init__(self, op):
43 e2715f69 Michael Hanselmann
    self.input = op
44 e2715f69 Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
45 e2715f69 Michael Hanselmann
    self.result = None
46 e2715f69 Michael Hanselmann
47 e2715f69 Michael Hanselmann
48 e2715f69 Michael Hanselmann
class _QueuedJob(object):
49 e2715f69 Michael Hanselmann
  """In-memory job representation.
50 e2715f69 Michael Hanselmann

51 e2715f69 Michael Hanselmann
  This is what we use to track the user-submitted jobs.
52 e2715f69 Michael Hanselmann

53 e2715f69 Michael Hanselmann
  """
54 e2715f69 Michael Hanselmann
  def __init__(self, ops, job_id):
55 e2715f69 Michael Hanselmann
    if not ops:
56 e2715f69 Michael Hanselmann
      # TODO
57 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
58 e2715f69 Michael Hanselmann
59 e2715f69 Michael Hanselmann
    self.id = job_id
60 e2715f69 Michael Hanselmann
    self._lock = threading.Lock()
61 e2715f69 Michael Hanselmann
62 e2715f69 Michael Hanselmann
    # _ops should not be modified again because we don't acquire the lock
63 e2715f69 Michael Hanselmann
    # to use it.
64 e2715f69 Michael Hanselmann
    self._ops = [_QueuedOpCode(op) for op in ops]
65 e2715f69 Michael Hanselmann
66 e2715f69 Michael Hanselmann
  def _GetStatusUnlocked(self):
67 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
68 e2715f69 Michael Hanselmann
69 e2715f69 Michael Hanselmann
    all_success = True
70 e2715f69 Michael Hanselmann
    for op in self._ops:
71 e2715f69 Michael Hanselmann
      if op.status == constants.OP_STATUS_SUCCESS:
72 e2715f69 Michael Hanselmann
        continue
73 e2715f69 Michael Hanselmann
74 e2715f69 Michael Hanselmann
      all_success = False
75 e2715f69 Michael Hanselmann
76 e2715f69 Michael Hanselmann
      if op.status == constants.OP_STATUS_QUEUED:
77 e2715f69 Michael Hanselmann
        pass
78 e2715f69 Michael Hanselmann
      elif op.status == constants.OP_STATUS_ERROR:
79 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
80 e2715f69 Michael Hanselmann
      elif op.status == constants.OP_STATUS_RUNNING:
81 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
82 e2715f69 Michael Hanselmann
83 e2715f69 Michael Hanselmann
    if all_success:
84 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
85 e2715f69 Michael Hanselmann
86 e2715f69 Michael Hanselmann
    return status
87 e2715f69 Michael Hanselmann
88 e2715f69 Michael Hanselmann
  def GetStatus(self):
89 e2715f69 Michael Hanselmann
    self._lock.acquire()
90 e2715f69 Michael Hanselmann
    try:
91 e2715f69 Michael Hanselmann
      return self._GetStatusUnlocked()
92 e2715f69 Michael Hanselmann
    finally:
93 e2715f69 Michael Hanselmann
      self._lock.release()
94 e2715f69 Michael Hanselmann
95 e2715f69 Michael Hanselmann
  def Run(self, proc):
96 e2715f69 Michael Hanselmann
    """Job executor.
97 e2715f69 Michael Hanselmann

98 e2715f69 Michael Hanselmann
    This functions processes a this job in the context of given processor
99 e2715f69 Michael Hanselmann
    instance.
100 e2715f69 Michael Hanselmann

101 e2715f69 Michael Hanselmann
    Args:
102 e2715f69 Michael Hanselmann
    - proc: Ganeti Processor to run the job with
103 e2715f69 Michael Hanselmann

104 e2715f69 Michael Hanselmann
    """
105 e2715f69 Michael Hanselmann
    try:
106 c8549bfd Michael Hanselmann
      count = len(self._ops)
107 c8549bfd Michael Hanselmann
      for idx, op in enumerate(self._ops):
108 e2715f69 Michael Hanselmann
        try:
109 e2715f69 Michael Hanselmann
          self._lock.acquire()
110 e2715f69 Michael Hanselmann
          try:
111 c8549bfd Michael Hanselmann
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
112 e2715f69 Michael Hanselmann
            op.status = constants.OP_STATUS_RUNNING
113 e2715f69 Michael Hanselmann
          finally:
114 e2715f69 Michael Hanselmann
            self._lock.release()
115 e2715f69 Michael Hanselmann
116 e2715f69 Michael Hanselmann
          result = proc.ExecOpCode(op.input)
117 e2715f69 Michael Hanselmann
118 e2715f69 Michael Hanselmann
          self._lock.acquire()
119 e2715f69 Michael Hanselmann
          try:
120 c8549bfd Michael Hanselmann
            logging.debug("Op %s/%s: Successfully finished %s",
121 c8549bfd Michael Hanselmann
                          idx + 1, count, op)
122 e2715f69 Michael Hanselmann
            op.status = constants.OP_STATUS_SUCCESS
123 e2715f69 Michael Hanselmann
            op.result = result
124 e2715f69 Michael Hanselmann
          finally:
125 e2715f69 Michael Hanselmann
            self._lock.release()
126 e2715f69 Michael Hanselmann
        except Exception, err:
127 e2715f69 Michael Hanselmann
          self._lock.acquire()
128 e2715f69 Michael Hanselmann
          try:
129 c8549bfd Michael Hanselmann
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
130 e2715f69 Michael Hanselmann
            op.status = constants.OP_STATUS_ERROR
131 e2715f69 Michael Hanselmann
            op.result = str(err)
132 e2715f69 Michael Hanselmann
          finally:
133 e2715f69 Michael Hanselmann
            self._lock.release()
134 e2715f69 Michael Hanselmann
          raise
135 e2715f69 Michael Hanselmann
136 e2715f69 Michael Hanselmann
    except errors.GenericError, err:
137 e2715f69 Michael Hanselmann
      logging.error("ganeti exception %s", exc_info=err)
138 e2715f69 Michael Hanselmann
    except Exception, err:
139 e2715f69 Michael Hanselmann
      logging.error("unhandled exception %s", exc_info=err)
140 e2715f69 Michael Hanselmann
    except:
141 e2715f69 Michael Hanselmann
      logging.error("unhandled unknown exception %s", exc_info=err)
142 e2715f69 Michael Hanselmann
143 e2715f69 Michael Hanselmann
144 e2715f69 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
145 e2715f69 Michael Hanselmann
  def RunTask(self, job):
146 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
147 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
148 e2715f69 Michael Hanselmann
    # TODO: feedback function
149 e2715f69 Michael Hanselmann
    proc = mcpu.Processor(self.pool.context, feedback=lambda x: None)
150 e2715f69 Michael Hanselmann
    try:
151 e2715f69 Michael Hanselmann
      job.Run(proc)
152 e2715f69 Michael Hanselmann
    finally:
153 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
154 e2715f69 Michael Hanselmann
                    self.worker_id, job.id, job.GetStatus())
155 e2715f69 Michael Hanselmann
156 e2715f69 Michael Hanselmann
157 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
158 e2715f69 Michael Hanselmann
  def __init__(self, context):
159 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
160 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
161 e2715f69 Michael Hanselmann
    self.context = context
162 e2715f69 Michael Hanselmann
163 e2715f69 Michael Hanselmann
164 e2715f69 Michael Hanselmann
class JobQueue:
165 e2715f69 Michael Hanselmann
  """The job queue.
166 e2715f69 Michael Hanselmann

167 e2715f69 Michael Hanselmann
   """
168 e2715f69 Michael Hanselmann
  def __init__(self, context):
169 e2715f69 Michael Hanselmann
    self._lock = threading.Lock()
170 e2715f69 Michael Hanselmann
    self._last_job_id = 0
171 e2715f69 Michael Hanselmann
    self._jobs = {}
172 e2715f69 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(context)
173 e2715f69 Michael Hanselmann
174 e2715f69 Michael Hanselmann
  def _NewJobIdUnlocked(self):
175 e2715f69 Michael Hanselmann
    """Generates a new job identifier.
176 e2715f69 Michael Hanselmann

177 e2715f69 Michael Hanselmann
    Returns: A string representing the job identifier.
178 e2715f69 Michael Hanselmann

179 e2715f69 Michael Hanselmann
    """
180 e2715f69 Michael Hanselmann
    self._last_job_id += 1
181 e2715f69 Michael Hanselmann
    return str(self._last_job_id)
182 e2715f69 Michael Hanselmann
183 e2715f69 Michael Hanselmann
  def SubmitJob(self, ops):
184 e2715f69 Michael Hanselmann
    """Add a new job to the queue.
185 e2715f69 Michael Hanselmann

186 e2715f69 Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
187 e2715f69 Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
188 e2715f69 Michael Hanselmann

189 e2715f69 Michael Hanselmann
    Args:
190 e2715f69 Michael Hanselmann
    - ops: Sequence of opcodes
191 e2715f69 Michael Hanselmann

192 e2715f69 Michael Hanselmann
    """
193 e2715f69 Michael Hanselmann
    # Get job identifier
194 e2715f69 Michael Hanselmann
    self._lock.acquire()
195 e2715f69 Michael Hanselmann
    try:
196 e2715f69 Michael Hanselmann
      job_id = self._NewJobIdUnlocked()
197 e2715f69 Michael Hanselmann
    finally:
198 e2715f69 Michael Hanselmann
      self._lock.release()
199 e2715f69 Michael Hanselmann
200 e2715f69 Michael Hanselmann
    job = _QueuedJob(ops, job_id)
201 e2715f69 Michael Hanselmann
202 e2715f69 Michael Hanselmann
    # Add it to our internal queue
203 e2715f69 Michael Hanselmann
    self._lock.acquire()
204 e2715f69 Michael Hanselmann
    try:
205 e2715f69 Michael Hanselmann
      self._jobs[job_id] = job
206 e2715f69 Michael Hanselmann
    finally:
207 e2715f69 Michael Hanselmann
      self._lock.release()
208 e2715f69 Michael Hanselmann
209 e2715f69 Michael Hanselmann
    # Add to worker pool
210 e2715f69 Michael Hanselmann
    self._wpool.AddTask(job)
211 e2715f69 Michael Hanselmann
212 e2715f69 Michael Hanselmann
    return job_id
213 e2715f69 Michael Hanselmann
214 e2715f69 Michael Hanselmann
  def ArchiveJob(self, job_id):
215 e2715f69 Michael Hanselmann
    raise NotImplementedError()
216 e2715f69 Michael Hanselmann
217 e2715f69 Michael Hanselmann
  def CancelJob(self, job_id):
218 e2715f69 Michael Hanselmann
    raise NotImplementedError()
219 e2715f69 Michael Hanselmann
220 e2715f69 Michael Hanselmann
  def _GetJobInfo(self, job, fields):
221 e2715f69 Michael Hanselmann
    row = []
222 e2715f69 Michael Hanselmann
    for fname in fields:
223 e2715f69 Michael Hanselmann
      if fname == "id":
224 e2715f69 Michael Hanselmann
        row.append(job.id)
225 e2715f69 Michael Hanselmann
      elif fname == "status":
226 e2715f69 Michael Hanselmann
        row.append(job.GetStatus())
227 e2715f69 Michael Hanselmann
      elif fname == "result":
228 e2715f69 Michael Hanselmann
        # TODO
229 e2715f69 Michael Hanselmann
        row.append(map(lambda op: op.result, job._ops))
230 e2715f69 Michael Hanselmann
      else:
231 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
232 e2715f69 Michael Hanselmann
    return row
233 e2715f69 Michael Hanselmann
234 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
235 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
236 e2715f69 Michael Hanselmann

237 e2715f69 Michael Hanselmann
    Args:
238 e2715f69 Michael Hanselmann
    - job_ids: Sequence of job identifiers or None for all
239 e2715f69 Michael Hanselmann
    - fields: Names of fields to return
240 e2715f69 Michael Hanselmann

241 e2715f69 Michael Hanselmann
    """
242 e2715f69 Michael Hanselmann
    self._lock.acquire()
243 e2715f69 Michael Hanselmann
    try:
244 e2715f69 Michael Hanselmann
      if not job_ids:
245 e2715f69 Michael Hanselmann
        job_ids = self._jobs.keys()
246 e2715f69 Michael Hanselmann
247 e2715f69 Michael Hanselmann
      # TODO: define sort order?
248 e2715f69 Michael Hanselmann
      job_ids.sort()
249 e2715f69 Michael Hanselmann
250 e2715f69 Michael Hanselmann
      jobs = []
251 e2715f69 Michael Hanselmann
252 e2715f69 Michael Hanselmann
      for job_id in job_ids:
253 e2715f69 Michael Hanselmann
        job = self._jobs.get(job_id, None)
254 e2715f69 Michael Hanselmann
        if job is None:
255 e2715f69 Michael Hanselmann
          jobs.append(None)
256 e2715f69 Michael Hanselmann
        else:
257 e2715f69 Michael Hanselmann
          jobs.append(self._GetJobInfo(job, fields))
258 e2715f69 Michael Hanselmann
259 e2715f69 Michael Hanselmann
      return jobs
260 e2715f69 Michael Hanselmann
    finally:
261 e2715f69 Michael Hanselmann
      self._lock.release()
262 e2715f69 Michael Hanselmann
263 e2715f69 Michael Hanselmann
  def Shutdown(self):
264 e2715f69 Michael Hanselmann
    """Stops the job queue.
265 e2715f69 Michael Hanselmann

266 e2715f69 Michael Hanselmann
    """
267 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()