Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 2467e0d3

History | View | Annotate | Download (6.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling."""
23

    
24
import logging
25
import threading
26

    
27
from ganeti import constants
28
from ganeti import workerpool
29
from ganeti import errors
30
from ganeti import mcpu
31

    
32

    
33
JOBQUEUE_THREADS = 5
34

    
35

    
36
class _QueuedOpCode(object):
  """Encapsulates a single opcode and its runtime state.

  This class does no locking of its own; access must be synchronized
  by using an external lock.

  """
  def __init__(self, op):
    # Result of the opcode, filled in after execution
    self.result = None
    # Every opcode starts out in the queued state
    self.status = constants.OP_STATUS_QUEUED
    # The opcode exactly as submitted by the caller
    self.input = op
46

    
47

    
48
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs: a job is a
  non-empty sequence of opcodes executed in order, with per-opcode
  status and result tracking.

  """
  def __init__(self, ops, job_id):
    """Initializes the job.

    Args:
    - ops: Non-empty sequence of opcodes
    - job_id: Job identifier (a string, as handed out by the queue)

    """
    if not ops:
      # TODO: use a more specific exception type
      raise Exception("No opcodes")

    self.id = job_id
    # Protects the mutable per-opcode state (status/result)
    self._lock = threading.Lock()

    # _ops should not be modified again because we don't acquire the lock
    # to use it.
    self._ops = [_QueuedOpCode(op) for op in ops]

  def _SetOpStatus(self, op, status, result):
    """Updates one opcode's status and result under the job lock."""
    self._lock.acquire()
    try:
      op.status = status
      op.result = result
    finally:
      self._lock.release()

  def _GetStatusUnlocked(self):
    """Computes the overall job status from the opcode statuses.

    The caller must hold self._lock.

    NOTE(review): when opcodes are in mixed ERROR/RUNNING states the
    last one seen wins; arguably a single errored opcode should make
    the whole job ERROR — TODO confirm intended semantics.

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self._ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetStatus(self):
    """Thread-safe wrapper around _GetStatusUnlocked."""
    self._lock.acquire()
    try:
      return self._GetStatusUnlocked()
    finally:
      self._lock.release()

  def Run(self, proc):
    """Job executor.

    This function processes this job in the context of the given
    processor instance; the first failing opcode aborts the job.

    Args:
    - proc: Ganeti Processor to run the job with

    """
    try:
      for op in self._ops:
        try:
          self._SetOpStatus(op, constants.OP_STATUS_RUNNING, None)

          result = proc.ExecOpCode(op.input)

          self._SetOpStatus(op, constants.OP_STATUS_SUCCESS, result)
        except Exception as err:
          # Record the failure on the opcode before aborting the job
          self._SetOpStatus(op, constants.OP_STATUS_ERROR, str(err))
          raise

    except errors.GenericError:
      # FIX: the original passed exc_info=err to a format string with an
      # unmatched "%s" and no argument, which raised a formatting error
      # at log time; logging.exception logs the message plus traceback.
      logging.exception("ganeti exception")
    except Exception:
      logging.exception("unhandled exception")
    except:
      # FIX: the original referenced "err" here, which is unbound in this
      # branch (NameError); logging.exception needs no explicit object.
      logging.exception("unhandled unknown exception")
137

    
138

    
139
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker executing queued jobs handed out by the pool."""

  def RunTask(self, job):
    """Executes one job in this worker's context.

    Args:
    - job: _QueuedJob instance to process

    """
    logging.debug("Worker %s processing job %s", self.worker_id, job.id)

    # TODO: replace the no-op lambda with a real feedback function
    processor = mcpu.Processor(self.pool.context, feedback=lambda x: None)

    try:
      job.Run(processor)
    finally:
      final_status = job.GetStatus()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, final_status)
150

    
151

    
152
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Pool of JOBQUEUE_THREADS workers dedicated to the job queue."""

  def __init__(self, context):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS, _JobQueueWorker)
    # Shared Ganeti context; workers read it to build their processors
    self.context = context
157

    
158

    
159
class JobQueue:
  """The job queue.

  Accepts jobs (sequences of opcodes), assigns string identifiers to
  them, tracks them in memory and dispatches them to a worker pool.

  """
  def __init__(self, context):
    # Protects _last_job_id and _jobs
    self._lock = threading.Lock()
    # Last numeric identifier handed out by _NewJobIdUnlocked
    self._last_job_id = 0
    # Maps job identifier strings to _QueuedJob instances
    self._jobs = {}
    self._wpool = _JobQueueWorkerPool(context)

  def _NewJobIdUnlocked(self):
    """Generates a new job identifier.

    The caller must hold self._lock.

    Returns: A string representing the job identifier.

    """
    self._last_job_id += 1
    return str(self._last_job_id)

  def SubmitJob(self, ops):
    """Add a new job to the queue.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    Args:
    - ops: Sequence of opcodes

    Returns: The identifier assigned to the new job.

    """
    # Get job identifier
    self._lock.acquire()
    try:
      job_id = self._NewJobIdUnlocked()
    finally:
      self._lock.release()

    # Building the job happens outside the lock; _QueuedJob validates ops
    job = _QueuedJob(ops, job_id)

    # Add it to our internal queue
    self._lock.acquire()
    try:
      self._jobs[job_id] = job
    finally:
      self._lock.release()

    # Add to worker pool
    self._wpool.AddTask(job)

    return job_id

  def ArchiveJob(self, job_id):
    """Archives a job. Not yet implemented."""
    raise NotImplementedError()

  def CancelJob(self, job_id):
    """Cancels a job. Not yet implemented."""
    raise NotImplementedError()

  def _GetJobInfo(self, job, fields):
    """Collects the requested fields for one job.

    Args:
    - job: _QueuedJob instance
    - fields: Names of fields to return ("id", "status" or "result")

    Raises errors.OpExecError on unknown field names.

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.GetStatus())
      elif fname == "result":
        # TODO: reaches into the job's private _ops attribute
        # FIX: list comprehension instead of map(), so a real list is
        # returned regardless of the Python version
        row.append([op.result for op in job._ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    Returns: One entry per requested job, in sorted identifier order;
    unknown job identifiers yield None entries.

    """
    self._lock.acquire()
    try:
      if not job_ids:
        job_ids = self._jobs.keys()

      # TODO: define sort order?
      # FIX: sorted() works on a copy; the original called
      # job_ids.sort(), mutating the caller's list in place
      job_ids = sorted(job_ids)

      jobs = []

      for job_id in job_ids:
        job = self._jobs.get(job_id, None)
        if job is None:
          jobs.append(None)
        else:
          jobs.append(self._GetJobInfo(job, fields))

      return jobs
    finally:
      self._lock.release()

  def Shutdown(self):
    """Stops the job queue by terminating all worker threads.

    """
    self._wpool.TerminateWorkers()