Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ c8549bfd

History | View | Annotate | Download (6.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling."""
23

    
24
import logging
25
import threading
26

    
27
from ganeti import constants
28
from ganeti import workerpool
29
from ganeti import errors
30
from ganeti import mcpu
31

    
32

    
33
# Number of worker threads started by the job queue's worker pool
JOBQUEUE_THREADS = 5
34

    
35

    
36
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access must be synchronized by using an external lock.

  """
  def __init__(self, op):
    """Wraps an opcode for tracking inside a job.

    Args:
    - op: the opcode object to wrap

    """
    # Fresh opcodes start out queued, with no result yet.
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    # The opcode exactly as submitted by the client.
    self.input = op
46

    
47

    
48
class _QueuedJob(object):
49
  """In-memory job representation.
50

51
  This is what we use to track the user-submitted jobs.
52

53
  """
54
  def __init__(self, ops, job_id):
55
    if not ops:
56
      # TODO
57
      raise Exception("No opcodes")
58

    
59
    self.id = job_id
60
    self._lock = threading.Lock()
61

    
62
    # _ops should not be modified again because we don't acquire the lock
63
    # to use it.
64
    self._ops = [_QueuedOpCode(op) for op in ops]
65

    
66
  def _GetStatusUnlocked(self):
67
    status = constants.JOB_STATUS_QUEUED
68

    
69
    all_success = True
70
    for op in self._ops:
71
      if op.status == constants.OP_STATUS_SUCCESS:
72
        continue
73

    
74
      all_success = False
75

    
76
      if op.status == constants.OP_STATUS_QUEUED:
77
        pass
78
      elif op.status == constants.OP_STATUS_ERROR:
79
        status = constants.JOB_STATUS_ERROR
80
      elif op.status == constants.OP_STATUS_RUNNING:
81
        status = constants.JOB_STATUS_RUNNING
82

    
83
    if all_success:
84
      status = constants.JOB_STATUS_SUCCESS
85

    
86
    return status
87

    
88
  def GetStatus(self):
89
    self._lock.acquire()
90
    try:
91
      return self._GetStatusUnlocked()
92
    finally:
93
      self._lock.release()
94

    
95
  def Run(self, proc):
96
    """Job executor.
97

98
    This functions processes a this job in the context of given processor
99
    instance.
100

101
    Args:
102
    - proc: Ganeti Processor to run the job with
103

104
    """
105
    try:
106
      count = len(self._ops)
107
      for idx, op in enumerate(self._ops):
108
        try:
109
          self._lock.acquire()
110
          try:
111
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
112
            op.status = constants.OP_STATUS_RUNNING
113
          finally:
114
            self._lock.release()
115

    
116
          result = proc.ExecOpCode(op.input)
117

    
118
          self._lock.acquire()
119
          try:
120
            logging.debug("Op %s/%s: Successfully finished %s",
121
                          idx + 1, count, op)
122
            op.status = constants.OP_STATUS_SUCCESS
123
            op.result = result
124
          finally:
125
            self._lock.release()
126
        except Exception, err:
127
          self._lock.acquire()
128
          try:
129
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
130
            op.status = constants.OP_STATUS_ERROR
131
            op.result = str(err)
132
          finally:
133
            self._lock.release()
134
          raise
135

    
136
    except errors.GenericError, err:
137
      logging.error("ganeti exception %s", exc_info=err)
138
    except Exception, err:
139
      logging.error("unhandled exception %s", exc_info=err)
140
    except:
141
      logging.error("unhandled unknown exception %s", exc_info=err)
142

    
143

    
144
class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    """Runs one queued job inside this worker thread.

    Args:
    - job: the _QueuedJob instance to process

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    # TODO: feedback function
    processor = mcpu.Processor(self.pool.context, feedback=lambda x: None)
    try:
      job.Run(processor)
    finally:
      # Always record the final status, even if Run blew up.
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, job.GetStatus())
155

    
156

    
157
class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, context):
    """Creates a pool of JOBQUEUE_THREADS job queue workers.

    Args:
    - context: Ganeti context made available to the workers

    """
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    # Stored for the workers, which read self.pool.context.
    self.context = context
162

    
163

    
164
class JobQueue:
165
  """The job queue.
166

167
   """
168
  def __init__(self, context):
169
    self._lock = threading.Lock()
170
    self._last_job_id = 0
171
    self._jobs = {}
172
    self._wpool = _JobQueueWorkerPool(context)
173

    
174
  def _NewJobIdUnlocked(self):
175
    """Generates a new job identifier.
176

177
    Returns: A string representing the job identifier.
178

179
    """
180
    self._last_job_id += 1
181
    return str(self._last_job_id)
182

    
183
  def SubmitJob(self, ops):
184
    """Add a new job to the queue.
185

186
    This enters the job into our job queue and also puts it on the new
187
    queue, in order for it to be picked up by the queue processors.
188

189
    Args:
190
    - ops: Sequence of opcodes
191

192
    """
193
    # Get job identifier
194
    self._lock.acquire()
195
    try:
196
      job_id = self._NewJobIdUnlocked()
197
    finally:
198
      self._lock.release()
199

    
200
    job = _QueuedJob(ops, job_id)
201

    
202
    # Add it to our internal queue
203
    self._lock.acquire()
204
    try:
205
      self._jobs[job_id] = job
206
    finally:
207
      self._lock.release()
208

    
209
    # Add to worker pool
210
    self._wpool.AddTask(job)
211

    
212
    return job_id
213

    
214
  def ArchiveJob(self, job_id):
215
    raise NotImplementedError()
216

    
217
  def CancelJob(self, job_id):
218
    raise NotImplementedError()
219

    
220
  def _GetJobInfo(self, job, fields):
221
    row = []
222
    for fname in fields:
223
      if fname == "id":
224
        row.append(job.id)
225
      elif fname == "status":
226
        row.append(job.GetStatus())
227
      elif fname == "result":
228
        # TODO
229
        row.append(map(lambda op: op.result, job._ops))
230
      else:
231
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
232
    return row
233

    
234
  def QueryJobs(self, job_ids, fields):
235
    """Returns a list of jobs in queue.
236

237
    Args:
238
    - job_ids: Sequence of job identifiers or None for all
239
    - fields: Names of fields to return
240

241
    """
242
    self._lock.acquire()
243
    try:
244
      if not job_ids:
245
        job_ids = self._jobs.keys()
246

    
247
      # TODO: define sort order?
248
      job_ids.sort()
249

    
250
      jobs = []
251

    
252
      for job_id in job_ids:
253
        job = self._jobs.get(job_id, None)
254
        if job is None:
255
          jobs.append(None)
256
        else:
257
          jobs.append(self._GetJobInfo(job, fields))
258

    
259
      return jobs
260
    finally:
261
      self._lock.release()
262

    
263
  def Shutdown(self):
264
    """Stops the job queue.
265

266
    """
267
    self._wpool.TerminateWorkers()