Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 307149a8

History | View | Annotate | Download (6.5 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 498ae1cc Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 498ae1cc Iustin Pop
"""Module implementing the job queue handling."""
23 498ae1cc Iustin Pop
24 e2715f69 Michael Hanselmann
import logging
25 e2715f69 Michael Hanselmann
import threading
26 498ae1cc Iustin Pop
27 e2715f69 Michael Hanselmann
from ganeti import constants
28 e2715f69 Michael Hanselmann
from ganeti import workerpool
29 7a1ecaed Iustin Pop
from ganeti import errors
30 e2715f69 Michael Hanselmann
from ganeti import mcpu
31 7996a135 Iustin Pop
from ganeti import utils
32 e2715f69 Michael Hanselmann
33 e2715f69 Michael Hanselmann
34 e2715f69 Michael Hanselmann
JOBQUEUE_THREADS = 5
35 e2715f69 Michael Hanselmann
36 498ae1cc Iustin Pop
37 e2715f69 Michael Hanselmann
class _QueuedOpCode(object):
38 e2715f69 Michael Hanselmann
  """Encasulates an opcode object.
39 e2715f69 Michael Hanselmann

40 307149a8 Iustin Pop
  Access is synchronized by the '_lock' attribute.
41 e2715f69 Michael Hanselmann

42 e2715f69 Michael Hanselmann
  """
43 e2715f69 Michael Hanselmann
  def __init__(self, op):
44 e2715f69 Michael Hanselmann
    self.input = op
45 e2715f69 Michael Hanselmann
    self.status = constants.OP_STATUS_QUEUED
46 e2715f69 Michael Hanselmann
    self.result = None
47 307149a8 Iustin Pop
    self._lock = threading.Lock()
48 307149a8 Iustin Pop
49 307149a8 Iustin Pop
  @utils.LockedMethod
50 307149a8 Iustin Pop
  def SetStatus(self, status, result):
51 307149a8 Iustin Pop
    """Update the opcode status and result.
52 307149a8 Iustin Pop

53 307149a8 Iustin Pop
    """
54 307149a8 Iustin Pop
    self.status = status
55 307149a8 Iustin Pop
    self.result = result
56 307149a8 Iustin Pop
57 307149a8 Iustin Pop
  @utils.LockedMethod
58 307149a8 Iustin Pop
  def GetStatus(self):
59 307149a8 Iustin Pop
    """Get the opcode status.
60 307149a8 Iustin Pop

61 307149a8 Iustin Pop
    """
62 307149a8 Iustin Pop
    return self.status
63 307149a8 Iustin Pop
64 307149a8 Iustin Pop
  @utils.LockedMethod
65 307149a8 Iustin Pop
  def GetResult(self):
66 307149a8 Iustin Pop
    """Get the opcode result.
67 307149a8 Iustin Pop

68 307149a8 Iustin Pop
    """
69 307149a8 Iustin Pop
    return self.result
70 e2715f69 Michael Hanselmann
71 e2715f69 Michael Hanselmann
72 e2715f69 Michael Hanselmann
class _QueuedJob(object):
73 e2715f69 Michael Hanselmann
  """In-memory job representation.
74 e2715f69 Michael Hanselmann

75 e2715f69 Michael Hanselmann
  This is what we use to track the user-submitted jobs.
76 e2715f69 Michael Hanselmann

77 e2715f69 Michael Hanselmann
  """
78 e2715f69 Michael Hanselmann
  def __init__(self, ops, job_id):
79 e2715f69 Michael Hanselmann
    if not ops:
80 e2715f69 Michael Hanselmann
      # TODO
81 e2715f69 Michael Hanselmann
      raise Exception("No opcodes")
82 e2715f69 Michael Hanselmann
83 e2715f69 Michael Hanselmann
    self.id = job_id
84 e2715f69 Michael Hanselmann
    self._lock = threading.Lock()
85 e2715f69 Michael Hanselmann
86 e2715f69 Michael Hanselmann
    # _ops should not be modified again because we don't acquire the lock
87 e2715f69 Michael Hanselmann
    # to use it.
88 e2715f69 Michael Hanselmann
    self._ops = [_QueuedOpCode(op) for op in ops]
89 e2715f69 Michael Hanselmann
90 307149a8 Iustin Pop
  def GetStatus(self):
91 e2715f69 Michael Hanselmann
    status = constants.JOB_STATUS_QUEUED
92 e2715f69 Michael Hanselmann
93 e2715f69 Michael Hanselmann
    all_success = True
94 e2715f69 Michael Hanselmann
    for op in self._ops:
95 307149a8 Iustin Pop
      op_status = op.GetStatus()
96 307149a8 Iustin Pop
      if op_status == constants.OP_STATUS_SUCCESS:
97 e2715f69 Michael Hanselmann
        continue
98 e2715f69 Michael Hanselmann
99 e2715f69 Michael Hanselmann
      all_success = False
100 e2715f69 Michael Hanselmann
101 307149a8 Iustin Pop
      if op_status == constants.OP_STATUS_QUEUED:
102 e2715f69 Michael Hanselmann
        pass
103 307149a8 Iustin Pop
      elif op_status == constants.OP_STATUS_ERROR:
104 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_ERROR
105 307149a8 Iustin Pop
      elif op_status == constants.OP_STATUS_RUNNING:
106 e2715f69 Michael Hanselmann
        status = constants.JOB_STATUS_RUNNING
107 e2715f69 Michael Hanselmann
108 e2715f69 Michael Hanselmann
    if all_success:
109 e2715f69 Michael Hanselmann
      status = constants.JOB_STATUS_SUCCESS
110 e2715f69 Michael Hanselmann
111 e2715f69 Michael Hanselmann
    return status
112 e2715f69 Michael Hanselmann
113 e2715f69 Michael Hanselmann
  def Run(self, proc):
114 e2715f69 Michael Hanselmann
    """Job executor.
115 e2715f69 Michael Hanselmann

116 e2715f69 Michael Hanselmann
    This functions processes a this job in the context of given processor
117 e2715f69 Michael Hanselmann
    instance.
118 e2715f69 Michael Hanselmann

119 e2715f69 Michael Hanselmann
    Args:
120 e2715f69 Michael Hanselmann
    - proc: Ganeti Processor to run the job with
121 e2715f69 Michael Hanselmann

122 e2715f69 Michael Hanselmann
    """
123 e2715f69 Michael Hanselmann
    try:
124 c8549bfd Michael Hanselmann
      count = len(self._ops)
125 c8549bfd Michael Hanselmann
      for idx, op in enumerate(self._ops):
126 e2715f69 Michael Hanselmann
        try:
127 307149a8 Iustin Pop
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
128 307149a8 Iustin Pop
          op.SetStatus(constants.OP_STATUS_RUNNING, None)
129 e2715f69 Michael Hanselmann
130 e2715f69 Michael Hanselmann
          result = proc.ExecOpCode(op.input)
131 e2715f69 Michael Hanselmann
132 307149a8 Iustin Pop
          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
133 307149a8 Iustin Pop
          logging.debug("Op %s/%s: Successfully finished %s",
134 307149a8 Iustin Pop
                        idx + 1, count, op)
135 e2715f69 Michael Hanselmann
        except Exception, err:
136 307149a8 Iustin Pop
          op.SetStatus(constants.OP_STATUS_ERROR, str(err))
137 307149a8 Iustin Pop
          logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
138 e2715f69 Michael Hanselmann
          raise
139 e2715f69 Michael Hanselmann
140 e2715f69 Michael Hanselmann
    except errors.GenericError, err:
141 e2715f69 Michael Hanselmann
      logging.error("ganeti exception %s", exc_info=err)
142 e2715f69 Michael Hanselmann
    except Exception, err:
143 e2715f69 Michael Hanselmann
      logging.error("unhandled exception %s", exc_info=err)
144 e2715f69 Michael Hanselmann
    except:
145 e2715f69 Michael Hanselmann
      logging.error("unhandled unknown exception %s", exc_info=err)
146 e2715f69 Michael Hanselmann
147 e2715f69 Michael Hanselmann
148 e2715f69 Michael Hanselmann
class _JobQueueWorker(workerpool.BaseWorker):
149 e2715f69 Michael Hanselmann
  def RunTask(self, job):
150 e2715f69 Michael Hanselmann
    logging.debug("Worker %s processing job %s",
151 e2715f69 Michael Hanselmann
                  self.worker_id, job.id)
152 e2715f69 Michael Hanselmann
    # TODO: feedback function
153 e2715f69 Michael Hanselmann
    proc = mcpu.Processor(self.pool.context, feedback=lambda x: None)
154 e2715f69 Michael Hanselmann
    try:
155 e2715f69 Michael Hanselmann
      job.Run(proc)
156 e2715f69 Michael Hanselmann
    finally:
157 e2715f69 Michael Hanselmann
      logging.debug("Worker %s finished job %s, status = %s",
158 e2715f69 Michael Hanselmann
                    self.worker_id, job.id, job.GetStatus())
159 e2715f69 Michael Hanselmann
160 e2715f69 Michael Hanselmann
161 e2715f69 Michael Hanselmann
class _JobQueueWorkerPool(workerpool.WorkerPool):
162 e2715f69 Michael Hanselmann
  def __init__(self, context):
163 e2715f69 Michael Hanselmann
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
164 e2715f69 Michael Hanselmann
                                              _JobQueueWorker)
