#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

import os
import logging
import threading
import errno
import re
import time

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 5


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access to the log is synchronized by the '_log_lock' attribute.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (timestamp, level, message).

  """
  def __new__(cls, *args, **kwargs):
    obj = object.__new__(cls, *args, **kwargs)
    # Create a special lock for logging
    obj._log_lock = threading.Lock()
    return obj

  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []

  @classmethod
  def Restore(cls, state):
    """Restore an opcode from its serialized form.

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    return obj

  def Serialize(self):
    """Return a JSON-serializable representation of the opcode.

    """
    self._log_lock.acquire()
    try:
      return {
        "input": self.input.__getstate__(),
        "status": self.status,
        "result": self.result,
        "log": self.log,
        }
    finally:
      self._log_lock.release()

  def Log(self, *args):
    """Append a log entry.

    Accepts either a single message or an explicit (log_type, message)
    pair.

    """
    assert len(args) in (1, 2)

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      log_type, log_msg = args

    self._log_lock.acquire()
    try:
      self.log.append((time.time(), log_type, log_msg))
    finally:
      self._log_lock.release()

  def RetrieveLog(self, start_at=0):
    """Retrieve (a part of) the execution log.

    """
    self._log_lock.acquire()
    try:
      return self.log[start_at:]
    finally:
      self._log_lock.release()


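# A minimal round-trip sketch for _QueuedOpCode (illustrative only, not part
# of the original module; OpTestDelay is merely an assumed example opcode):
#
#   qop = _QueuedOpCode(opcodes.OpTestDelay(duration=10))
#   qop.Log("single-argument form uses ELOG_MESSAGE")
#   qop.Log(constants.ELOG_MESSAGE, "explicit (log_type, message) form")
#   state = qop.Serialize()              # JSON-compatible dict
#   copy = _QueuedOpCode.Restore(state)  # same input, status, result, log

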
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
  def __init__(self, queue, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1

  @classmethod
  def Restore(cls, queue, state):
    """Restore a job from its serialized form.

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
    obj.run_op_index = state["run_op_index"]
    return obj

  def Serialize(self):
    """Return a JSON-serializable representation of the job.

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      }

  def CalcStatus(self):
    """Compute the overall job status from the opcode statuses.

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status


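# How CalcStatus aggregates opcode states (illustrative only, not part of
# the original module; "queue" stands for any JobQueue instance):
#
#   job = _QueuedJob(queue, "1", [opcodes.OpTestDelay(duration=0)] * 2)
#   job.CalcStatus()                    # all queued -> JOB_STATUS_QUEUED
#   job.ops[0].status = constants.OP_STATUS_SUCCESS
#   job.CalcStatus()                    # partially done -> JOB_STATUS_QUEUED
#   job.ops[1].status = constants.OP_STATUS_RUNNING
#   job.CalcStatus()                    # -> JOB_STATUS_RUNNING
#   job.ops[1].status = constants.OP_STATUS_ERROR
#   job.CalcStatus()                    # one failed opcode fails the job
#                                       # -> JOB_STATUS_ERROR

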
class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    """Job executor.

    This function processes one job.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            # Mark the opcode as running and persist the change before
            # actually executing it
            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # The opcode is executed without holding the queue lock
            result = proc.ExecOpCode(input_opcode, op.Log)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        job_id = job.id
        status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


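# How the pieces fit together (illustrative summary): the JobQueue below
# owns a _JobQueueWorkerPool, whose JOBQUEUE_THREADS worker threads each
# pick up a _QueuedJob added via AddTask() and run
# _JobQueueWorker.RunTask(job) on it.

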
class JobQueue(object):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This decorator should be used for all "public" functions, that is,
    functions usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    self.context = context
    self._memcache = {}
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(exclusive=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = self.context.cfg.GetNodeList()

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

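  # On-disk state managed by this class (summary, inferred from the methods
  # below): one "job-<id>" file per job in constants.QUEUE_DIR, moved to
  # constants.JOB_QUEUE_ARCHIVE_DIR when archived, plus
  # constants.JOB_QUEUE_SERIAL_FILE holding the last used job serial.
  # Writes are replicated to the other cluster nodes through
  # _WriteAndReplicateFileUnlocked below.
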
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    nodes = self._nodes[:]

    # Remove master node
    try:
      nodes.remove(self._my_hostname)
    except ValueError:
      pass

    failed_nodes = 0
    result = rpc.call_upload_file(nodes, file_name)
    for node in nodes:
      if not result[node]:
        failed_nodes += 1
        logging.error("Copy of job queue file to node %s failed", node)

    # TODO: check failed_nodes

  def _FormatJobID(self, job_id):
    """Convert a numeric job ID to its string representation.

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self, nodes):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

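  # Example (illustrative): with _last_serial == 41, the next call writes
  # "42\n" to the serial file, replicates it to the other nodes and returns
  # the job ID string "42".
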
  @staticmethod
  def _GetJobPath(job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist.sort()
    return jlist

  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    if job_id in self._memcache:
      logging.debug("Found job %s in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops, nodes):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @type nodes: list
    @param nodes: The list of nodes to which the new job serial will be
                  distributed.

    """
    # Get job identifier
    job_id = self._NewSerialUnlocked(nodes)
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Added new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Write a job to disk and replicate it to the other nodes.

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)
    self._CleanCacheUnlocked([job.id])

  def _CleanCacheUnlocked(self, exclude):
    """Clean the memory cache.

    The exclude argument contains job IDs that should not be
    cleaned.

    """
    assert isinstance(exclude, list)

    for job in self._memcache.values():
      if job.id in exclude:
        continue
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
                                  constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %s from the cache", job.id)
        try:
          del self._memcache[job.id]
        except KeyError:
          pass

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

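  # Note that cancellation marks the opcodes with OP_STATUS_ERROR, so a job
  # cancelled here subsequently reports JOB_STATUS_ERROR (not
  # JOB_STATUS_CANCELED) from CalcStatus.
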
  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    try:
      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)

      os.rename(old, new)

      logging.debug("Successfully archived job %s", job.id)
    finally:
      # Cleaning the cache because we don't know what os.rename actually did
      # and to be on the safe side.
      self._CleanCacheUnlocked([])

  def _GetJobInfoUnlocked(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "ticker":
        ji = job.run_op_index
        if ji < 0:
          lmsg = None
        else:
          lmsg = job.ops[ji].RetrieveLog(-1)
          # message might be empty here
          if lmsg:
            lmsg = lmsg[0]
          else:
            lmsg = None
        row.append(lmsg)
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

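  # Example row (illustrative) for fields ["id", "status", "opstatus"] on a
  # two-opcode job whose first opcode succeeded and whose second is still
  # running:
  #
  #   ["42", constants.JOB_STATUS_RUNNING,
  #    [constants.OP_STATUS_SUCCESS, constants.OP_STATUS_RUNNING]]
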
  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None
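

# A minimal lifecycle sketch (illustrative only; "context" stands for the
# masterd-provided context object expected by JobQueue, and OpTestDelay is
# merely an assumed example opcode):
#
#   jq = JobQueue(context)
#   job_id = jq.SubmitJob([opcodes.OpTestDelay(duration=1)],
#                         context.cfg.GetNodeList())
#   while jq.QueryJobs([job_id], ["status"])[0][0] not in \
#       (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
#     time.sleep(1)
#   jq.ArchiveJob(job_id)
#   jq.Shutdown()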