Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 307149a8

History | View | Annotate | Download (6.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling."""
23

    
24
import logging
25
import threading
26

    
27
from ganeti import constants
28
from ganeti import workerpool
29
from ganeti import errors
30
from ganeti import mcpu
31
from ganeti import utils
32

    
33

    
34
JOBQUEUE_THREADS = 5
35

    
36

    
37
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access to the fields is synchronized by the '_lock' attribute.

  """
  def __init__(self, op):
    """Initialize the queued opcode.

    Args:
    - op: the opcode to wrap; stored unmodified in the 'input' field

    """
    self._lock = threading.Lock()
    self.input = op
    self.result = None
    self.status = constants.OP_STATUS_QUEUED

  @utils.LockedMethod
  def SetStatus(self, status, result):
    """Update the opcode status and result.

    """
    self.status = status
    self.result = result

  @utils.LockedMethod
  def GetStatus(self):
    """Get the opcode status.

    """
    return self.status

  @utils.LockedMethod
  def GetResult(self):
    """Get the opcode result.

    """
    return self.result
70

    
71

    
72
class _QueuedJob(object):
73
  """In-memory job representation.
74

75
  This is what we use to track the user-submitted jobs.
76

77
  """
78
  def __init__(self, ops, job_id):
79
    if not ops:
80
      # TODO
81
      raise Exception("No opcodes")
82

    
83
    self.id = job_id
84
    self._lock = threading.Lock()
85

    
86
    # _ops should not be modified again because we don't acquire the lock
87
    # to use it.
88
    self._ops = [_QueuedOpCode(op) for op in ops]
89

    
90
  def GetStatus(self):
91
    status = constants.JOB_STATUS_QUEUED
92

    
93
    all_success = True
94
    for op in self._ops:
95
      op_status = op.GetStatus()
96
      if op_status == constants.OP_STATUS_SUCCESS:
97
        continue
98

    
99
      all_success = False
100

    
101
      if op_status == constants.OP_STATUS_QUEUED:
102
        pass
103
      elif op_status == constants.OP_STATUS_ERROR:
104
        status = constants.JOB_STATUS_ERROR
105
      elif op_status == constants.OP_STATUS_RUNNING:
106
        status = constants.JOB_STATUS_RUNNING
107

    
108
    if all_success:
109
      status = constants.JOB_STATUS_SUCCESS
110

    
111
    return status
112

    
113
  def Run(self, proc):
114
    """Job executor.
115

116
    This functions processes a this job in the context of given processor
117
    instance.
118

119
    Args:
120
    - proc: Ganeti Processor to run the job with
121

122
    """
123
    try:
124
      count = len(self._ops)
125
      for idx, op in enumerate(self._ops):
126
        try:
127
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
128
          op.SetStatus(constants.OP_STATUS_RUNNING, None)
129

    
130
          result = proc.ExecOpCode(op.input)
131

    
132
          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
133
          logging.debug("Op %s/%s: Successfully finished %s",
134
                        idx + 1, count, op)
135
        except Exception, err:
136
          op.SetStatus(constants.OP_STATUS_ERROR, str(err))
137
          logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
138
          raise
139

    
140
    except errors.GenericError, err:
141
      logging.error("ganeti exception %s", exc_info=err)
142
    except Exception, err:
143
      logging.error("unhandled exception %s", exc_info=err)
144
    except:
145
      logging.error("unhandled unknown exception %s", exc_info=err)
146

    
147

    
148
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker thread that executes queued jobs."""

  def RunTask(self, job):
    """Execute one job on this worker.

    Args:
    - job: the _QueuedJob instance to run

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    # TODO: feedback function
    processor = mcpu.Processor(self.pool.context, feedback=lambda x: None)
    try:
      job.Run(processor)
    finally:
      # always log the final status, even when Run raised
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, job.GetStatus())
159

    
160

    
161
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Pool of JOBQUEUE_THREADS workers dedicated to the job queue."""

  def __init__(self, context):
    """Initialize the pool and remember the Ganeti context.

    Args:
    - context: Ganeti context object, made available to the workers

    """
    workerpool.WorkerPool.__init__(self, JOBQUEUE_THREADS,
                                   _JobQueueWorker)
    self.context = context
166

    
167

    
168
class JobQueue:
  """The job queue.

  Accepts job submissions and hands them to a worker pool for
  execution; keeps an in-memory registry of all jobs for querying.

   """
  def __init__(self, context):
    """Initialize the queue and start the worker pool.

    Args:
    - context: Ganeti context object, passed on to the worker pool

    """
    self._lock = threading.Lock()
    self._last_job_id = 0
    self._jobs = {}
    self._wpool = _JobQueueWorkerPool(context)

  def _NewJobIdUnlocked(self):
    """Generates a new job identifier.

    The caller must hold self._lock.

    Returns: A string representing the job identifier.

    """
    self._last_job_id += 1
    return str(self._last_job_id)

  def SubmitJob(self, ops):
    """Add a new job to the queue.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    Args:
    - ops: Sequence of opcodes

    Returns: the string job identifier of the new job.

    """
    # Get job identifier
    self._lock.acquire()
    try:
      job_id = self._NewJobIdUnlocked()
    finally:
      self._lock.release()

    # built outside the lock: _QueuedJob construction needs no shared state
    job = _QueuedJob(ops, job_id)

    # Add it to our internal queue
    self._lock.acquire()
    try:
      self._jobs[job_id] = job
    finally:
      self._lock.release()

    # Add to worker pool
    self._wpool.AddTask(job)

    return job_id

  def ArchiveJob(self, job_id):
    raise NotImplementedError()

  def CancelJob(self, job_id):
    raise NotImplementedError()

  def _GetJobInfo(self, job, fields):
    """Build one result row for a job.

    Args:
    - job: the _QueuedJob to report on
    - fields: names of the fields to include, in order

    Raises:
    - errors.OpExecError: on an unknown field name

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.GetStatus())
      elif fname == "result":
        # TODO
        row.append([op.GetResult() for op in job._ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    Returns: a list with one entry per requested job, in sorted id
    order; unknown job ids yield None entries.

    """
    self._lock.acquire()
    try:
      if not job_ids:
        job_ids = self._jobs.keys()

      # TODO: define sort order?
      # Sort a copy: the previous in-place job_ids.sort() mutated the
      # caller's list when ids were supplied explicitly.
      job_ids = sorted(job_ids)

      jobs = []

      for job_id in job_ids:
        job = self._jobs.get(job_id, None)
        if job is None:
          jobs.append(None)
        else:
          jobs.append(self._GetJobInfo(job, fields))

      return jobs
    finally:
      self._lock.release()

  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()