165 e2715f69 Michael Hanselmann
    self.context = context
166 e2715f69 Michael Hanselmann
167 e2715f69 Michael Hanselmann
168 e2715f69 Michael Hanselmann
class JobQueue:
169 e2715f69 Michael Hanselmann
  """The job queue.
170 e2715f69 Michael Hanselmann

171 e2715f69 Michael Hanselmann
   """
172 e2715f69 Michael Hanselmann
  def __init__(self, context):
173 e2715f69 Michael Hanselmann
    self._lock = threading.Lock()
174 e2715f69 Michael Hanselmann
    self._last_job_id = 0
175 e2715f69 Michael Hanselmann
    self._jobs = {}
176 e2715f69 Michael Hanselmann
    self._wpool = _JobQueueWorkerPool(context)
177 e2715f69 Michael Hanselmann
178 e2715f69 Michael Hanselmann
  def _NewJobIdUnlocked(self):
179 e2715f69 Michael Hanselmann
    """Generates a new job identifier.
180 e2715f69 Michael Hanselmann

181 e2715f69 Michael Hanselmann
    Returns: A string representing the job identifier.
182 e2715f69 Michael Hanselmann

183 e2715f69 Michael Hanselmann
    """
184 e2715f69 Michael Hanselmann
    self._last_job_id += 1
