#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

import os
import logging
import threading
import errno
import re

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils


JOBQUEUE_THREADS = 5

class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access is synchronized by the '_lock' attribute.

  """
  def __init__(self, op):
    self.__Setup(op, constants.OP_STATUS_QUEUED, None)

  def __Setup(self, input, status, result):
    self._lock = threading.Lock()
    self.input = input
    self.status = status
    self.result = result

  @classmethod
  def Restore(cls, state):
    obj = object.__new__(cls)
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
                state["status"], state["result"])
    return obj

  @utils.LockedMethod
  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      }

  @utils.LockedMethod
  def GetInput(self):
    """Returns the original opcode.

    """
    return self.input

  @utils.LockedMethod
  def SetStatus(self, status, result):
    """Update the opcode status and result.

    """
    self.status = status
    self.result = result

  @utils.LockedMethod
  def GetStatus(self):
    """Get the opcode status.

    """
    return self.status

  @utils.LockedMethod
  def GetResult(self):
    """Get the opcode result.

    """
    return self.result

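# Illustrative sketch (added for clarity; not part of the original source):
# a _QueuedOpCode survives a round-trip through Serialize/Restore, which is
# how jobs are persisted to disk and reloaded. OpShutdownInstance is merely
# an assumed example; any opcodes.OpCode subclass works the same way.
#
#   op = _QueuedOpCode(opcodes.OpShutdownInstance(instance_name="inst1"))
#   state = op.Serialize()
#   # state == {"input": <opcode state>, "status": <queued>, "result": None}
#   op2 = _QueuedOpCode.Restore(state)
#   assert op2.GetStatus() == constants.OP_STATUS_QUEUED

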
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
  def __init__(self, storage, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops])

  def __Setup(self, storage, job_id, ops):
    self.storage = storage
    self.id = job_id
    self._ops = ops

  @classmethod
  def Restore(cls, storage, state):
    obj = object.__new__(cls)
    obj.__Setup(storage, state["id"],
                [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]])
    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self._ops],
      }

  def SetUnclean(self, msg):
    try:
      for op in self._ops:
        op.SetStatus(constants.OP_STATUS_ERROR, msg)
    finally:
      self.storage.UpdateJob(self)

  def GetStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self._ops:
      op_status = op.GetStatus()
      if op_status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op_status == constants.OP_STATUS_QUEUED:
        pass
      elif op_status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op_status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
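
  # Worked example (added for clarity; not in the original source): with
  # opcode statuses (SUCCESS, RUNNING, QUEUED) the loop above reports
  # JOB_STATUS_RUNNING; all-SUCCESS yields JOB_STATUS_SUCCESS; and a single
  # OP_STATUS_ERROR makes the whole job JOB_STATUS_ERROR, regardless of any
  # later opcodes.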

  def Run(self, proc):
    """Job executor.

    This function processes this job in the context of the given
    processor instance.

    Args:
    - proc: Ganeti Processor to run the job with

    """
    try:
      count = len(self._ops)
      for idx, op in enumerate(self._ops):
        try:
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
          op.SetStatus(constants.OP_STATUS_RUNNING, None)
          self.storage.UpdateJob(self)

          result = proc.ExecOpCode(op.input)

          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
          self.storage.UpdateJob(self)
          logging.debug("Op %s/%s: Successfully finished %s",
                        idx + 1, count, op)
        except Exception, err:
          try:
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
          finally:
            self.storage.UpdateJob(self)
          raise

    except errors.GenericError, err:
      logging.error("ganeti exception", exc_info=err)
    except Exception, err:
      logging.error("unhandled exception", exc_info=err)
    except:
      # "err" is not bound in a bare except clause; let logging capture
      # the active exception instead.
      logging.error("unhandled unknown exception", exc_info=True)


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    # TODO: feedback function
    proc = mcpu.Processor(self.pool.context, feedback=lambda x: None)
    try:
      job.Run(proc)
    finally:
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, job.GetStatus())


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, context):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.context = context


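# Orientation note (added; not in the original source): JobStorage keeps
# everything under constants.QUEUE_DIR as plain files, roughly:
#
#   <QUEUE_DIR>/<lock file>     - held via utils.LockFile while open
#   <QUEUE_DIR>/<version file>  - queue format version (JOB_QUEUE_VERSION)
#   <QUEUE_DIR>/<serial file>   - last used job serial number
#   <QUEUE_DIR>/job-<N>         - one JSON-serialized _QueuedJob per job
#
# The exact lock/version/serial file names come from
# constants.JOB_QUEUE_LOCK_FILE, JOB_QUEUE_VERSION_FILE and
# JOB_QUEUE_SERIAL_FILE; only the "job-<N>" pattern is fixed here (see
# _RE_JOB_FILE and _GetJobPath below).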
class JobStorage(object):
  _RE_JOB_FILE = re.compile(r"^job-\d+$")

  def __init__(self):
    self._lock = threading.Lock()

    # Make sure our directory exists
    try:
      os.mkdir(constants.QUEUE_DIR, 0700)
    except OSError, err:
      if err.errno not in (errno.EEXIST, ):
        raise

    # Get queue lock
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
    try:
      utils.LockFile(self.lock_fd)
    except:
      self.lock_fd.close()
      raise

    # Read version
    try:
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
    except IOError, err:
      if err.errno not in (errno.ENOENT, ):
        raise

      # Setup a new queue
      self._InitQueueUnlocked()

      # Try to open again
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")

    try:
      # Try to read version
      version = int(version_fd.read(128))

      # Verify version
      if version != constants.JOB_QUEUE_VERSION:
        raise errors.JobQueueError("Found version %s, expected %s" %
                                   (version, constants.JOB_QUEUE_VERSION))
    finally:
      version_fd.close()

    serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
    try:
      # Read last serial
      self._last_serial = int(serial_fd.read(1024).strip())
    finally:
      serial_fd.close()

  def Close(self):
    assert self.lock_fd, "Queue should be open"

    self.lock_fd.close()
    self.lock_fd = None

  def _InitQueueUnlocked(self):
    assert self.lock_fd, "Queue should be open"

    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % 0)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A new (unique) serial number.

    """
    assert self.lock_fd, "Queue should be open"

    # New number
    serial = self._last_serial + 1

    # Write to file
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return serial

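  # Illustrative example (added; not in the original source): if the serial
  # file currently holds "42\n", the next _NewSerialUnlocked() call writes
  # "43\n" to constants.JOB_QUEUE_SERIAL_FILE, caches 43 in
  # self._last_serial and returns 43; the corresponding job then lives in
  # the file "job-43" (see _GetJobPath below).
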
  def _GetJobPath(self, job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  def _ListJobFiles(self):
    assert self.lock_fd, "Queue should be open"

    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, filepath):
    assert self.lock_fd, "Queue should be open"

    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    return _QueuedJob.Restore(self, data)

  def _GetJobsUnlocked(self, job_ids):
    if job_ids:
      files = [self._GetJobPath(job_id) for job_id in job_ids]
    else:
      files = [os.path.join(constants.QUEUE_DIR, filename)
               for filename in self._ListJobFiles()]

    return [self._LoadJobUnlocked(filepath) for filepath in files]

  @utils.LockedMethod
  def GetJobs(self, job_ids):
    return self._GetJobsUnlocked(job_ids)

  @utils.LockedMethod
  def AddJob(self, ops):
    assert self.lock_fd, "Queue should be open"

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self._UpdateJobUnlocked(job)

    return job

  def _UpdateJobUnlocked(self, job):
    assert self.lock_fd, "Queue should be open"

    filename = self._GetJobPath(job.id)
    logging.debug("Writing job %s to %s", job.id, filename)
    utils.WriteFile(filename,
                    data=serializer.DumpJson(job.Serialize(), indent=False))

  @utils.LockedMethod
  def UpdateJob(self, job):
    return self._UpdateJobUnlocked(job)

  def ArchiveJob(self, job_id):
    raise NotImplementedError()


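# Hedged usage sketch (added for illustration; "context" stands in for the
# master daemon's context object, and OpShutdownInstance is merely an
# assumed example opcode):
#
#   queue = JobQueue(context)
#   job_id = queue.SubmitJob([opcodes.OpShutdownInstance(
#     instance_name="inst1")])
#   queue.QueryJobs([job_id], ["id", "status"])
#   queue.Shutdown()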
class JobQueue:
  """The job queue.

  """
  def __init__(self, context):
    self._lock = threading.Lock()
    self._jobs = JobStorage()
    self._wpool = _JobQueueWorkerPool(context)

    for job in self._jobs.GetJobs(None):
      status = job.GetStatus()
      if status in (constants.JOB_STATUS_QUEUED, ):
        self._wpool.AddTask(job)

      elif status in (constants.JOB_STATUS_RUNNING, ):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.SetUnclean("Unclean master daemon shutdown")

  @utils.LockedMethod
  def SubmitJob(self, ops):
    """Add a new job to the queue.

    This enters the job into our job queue and also puts it into the
    worker pool, in order for it to be picked up by the queue processors.

    Args:
    - ops: Sequence of opcodes

    """
    job = self._jobs.AddJob(ops)

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  def ArchiveJob(self, job_id):
    raise NotImplementedError()

  def CancelJob(self, job_id):
    raise NotImplementedError()

  def _GetJobInfo(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.GetStatus())
      elif fname == "ops":
        row.append([op.GetInput().__getstate__() for op in job._ops])
      elif fname == "opresult":
        row.append([op.GetResult() for op in job._ops])
      elif fname == "opstatus":
        row.append([op.GetStatus() for op in job._ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    self._lock.acquire()
    try:
      jobs = []

      for job in self._jobs.GetJobs(job_ids):
        if job is None:
          jobs.append(None)
        else:
          jobs.append(self._GetJobInfo(job, fields))

      return jobs
    finally:
      self._lock.release()

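  # Illustrative example (added; not in the original source): querying a
  # single finished job for id and status returns one row per job, with
  # fields in the requested order, e.g.
  #
  #   queue.QueryJobs([3], ["id", "status"])
  #   # => [[3, constants.JOB_STATUS_SUCCESS]]
  #
  # Unknown job identifiers yield None in place of the row.
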
  @utils.LockedMethod
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()
    self._jobs.Close()