root / lib / jqueue.py @ af30b2fd


#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

import logging
import threading

from ganeti import constants
from ganeti import workerpool
from ganeti import errors
from ganeti import mcpu
from ganeti import utils


JOBQUEUE_THREADS = 5


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access is synchronized by the '_lock' attribute.

  """
  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self._lock = threading.Lock()

  @utils.LockedMethod
  def GetInput(self):
    """Returns the original opcode.

    """
    return self.input

  @utils.LockedMethod
  def SetStatus(self, status, result):
    """Update the opcode status and result.

    """
    self.status = status
    self.result = result

  @utils.LockedMethod
  def GetStatus(self):
    """Get the opcode status.

    """
    return self.status

  @utils.LockedMethod
  def GetResult(self):
    """Get the opcode result.

    """
    return self.result
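
  # Note on synchronization: utils.LockedMethod is assumed to be a decorator
  # that acquires self._lock around the decorated call, roughly along these
  # lines (sketch only; the real implementation lives in ganeti/utils.py and
  # may differ):
  #
  #   def LockedMethod(fn):
  #     def wrapper(self, *args, **kwargs):
  #       self._lock.acquire()
  #       try:
  #         return fn(self, *args, **kwargs)
  #       finally:
  #         self._lock.release()
  #     return wrapper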


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
  def __init__(self, ops, job_id):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.id = job_id
    self._lock = threading.Lock()

    # _ops should not be modified again because we don't acquire the lock
    # to use it.
    self._ops = [_QueuedOpCode(op) for op in ops]

  def GetStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self._ops:
      op_status = op.GetStatus()
      if op_status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op_status == constants.OP_STATUS_QUEUED:
        pass
      elif op_status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
      elif op_status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
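
  # Net effect of GetStatus above: the job is SUCCESS only if every opcode
  # succeeded; otherwise the last opcode found in the ERROR or RUNNING state
  # determines the job status, and QUEUED is reported while no opcode has
  # started yet. For example, opcode statuses [SUCCESS, ERROR, RUNNING]
  # yield JOB_STATUS_RUNNING.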

  def Run(self, proc):
    """Job executor.

    This function processes this job in the context of the given processor
    instance.

    Args:
    - proc: Ganeti Processor to run the job with

    """
    try:
      count = len(self._ops)
      for idx, op in enumerate(self._ops):
        try:
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
          op.SetStatus(constants.OP_STATUS_RUNNING, None)

          result = proc.ExecOpCode(op.input)

          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
          logging.debug("Op %s/%s: Successfully finished %s",
                        idx + 1, count, op)
        except Exception, err:
          op.SetStatus(constants.OP_STATUS_ERROR, str(err))
          logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
          # re-raise so the handlers below log the error and the remaining
          # opcodes are not executed
          raise

    except errors.GenericError, err:
      logging.error("ganeti exception %s", err, exc_info=True)
    except Exception, err:
      logging.error("unhandled exception %s", err, exc_info=True)
    except:
      # no exception object is bound in a bare except, so only log the
      # traceback
      logging.error("unhandled unknown exception", exc_info=True)


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    """Runs one queued job with a freshly created processor.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    # TODO: feedback function
    proc = mcpu.Processor(self.pool.context, feedback=lambda x: None)
    try:
      job.Run(proc)
    finally:
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, job.GetStatus())


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, context):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.context = context


class JobQueue:
  """The job queue.

  """
  def __init__(self, context):
    self._lock = threading.Lock()
    self._last_job_id = 0
    self._jobs = {}
    self._wpool = _JobQueueWorkerPool(context)

  def _NewJobIdUnlocked(self):
    """Generates a new job identifier.

    Returns: A string representing the job identifier.

    """
    self._last_job_id += 1
    return str(self._last_job_id)
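
  # Naming note: the "Unlocked" suffix means this method does not take
  # self._lock itself; callers are expected to hold the lock already, as
  # SubmitJob does below.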

  def SubmitJob(self, ops):
    """Add a new job to the queue.

    This records the job in our internal job list and adds it to the worker
    pool, in order for it to be picked up by the queue processors.

    Args:
    - ops: Sequence of opcodes

    """
    # Get job identifier
    self._lock.acquire()
    try:
      job_id = self._NewJobIdUnlocked()
    finally:
      self._lock.release()

    job = _QueuedJob(ops, job_id)

    # Add it to our internal queue
    self._lock.acquire()
    try:
      self._jobs[job_id] = job
    finally:
      self._lock.release()

    # Add to worker pool
    self._wpool.AddTask(job)

    return job_id
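
  # The string returned by SubmitJob is the handle callers later pass to
  # QueryJobs (and to CancelJob/ArchiveJob once those are implemented).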

  def ArchiveJob(self, job_id):
    raise NotImplementedError()

  def CancelJob(self, job_id):
    raise NotImplementedError()

  def _GetJobInfo(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.GetStatus())
      elif fname == "ops":
        row.append([op.GetInput().__getstate__() for op in job._ops])
      elif fname == "opresult":
        row.append([op.GetResult() for op in job._ops])
      elif fname == "opstatus":
        row.append([op.GetStatus() for op in job._ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    self._lock.acquire()
    try:
      if not job_ids:
        job_ids = self._jobs.keys()

      # TODO: define sort order? (job ids are strings, so this sort is
      # lexicographic)
      job_ids.sort()

      jobs = []

      for job_id in job_ids:
        job = self._jobs.get(job_id, None)
        if job is None:
          jobs.append(None)
        else:
          jobs.append(self._GetJobInfo(job, fields))

      return jobs
    finally:
      self._lock.release()
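
  # Shape of the QueryJobs result: one entry per requested job id, in the
  # sorted order used above; None for an unknown id, otherwise a list with
  # the values of the requested fields in the order they were asked for
  # (see _GetJobInfo).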

  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()
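

# Minimal usage sketch (assumptions: `context` is whatever object
# mcpu.Processor expects as its first argument, and `op` is an opcode
# instance from the ganeti.opcodes module; neither is defined in this file):
#
#   queue = JobQueue(context)
#   job_id = queue.SubmitJob([op])
#   print queue.QueryJobs([job_id], ["id", "status", "opresult"])
#   queue.Shutdown()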