185 e2715f69 Michael Hanselmann
    return str(self._last_job_id)
186 e2715f69 Michael Hanselmann
187 e2715f69 Michael Hanselmann
  def SubmitJob(self, ops):
188 e2715f69 Michael Hanselmann
    """Add a new job to the queue.
189 e2715f69 Michael Hanselmann

190 e2715f69 Michael Hanselmann
    This enters the job into our job queue and also puts it on the new
191 e2715f69 Michael Hanselmann
    queue, in order for it to be picked up by the queue processors.
192 e2715f69 Michael Hanselmann

193 e2715f69 Michael Hanselmann
    Args:
194 e2715f69 Michael Hanselmann
    - ops: Sequence of opcodes
195 e2715f69 Michael Hanselmann

196 e2715f69 Michael Hanselmann
    """
197 e2715f69 Michael Hanselmann
    # Get job identifier
198 e2715f69 Michael Hanselmann
    self._lock.acquire()
199 e2715f69 Michael Hanselmann
    try:
200 e2715f69 Michael Hanselmann
      job_id = self._NewJobIdUnlocked()
201 e2715f69 Michael Hanselmann
    finally:
202 e2715f69 Michael Hanselmann
      self._lock.release()
203 e2715f69 Michael Hanselmann
204 e2715f69 Michael Hanselmann
    job = _QueuedJob(ops, job_id)
205 e2715f69 Michael Hanselmann
206 e2715f69 Michael Hanselmann
    # Add it to our internal queue
207 e2715f69 Michael Hanselmann
    self._lock.acquire()
