lib/jqueue.py @ 911a495b

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

import os
import logging
import threading
import errno
import re

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils


JOBQUEUE_THREADS = 5


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access is synchronized by the '_lock' attribute.

  """
  def __init__(self, op):
    self.__Setup(op, constants.OP_STATUS_QUEUED, None)

  def __Setup(self, input, status, result):
    self._lock = threading.Lock()
    self.input = input
    self.status = status
    self.result = result

  @classmethod
  def Restore(cls, state):
    obj = object.__new__(cls)
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
                state["status"], state["result"])
    return obj

  @utils.LockedMethod
  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      }

  @utils.LockedMethod
  def GetInput(self):
    """Returns the original opcode.

    """
    return self.input

  @utils.LockedMethod
  def SetStatus(self, status, result):
    """Update the opcode status and result.

    """
    self.status = status
    self.result = result

  @utils.LockedMethod
  def GetStatus(self):
    """Get the opcode status.

    """
    return self.status

  @utils.LockedMethod
  def GetResult(self):
    """Get the opcode result.

    """
    return self.result
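

# Example (illustrative, not part of the original file): a _QueuedOpCode
# survives a Serialize()/Restore() round trip. Assuming an opcode type
# such as opcodes.OpTestDelay is available, the flow looks roughly like:
#
#   qop = _QueuedOpCode(opcodes.OpTestDelay(duration=0))
#   state = qop.Serialize()
#   # state == {"input": <opcode state>,
#   #           "status": constants.OP_STATUS_QUEUED, "result": None}
#   restored = _QueuedOpCode.Restore(state)
#   assert restored.GetStatus() == constants.OP_STATUS_QUEUED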


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
  def __init__(self, storage, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops])

  def __Setup(self, storage, job_id, ops):
    self.storage = storage
    self.id = job_id
    self._ops = ops

  @classmethod
  def Restore(cls, storage, state):
    obj = object.__new__(cls)
    obj.__Setup(storage, state["id"],
                [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]])
    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self._ops],
      }

  def SetUnclean(self, msg):
    try:
      for op in self._ops:
        op.SetStatus(constants.OP_STATUS_ERROR, msg)
    finally:
      self.storage.UpdateJob(self)

  def GetStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self._ops:
      op_status = op.GetStatus()
      if op_status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op_status == constants.OP_STATUS_QUEUED:
        pass
      elif op_status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op_status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
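
  # For illustration (not part of the original file): the loop above derives
  # the job status from the per-opcode statuses roughly as follows:
  #
  #   [SUCCESS, SUCCESS, SUCCESS] -> JOB_STATUS_SUCCESS
  #   [SUCCESS, RUNNING, QUEUED]  -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR, QUEUED]    -> JOB_STATUS_ERROR (an error ends the scan)
  #   [QUEUED, QUEUED]            -> JOB_STATUS_QUEUED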

  def Run(self, proc):
    """Job executor.

    This function processes this job in the context of the given
    processor instance.

    Args:
    - proc: Ganeti Processor to run the job with

    """
    try:
      count = len(self._ops)
      for idx, op in enumerate(self._ops):
        try:
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
          op.SetStatus(constants.OP_STATUS_RUNNING, None)
          self.storage.UpdateJob(self)

          result = proc.ExecOpCode(op.input)

          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
          self.storage.UpdateJob(self)
          logging.debug("Op %s/%s: Successfully finished %s",
                        idx + 1, count, op)
        except Exception, err:
          try:
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
          finally:
            self.storage.UpdateJob(self)
          raise

    except errors.GenericError:
      logging.exception("ganeti exception")
    except Exception:
      logging.exception("unhandled exception")
    except:
      logging.exception("unhandled unknown exception")


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    # TODO: feedback function
    proc = mcpu.Processor(self.pool.context, feedback=lambda x: None)
    try:
      job.Run(proc)
    finally:
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, job.GetStatus())


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, context):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.context = context


class JobStorage(object):
  _RE_JOB_FILE = re.compile(r"^job-(\d+)$")

  def __init__(self):
    self._lock = threading.Lock()

    # Make sure our directory exists
    try:
      os.mkdir(constants.QUEUE_DIR, 0700)
    except OSError, err:
      if err.errno not in (errno.EEXIST, ):
        raise

    # Get queue lock
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
    try:
      utils.LockFile(self.lock_fd)
    except:
      self.lock_fd.close()
      raise

    # Read version
    try:
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
    except IOError, err:
      if err.errno not in (errno.ENOENT, ):
        raise

      # Setup a new queue
      self._InitQueueUnlocked()

      # Try to open again
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")

    try:
      # Try to read version
      version = int(version_fd.read(128))

      # Verify version
      if version != constants.JOB_QUEUE_VERSION:
        raise errors.JobQueueError("Found version %s, expected %s" %
                                   (version, constants.JOB_QUEUE_VERSION))
    finally:
      version_fd.close()

    serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
    try:
      # Read last serial
      self._last_serial = int(serial_fd.read(1024).strip())
    finally:
      serial_fd.close()

  def Close(self):
    assert self.lock_fd, "Queue should be open"

    self.lock_fd.close()
    self.lock_fd = None

  def _InitQueueUnlocked(self):
    assert self.lock_fd, "Queue should be open"

    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % 0)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: The new job identifier.

    """
    assert self.lock_fd, "Queue should be open"

    # New number
    serial = self._last_serial + 1

    # Write to file
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return serial
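
  # Sketch (added for illustration, not in the original): if
  # JOB_QUEUE_SERIAL_FILE currently contains "42\n", the next three calls
  # write "43\n", "44\n" and "45\n" and return 43, 44 and 45; the matching
  # on-disk job files would be job-43, job-44 and job-45 under QUEUE_DIR.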

  def _GetJobPath(self, job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    """
    jfiles = self._ListJobFiles()
    return [m.group(1) for m in
            [self._RE_JOB_FILE.match(name) for name in jfiles]]

  def _ListJobFiles(self):
    assert self.lock_fd, "Queue should be open"

    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]
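
  # For example (illustrative): if QUEUE_DIR held the files job-1, job-17
  # and some unrelated file, _ListJobFiles() would return
  # ["job-1", "job-17"] and _GetJobIDsUnlocked() would return ["1", "17"];
  # anything not matching _RE_JOB_FILE is filtered out.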

  def _LoadJobUnlocked(self, job_id):
    assert self.lock_fd, "Queue should be open"

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    return _QueuedJob.Restore(self, data)

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  def GetJobs(self, job_ids):
    return self._GetJobsUnlocked(job_ids)

  @utils.LockedMethod
  def AddJob(self, ops):
    assert self.lock_fd, "Queue should be open"

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self._UpdateJobUnlocked(job)

    return job

  def _UpdateJobUnlocked(self, job):
    assert self.lock_fd, "Queue should be open"

    filename = self._GetJobPath(job.id)
    logging.debug("Writing job %s to %s", job.id, filename)
    utils.WriteFile(filename,
                    data=serializer.DumpJson(job.Serialize(), indent=False))

  @utils.LockedMethod
  def UpdateJob(self, job):
    return self._UpdateJobUnlocked(job)

  def ArchiveJob(self, job_id):
    raise NotImplementedError()


class JobQueue(object):
  """The job queue.

  """
  def __init__(self, context):
    self._lock = threading.Lock()
    self._jobs = JobStorage()
    self._wpool = _JobQueueWorkerPool(context)

    for job in self._jobs.GetJobs(None):
      status = job.GetStatus()
      if status in (constants.JOB_STATUS_QUEUED, ):
        self._wpool.AddTask(job)

      elif status in (constants.JOB_STATUS_RUNNING, ):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.SetUnclean("Unclean master daemon shutdown")

  @utils.LockedMethod
  def SubmitJob(self, ops):
    """Add a new job to the queue.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    Args:
    - ops: Sequence of opcodes

    """
    job = self._jobs.AddJob(ops)

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
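
  # Usage sketch (not part of the original module; assumes a configured
  # master context and an available opcode type such as opcodes.OpTestDelay):
  #
  #   queue = JobQueue(context)
  #   job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=0)])
  #   queue.QueryJobs([job_id], ["id", "status"])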

  def ArchiveJob(self, job_id):
    raise NotImplementedError()

  def CancelJob(self, job_id):
    raise NotImplementedError()

  def _GetJobInfo(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.GetStatus())
      elif fname == "ops":
        row.append([op.GetInput().__getstate__() for op in job._ops])
      elif fname == "opresult":
        row.append([op.GetResult() for op in job._ops])
      elif fname == "opstatus":
        row.append([op.GetStatus() for op in job._ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    self._lock.acquire()
    try:
      jobs = []

      for job in self._jobs.GetJobs(job_ids):
        if job is None:
          jobs.append(None)
        else:
          jobs.append(self._GetJobInfo(job, fields))

      return jobs
    finally:
      self._lock.release()
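
  # Illustrative example (not in the original): querying a finished job and
  # a nonexistent one might return
  #
  #   queue.QueryJobs([1, 999], ["id", "status"])
  #   -> [[1, constants.JOB_STATUS_SUCCESS], None]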

  @utils.LockedMethod
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()
    self._jobs.Close()