
lib/jqueue.py @ f1048938


#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

import os
import logging
import threading
import errno
import re
import time

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils


JOBQUEUE_THREADS = 5
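
# Informal overview of how the pieces below fit together: JobQueue owns a
# JobStorage (on-disk persistence under constants.QUEUE_DIR) and a
# _JobQueueWorkerPool with JOBQUEUE_THREADS workers.  Each _JobQueueWorker
# picks up a _QueuedJob and calls its Run() method, which executes the job's
# _QueuedOpCodes one by one through mcpu.Processor.ExecOpCode.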


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access is synchronized by the '_lock' attribute.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (timestamp, level, message).

  """
  def __init__(self, op):
    self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])

  def __Setup(self, input_, status, result, log):
    self._lock = threading.Lock()
    self.input = input_
    self.status = status
    self.result = result
    self.log = log

  @classmethod
  def Restore(cls, state):
    obj = object.__new__(cls)
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
                state["status"], state["result"], state["log"])
    return obj

  @utils.LockedMethod
  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      }

  @utils.LockedMethod
  def GetInput(self):
    """Returns the original opcode.

    """
    return self.input

  @utils.LockedMethod
  def SetStatus(self, status, result):
    """Update the opcode status and result.

    """
    self.status = status
    self.result = result

  @utils.LockedMethod
  def GetStatus(self):
    """Get the opcode status.

    """
    return self.status

  @utils.LockedMethod
  def GetResult(self):
    """Get the opcode result.

    """
    return self.result

  @utils.LockedMethod
  def Log(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      log_type, log_msg = args
    self.log.append((time.time(), log_type, log_msg))

  @utils.LockedMethod
  def RetrieveLog(self, start_at=0):
    """Retrieve (a part of) the execution log.

    """
    return self.log[start_at:]
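
  # Illustrative sketch (comments only, not executed): roughly what
  # Serialize() returns for a freshly queued opcode and how Log() records
  # entries.  The opcode fields come from OpCode.__getstate__() and depend on
  # the opcode type; the concrete values below are invented.
  #
  #   qop = _QueuedOpCode(some_opcode)         # any opcodes.OpCode instance
  #   qop.Serialize()
  #   # => {"input": {... opcode fields ...},
  #   #     "status": constants.OP_STATUS_QUEUED,
  #   #     "result": None,
  #   #     "log": []}
  #   qop.Log("copying disks")                 # 1 arg: ELOG_MESSAGE assumed
  #   qop.Log(constants.ELOG_MESSAGE, "done")  # 2 args: (type, message)
  #   # each log entry is a (timestamp, log_type, log_msg) tuple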


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
  def __init__(self, storage, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)

  def __Setup(self, storage, job_id, ops, run_op_index):
    self._lock = threading.Lock()
    self.storage = storage
    self.id = job_id
    self._ops = ops
    self.run_op_index = run_op_index

  @classmethod
  def Restore(cls, storage, state):
    obj = object.__new__(cls)
    op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
    obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self._ops],
      "run_op_index": self.run_op_index,
      }

  def SetUnclean(self, msg):
    try:
      for op in self._ops:
        op.SetStatus(constants.OP_STATUS_ERROR, msg)
    finally:
      self.storage.UpdateJob(self)

  def GetStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self._ops:
      op_status = op.GetStatus()
      if op_status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op_status == constants.OP_STATUS_QUEUED:
        pass
      elif op_status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op_status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
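
  # Informal example of the aggregation above: opcode statuses
  # [SUCCESS, RUNNING, QUEUED] yield JOB_STATUS_RUNNING, [SUCCESS, SUCCESS]
  # yields JOB_STATUS_SUCCESS, and a single OP_STATUS_ERROR makes the whole
  # job JOB_STATUS_ERROR regardless of the remaining opcodes.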

  @utils.LockedMethod
  def GetRunOpIndex(self):
    return self.run_op_index

  def Run(self, proc):
    """Job executor.

    This function processes this job in the context of the given
    processor instance.

    Args:
    - proc: Ganeti Processor to run the job with

    """
    try:
      count = len(self._ops)
      for idx, op in enumerate(self._ops):
        try:
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

          self._lock.acquire()
          try:
            self.run_op_index = idx
          finally:
            self._lock.release()

          op.SetStatus(constants.OP_STATUS_RUNNING, None)
          self.storage.UpdateJob(self)

          result = proc.ExecOpCode(op.input, op.Log)

          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
          self.storage.UpdateJob(self)
          logging.debug("Op %s/%s: Successfully finished %s",
                        idx + 1, count, op)
        except Exception, err:
          try:
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
          finally:
            self.storage.UpdateJob(self)
          raise

    except errors.GenericError, err:
      logging.error("ganeti exception", exc_info=err)
    except Exception, err:
      logging.error("unhandled exception", exc_info=err)
    except:
      logging.error("unhandled unknown exception", exc_info=True)


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    # TODO: feedback function
    proc = mcpu.Processor(self.pool.context)
    try:
      job.Run(proc)
    finally:
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, job.GetStatus())


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, context):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.context = context


class JobStorage(object):
  _RE_JOB_FILE = re.compile(r"^job-(\d+)$")

  def __init__(self):
    self._lock = threading.Lock()
    self._memcache = {}

    # Make sure our directory exists
    try:
      os.mkdir(constants.QUEUE_DIR, 0700)
    except OSError, err:
      if err.errno not in (errno.EEXIST, ):
        raise

    # Get queue lock
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
    try:
      utils.LockFile(self.lock_fd)
    except:
      self.lock_fd.close()
      raise

    # Read version
    try:
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
    except IOError, err:
      if err.errno not in (errno.ENOENT, ):
        raise

      # Setup a new queue
      self._InitQueueUnlocked()

      # Try to open again
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")

    try:
      # Try to read version
      version = int(version_fd.read(128))

      # Verify version
      if version != constants.JOB_QUEUE_VERSION:
        raise errors.JobQueueError("Found version %s, expected %s" %
                                   (version, constants.JOB_QUEUE_VERSION))
    finally:
      version_fd.close()

    serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
    try:
      # Read last serial
      self._last_serial = int(serial_fd.read(1024).strip())
    finally:
      serial_fd.close()

  def Close(self):
    assert self.lock_fd, "Queue should be open"

    self.lock_fd.close()
    self.lock_fd = None

  def _InitQueueUnlocked(self):
    assert self.lock_fd, "Queue should be open"

    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % 0)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: The new job identifier (an integer serial number).

    """
    assert self.lock_fd, "Queue should be open"

    # New number
    serial = self._last_serial + 1

    # Write to file
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return serial

  def _GetJobPath(self, job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jfiles = self._ListJobFiles()
    return [int(m.group(1)) for m in
            [self._RE_JOB_FILE.match(name) for name in jfiles]]

  def _ListJobFiles(self):
    assert self.lock_fd, "Queue should be open"

    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]
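
  # Informal example of the two helpers above: if QUEUE_DIR contains the
  # files "job-1", "job-23" and "lock", _ListJobFiles() keeps only the names
  # matching _RE_JOB_FILE ("job-1" and "job-23"), and _GetJobIDsUnlocked()
  # turns them into the integer IDs 1 and 23 (in directory-listing order).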

  def _LoadJobUnlocked(self, job_id):
    assert self.lock_fd, "Queue should be open"

    if job_id in self._memcache:
      logging.debug("Found job %d in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %d to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  def GetJobs(self, job_ids):
    return self._GetJobsUnlocked(job_ids)

  @utils.LockedMethod
  def AddJob(self, ops):
    assert self.lock_fd, "Queue should be open"

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self._UpdateJobUnlocked(job)

    logging.debug("Added new job %d to the cache", job_id)
    self._memcache[job_id] = job

    return job

  def _UpdateJobUnlocked(self, job):
    assert self.lock_fd, "Queue should be open"

    filename = self._GetJobPath(job.id)
    logging.debug("Writing job %s to %s", job.id, filename)
    utils.WriteFile(filename,
                    data=serializer.DumpJson(job.Serialize(), indent=False))
    self._CleanCacheUnlocked(exceptions=[job.id])

  def _CleanCacheUnlocked(self, exceptions=None):
    """Clean the memory cache.

    The exceptions argument contains job IDs that should not be
    cleaned.

    """
    assert isinstance(exceptions, list)
    for job in self._memcache.values():
      if job.id in exceptions:
        continue
      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
                                 constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %d from the cache", job.id)
        try:
          del self._memcache[job.id]
        except KeyError:
          pass

  @utils.LockedMethod
  def UpdateJob(self, job):
    return self._UpdateJobUnlocked(job)

  def ArchiveJob(self, job_id):
    raise NotImplementedError()
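
  # Informal sketch of the on-disk state managed by this class, based only on
  # the constants used above (the actual paths live in ganeti.constants):
  #
  #   constants.JOB_QUEUE_LOCK_FILE     - lock file held via utils.LockFile()
  #   constants.JOB_QUEUE_VERSION_FILE  - queue format version (JOB_QUEUE_VERSION)
  #   constants.JOB_QUEUE_SERIAL_FILE   - last serial from _NewSerialUnlocked()
  #   constants.QUEUE_DIR/job-<serial>  - one JSON-serialized job per file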


class JobQueue:
  """The job queue.

  """
  def __init__(self, context):
    self._lock = threading.Lock()
    self._jobs = JobStorage()
    self._wpool = _JobQueueWorkerPool(context)

    for job in self._jobs.GetJobs(None):
      status = job.GetStatus()
      if status in (constants.JOB_STATUS_QUEUED, ):
        self._wpool.AddTask(job)

      elif status in (constants.JOB_STATUS_RUNNING, ):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.SetUnclean("Unclean master daemon shutdown")

  @utils.LockedMethod
  def SubmitJob(self, ops):
    """Add a new job to the queue.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    Args:
    - ops: Sequence of opcodes

    """
    job = self._jobs.AddJob(ops)

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  def ArchiveJob(self, job_id):
    raise NotImplementedError()

  def CancelJob(self, job_id):
    raise NotImplementedError()

  def _GetJobInfo(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.GetStatus())
      elif fname == "ops":
        row.append([op.GetInput().__getstate__() for op in job._ops])
      elif fname == "opresult":
        row.append([op.GetResult() for op in job._ops])
      elif fname == "opstatus":
        row.append([op.GetStatus() for op in job._ops])
      elif fname == "ticker":
        ji = job.GetRunOpIndex()
        if ji < 0:
          lmsg = None
        else:
          lmsg = job._ops[ji].RetrieveLog(-1)
          # message might be empty here
          if lmsg:
            lmsg = lmsg[0]
          else:
            lmsg = None
        row.append(lmsg)
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    self._lock.acquire()
    try:
      jobs = []

      for job in self._jobs.GetJobs(job_ids):
        if job is None:
          jobs.append(None)
        else:
          jobs.append(self._GetJobInfo(job, fields))

      return jobs
    finally:
      self._lock.release()

  @utils.LockedMethod
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()
    self._jobs.Close()
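

# Illustrative usage sketch (comments only, not executed): how the master
# daemon side is expected to drive this module, assuming a configured mcpu
# context object is available; op1 and op2 stand for arbitrary instances of
# classes from ganeti.opcodes.
#
#   queue = JobQueue(context)
#   job_id = queue.SubmitJob([op1, op2])
#   queue.QueryJobs([job_id], ["id", "status", "opresult"])
#   # => [[job_id, <job status>, [<result of op1>, <result of op2>]]]
#   queue.Shutdown()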