208 e2715f69 Michael Hanselmann
    try:
209 e2715f69 Michael Hanselmann
      self._jobs[job_id] = job
210 e2715f69 Michael Hanselmann
    finally:
211 e2715f69 Michael Hanselmann
      self._lock.release()
212 e2715f69 Michael Hanselmann
213 e2715f69 Michael Hanselmann
    # Add to worker pool
214 e2715f69 Michael Hanselmann
    self._wpool.AddTask(job)
215 e2715f69 Michael Hanselmann
216 e2715f69 Michael Hanselmann
    return job_id
217 e2715f69 Michael Hanselmann
218 e2715f69 Michael Hanselmann
  def ArchiveJob(self, job_id):
219 e2715f69 Michael Hanselmann
    raise NotImplementedError()
220 e2715f69 Michael Hanselmann
221 e2715f69 Michael Hanselmann
  def CancelJob(self, job_id):
222 e2715f69 Michael Hanselmann
    raise NotImplementedError()
223 e2715f69 Michael Hanselmann
224 e2715f69 Michael Hanselmann
  def _GetJobInfo(self, job, fields):
225 e2715f69 Michael Hanselmann
    row = []
226 e2715f69 Michael Hanselmann
    for fname in fields:
227 e2715f69 Michael Hanselmann
      if fname == "id":
228 e2715f69 Michael Hanselmann
        row.append(job.id)
229 e2715f69 Michael Hanselmann
      elif fname == "status":
230 e2715f69 Michael Hanselmann
        row.append(job.GetStatus())
231 e2715f69 Michael Hanselmann
      elif fname == "result":
232 e2715f69 Michael Hanselmann
        # TODO
233 307149a8 Iustin Pop
        row.append([op.GetResult() for op in job._ops])
234 e2715f69 Michael Hanselmann
      else:
235 e2715f69 Michael Hanselmann
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
236 e2715f69 Michael Hanselmann
    return row
237 e2715f69 Michael Hanselmann
238 e2715f69 Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
239 e2715f69 Michael Hanselmann
    """Returns a list of jobs in queue.
240 e2715f69 Michael Hanselmann

241 e2715f69 Michael Hanselmann
    Args:
242 e2715f69 Michael Hanselmann
    - job_ids: Sequence of job identifiers or None for all
243 e2715f69 Michael Hanselmann
    - fields: Names of fields to return
244 e2715f69 Michael Hanselmann

245 e2715f69 Michael Hanselmann
    """
246 e2715f69 Michael Hanselmann
    self._lock.acquire()
247 e2715f69 Michael Hanselmann
    try:
248 e2715f69 Michael Hanselmann
      if not job_ids:
249 e2715f69 Michael Hanselmann
        job_ids = self._jobs.keys()
250 e2715f69 Michael Hanselmann
251 e2715f69 Michael Hanselmann
      # TODO: define sort order?
252 e2715f69 Michael Hanselmann
      job_ids.sort()
253 e2715f69 Michael Hanselmann
254 e2715f69 Michael Hanselmann
      jobs = []
255 e2715f69 Michael Hanselmann
256 e2715f69 Michael Hanselmann
      for job_id in job_ids:
257 e2715f69 Michael Hanselmann
        job = self._jobs.get(job_id, None)
258 e2715f69 Michael Hanselmann
        if job is None:
259 e2715f69 Michael Hanselmann
          jobs.append(None)
260 e2715f69 Michael Hanselmann
        else:
261 e2715f69 Michael Hanselmann
          jobs.append(self._GetJobInfo(job, fields))
262 e2715f69 Michael Hanselmann
263 e2715f69 Michael Hanselmann
      return jobs
264 e2715f69 Michael Hanselmann
    finally:
265 e2715f69 Michael Hanselmann
      self._lock.release()
266 e2715f69 Michael Hanselmann
267 e2715f69 Michael Hanselmann
  def Shutdown(self):
268 e2715f69 Michael Hanselmann
    """Stops the job queue.
269 e2715f69 Michael Hanselmann

270 e2715f69 Michael Hanselmann
    """
271 e2715f69 Michael Hanselmann
    self._wpool.TerminateWorkers()