
root / lib / jqueue.py @ 271daef8


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import errno
35
import re
36
import time
37
import weakref
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56

    
57

    
58
JOBQUEUE_THREADS = 25
59
JOBS_PER_ARCHIVE_DIRECTORY = 10000
60

    
61
# member lock names to be passed to @ssynchronized decorator
62
_LOCK = "_lock"
63
_QUEUE = "_queue"
64

    
65

    
66
class CancelJob(Exception):
67
  """Special exception to cancel a job.
68

69
  """
70

    
71

    
72
def TimeStampNow():
73
  """Returns the current timestamp.
74

75
  @rtype: tuple
76
  @return: the current time in the (seconds, microseconds) format
77

78
  """
79
  return utils.SplitTime(time.time())
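# For example, a call made 0.25s after second 1234567890 of the epoch would
# return (1234567890, 250000), i.e. whole seconds plus microseconds.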
80

    
81

    
82
class _QueuedOpCode(object):
83
  """Encapsulates an opcode object.
84

85
  @ivar log: holds the execution log and consists of tuples
86
  of the form C{(log_serial, timestamp, level, message)}
87
  @ivar input: the OpCode we encapsulate
88
  @ivar status: the current status
89
  @ivar result: the result of the LU execution
90
  @ivar start_timestamp: timestamp for the start of the execution
91
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
92
  @ivar end_timestamp: timestamp for the end of the execution
93

94
  """
95
  __slots__ = ["input", "status", "result", "log",
96
               "start_timestamp", "exec_timestamp", "end_timestamp",
97
               "__weakref__"]
98

    
99
  def __init__(self, op):
100
    """Constructor for the _QuededOpCode.
101

102
    @type op: L{opcodes.OpCode}
103
    @param op: the opcode we encapsulate
104

105
    """
106
    self.input = op
107
    self.status = constants.OP_STATUS_QUEUED
108
    self.result = None
109
    self.log = []
110
    self.start_timestamp = None
111
    self.exec_timestamp = None
112
    self.end_timestamp = None
113

    
114
  @classmethod
115
  def Restore(cls, state):
116
    """Restore the _QueuedOpCode from the serialized form.
117

118
    @type state: dict
119
    @param state: the serialized state
120
    @rtype: _QueuedOpCode
121
    @return: a new _QueuedOpCode instance
122

123
    """
124
    obj = _QueuedOpCode.__new__(cls)
125
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
126
    obj.status = state["status"]
127
    obj.result = state["result"]
128
    obj.log = state["log"]
129
    obj.start_timestamp = state.get("start_timestamp", None)
130
    obj.exec_timestamp = state.get("exec_timestamp", None)
131
    obj.end_timestamp = state.get("end_timestamp", None)
132
    return obj
133

    
134
  def Serialize(self):
135
    """Serializes this _QueuedOpCode.
136

137
    @rtype: dict
138
    @return: the dictionary holding the serialized state
139

140
    """
141
    return {
142
      "input": self.input.__getstate__(),
143
      "status": self.status,
144
      "result": self.result,
145
      "log": self.log,
146
      "start_timestamp": self.start_timestamp,
147
      "exec_timestamp": self.exec_timestamp,
148
      "end_timestamp": self.end_timestamp,
149
      }
150

    
151

    
152
class _QueuedJob(object):
153
  """In-memory job representation.
154

155
  This is what we use to track the user-submitted jobs. Locking must
156
  be taken care of by users of this class.
157

158
  @type queue: L{JobQueue}
159
  @ivar queue: the parent queue
160
  @ivar id: the job ID
161
  @type ops: list
162
  @ivar ops: the list of _QueuedOpCode that constitute the job
163
  @type log_serial: int
164
  @ivar log_serial: holds the index for the next log entry
165
  @ivar received_timestamp: the timestamp for when the job was received
166
  @ivar start_timestamp: the timestamp for start of execution
167
  @ivar end_timestamp: the timestamp for end of execution
168
  @ivar lock_status: In-memory locking information for debugging
169

170
  """
171
  # pylint: disable-msg=W0212
172
  __slots__ = ["queue", "id", "ops", "log_serial",
173
               "received_timestamp", "start_timestamp", "end_timestamp",
174
               "lock_status", "change",
175
               "__weakref__"]
176

    
177
  def __init__(self, queue, job_id, ops):
178
    """Constructor for the _QueuedJob.
179

180
    @type queue: L{JobQueue}
181
    @param queue: our parent queue
182
    @type job_id: job_id
183
    @param job_id: our job id
184
    @type ops: list
185
    @param ops: the list of opcodes we hold, which will be encapsulated
186
        in _QueuedOpCodes
187

188
    """
189
    if not ops:
190
      raise errors.GenericError("A job needs at least one opcode")
191

    
192
    self.queue = queue
193
    self.id = job_id
194
    self.ops = [_QueuedOpCode(op) for op in ops]
195
    self.log_serial = 0
196
    self.received_timestamp = TimeStampNow()
197
    self.start_timestamp = None
198
    self.end_timestamp = None
199

    
200
    # In-memory attributes
201
    self.lock_status = None
202

    
203
  def __repr__(self):
204
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
205
              "id=%s" % self.id,
206
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
207

    
208
    return "<%s at %#x>" % (" ".join(status), id(self))
209

    
210
  @classmethod
211
  def Restore(cls, queue, state):
212
    """Restore a _QueuedJob from serialized state:
213

214
    @type queue: L{JobQueue}
215
    @param queue: to which queue the restored job belongs
216
    @type state: dict
217
    @param state: the serialized state
218
    @rtype: _QueuedJob
219
    @return: the restored _QueuedJob instance
220

221
    """
222
    obj = _QueuedJob.__new__(cls)
223
    obj.queue = queue
224
    obj.id = state["id"]
225
    obj.received_timestamp = state.get("received_timestamp", None)
226
    obj.start_timestamp = state.get("start_timestamp", None)
227
    obj.end_timestamp = state.get("end_timestamp", None)
228

    
229
    # In-memory attributes
230
    obj.lock_status = None
231

    
232
    obj.ops = []
233
    obj.log_serial = 0
234
    for op_state in state["ops"]:
235
      op = _QueuedOpCode.Restore(op_state)
236
      for log_entry in op.log:
237
        obj.log_serial = max(obj.log_serial, log_entry[0])
238
      obj.ops.append(op)
239

    
240
    return obj
241

    
242
  def Serialize(self):
243
    """Serialize the _JobQueue instance.
244

245
    @rtype: dict
246
    @return: the serialized state
247

248
    """
249
    return {
250
      "id": self.id,
251
      "ops": [op.Serialize() for op in self.ops],
252
      "start_timestamp": self.start_timestamp,
253
      "end_timestamp": self.end_timestamp,
254
      "received_timestamp": self.received_timestamp,
255
      }
256

    
257
  def CalcStatus(self):
258
    """Compute the status of this job.
259

260
    This function iterates over all the _QueuedOpCodes in the job and
261
    based on their status, computes the job status.
262

263
    The algorithm is:
264
      - if we find a cancelled opcode, or one finished with error, the job
265
        status will be the same
266
      - otherwise, the last opcode with the status one of:
267
          - waitlock
268
          - canceling
269
          - running
270

271
        will determine the job status
272

273
      - otherwise, it means either all opcodes are queued, or success,
274
        and the job status will be the same
275

276
    @return: the job status
277

278
    """
279
    status = constants.JOB_STATUS_QUEUED
280

    
281
    all_success = True
282
    for op in self.ops:
283
      if op.status == constants.OP_STATUS_SUCCESS:
284
        continue
285

    
286
      all_success = False
287

    
288
      if op.status == constants.OP_STATUS_QUEUED:
289
        pass
290
      elif op.status == constants.OP_STATUS_WAITLOCK:
291
        status = constants.JOB_STATUS_WAITLOCK
292
      elif op.status == constants.OP_STATUS_RUNNING:
293
        status = constants.JOB_STATUS_RUNNING
294
      elif op.status == constants.OP_STATUS_CANCELING:
295
        status = constants.JOB_STATUS_CANCELING
296
        break
297
      elif op.status == constants.OP_STATUS_ERROR:
298
        status = constants.JOB_STATUS_ERROR
299
        # The whole job fails if one opcode failed
300
        break
301
      elif op.status == constants.OP_STATUS_CANCELED:
302
        status = constants.JOB_STATUS_CANCELED
303
        break
304

    
305
    if all_success:
306
      status = constants.JOB_STATUS_SUCCESS
307

    
308
    return status
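  # Worked example for the algorithm above: opcode statuses
  # (SUCCESS, RUNNING, QUEUED) yield JOB_STATUS_RUNNING,
  # (SUCCESS, ERROR, QUEUED) yield JOB_STATUS_ERROR, and only when every
  # opcode is SUCCESS does the job become JOB_STATUS_SUCCESS.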
309

    
310
  def GetLogEntries(self, newer_than):
311
    """Selectively returns the log entries.
312

313
    @type newer_than: None or int
314
    @param newer_than: if this is None, return all log entries,
315
        otherwise return only the log entries with serial higher
316
        than this value
317
    @rtype: list
318
    @return: the list of the log entries selected
319

320
    """
321
    if newer_than is None:
322
      serial = -1
323
    else:
324
      serial = newer_than
325

    
326
    entries = []
327
    for op in self.ops:
328
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
329

    
330
    return entries
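  # E.g. if the opcodes hold log entries with serials 1 to 5,
  # GetLogEntries(3) returns only the entries with serials 4 and 5, while
  # GetLogEntries(None) returns all of them.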
331

    
332
  def GetInfo(self, fields):
333
    """Returns information about a job.
334

335
    @type fields: list
336
    @param fields: names of fields to return
337
    @rtype: list
338
    @return: list with one element for each field
339
    @raise errors.OpExecError: when an invalid field
340
        has been passed
341

342
    """
343
    row = []
344
    for fname in fields:
345
      if fname == "id":
346
        row.append(self.id)
347
      elif fname == "status":
348
        row.append(self.CalcStatus())
349
      elif fname == "ops":
350
        row.append([op.input.__getstate__() for op in self.ops])
351
      elif fname == "opresult":
352
        row.append([op.result for op in self.ops])
353
      elif fname == "opstatus":
354
        row.append([op.status for op in self.ops])
355
      elif fname == "oplog":
356
        row.append([op.log for op in self.ops])
357
      elif fname == "opstart":
358
        row.append([op.start_timestamp for op in self.ops])
359
      elif fname == "opexec":
360
        row.append([op.exec_timestamp for op in self.ops])
361
      elif fname == "opend":
362
        row.append([op.end_timestamp for op in self.ops])
363
      elif fname == "received_ts":
364
        row.append(self.received_timestamp)
365
      elif fname == "start_ts":
366
        row.append(self.start_timestamp)
367
      elif fname == "end_ts":
368
        row.append(self.end_timestamp)
369
      elif fname == "lock_status":
370
        row.append(self.lock_status)
371
      elif fname == "summary":
372
        row.append([op.input.Summary() for op in self.ops])
373
      else:
374
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
375
    return row
376

    
377
  def MarkUnfinishedOps(self, status, result):
378
    """Mark unfinished opcodes with a given status and result.
379

380
    This is a utility function for marking all running or waiting to
381
    be run opcodes with a given status. Opcodes which are already
382
    finalised are not changed.
383

384
    @param status: a given opcode status
385
    @param result: the opcode result
386

387
    """
388
    try:
389
      not_marked = True
390
      for op in self.ops:
391
        if op.status in constants.OPS_FINALIZED:
392
          assert not_marked, "Finalized opcodes found after non-finalized ones"
393
          continue
394
        op.status = status
395
        op.result = result
396
        not_marked = False
397
    finally:
398
      self.queue.UpdateJobUnlocked(self)
399

    
400

    
401
class _OpExecCallbacks(mcpu.OpExecCbBase):
402
  def __init__(self, queue, job, op):
403
    """Initializes this class.
404

405
    @type queue: L{JobQueue}
406
    @param queue: Job queue
407
    @type job: L{_QueuedJob}
408
    @param job: Job object
409
    @type op: L{_QueuedOpCode}
410
    @param op: OpCode
411

412
    """
413
    assert queue, "Queue is missing"
414
    assert job, "Job is missing"
415
    assert op, "Opcode is missing"
416

    
417
    self._queue = queue
418
    self._job = job
419
    self._op = op
420

    
421
  @locking.ssynchronized(_QUEUE, shared=1)
422
  def NotifyStart(self):
423
    """Mark the opcode as running, not lock-waiting.
424

425
    This is called from the mcpu code as a notifier function, when the LU is
426
    finally about to start the Exec() method. Of course, to have end-user
427
    visible results, the opcode must be initially (before calling into
428
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
429

430
    """
431
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
432
                               constants.OP_STATUS_CANCELING)
433

    
434
    # All locks are acquired by now
435
    self._job.lock_status = None
436

    
437
    # Cancel here if we were asked to
438
    if self._op.status == constants.OP_STATUS_CANCELING:
439
      raise CancelJob()
440

    
441
    self._op.status = constants.OP_STATUS_RUNNING
442
    self._op.exec_timestamp = TimeStampNow()
443

    
444
    # And finally replicate the job status
445
    self._queue.UpdateJobUnlocked(self._job)
446

    
447
  @locking.ssynchronized(_QUEUE, shared=1)
448
  def _AppendFeedback(self, timestamp, log_type, log_msg):
449
    """Internal feedback append function, with locks
450

451
    """
452
    self._job.log_serial += 1
453
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
454
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
455

    
456
  def Feedback(self, *args):
457
    """Append a log entry.
458

459
    """
460
    assert len(args) < 3
461

    
462
    if len(args) == 1:
463
      log_type = constants.ELOG_MESSAGE
464
      log_msg = args[0]
465
    else:
466
      (log_type, log_msg) = args
467

    
468
    # The time is split to make serialization easier and not lose
469
    # precision.
470
    timestamp = utils.SplitTime(time.time())
471
    self._AppendFeedback(timestamp, log_type, log_msg)
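  # Callers may pass a single message, e.g. Feedback("copying disk data"),
  # which is stored with type constants.ELOG_MESSAGE, or an explicit pair
  # such as Feedback(constants.ELOG_MESSAGE, "copying disk data").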
472

    
473
  def ReportLocks(self, msg):
474
    """Write locking information to the job.
475

476
    Called whenever the LU processor is waiting for a lock or has acquired one.
477

478
    """
479
    # Not getting the queue lock because this is a single assignment
480
    self._job.lock_status = msg
481

    
482

    
483
class _WaitForJobChangesHelper(object):
484
  """Helper class using initofy to wait for changes in a job file.
485

486
  This class takes a previous job status and serial, and alerts the client when
487
  the current job status has changed.
488

489
  @type job_id: string
490
  @ivar job_id: id of the job we're watching
491
  @type prev_job_info: list or None
492
  @ivar prev_job_info: previous job info, as passed by the luxi client
493
  @type prev_log_serial: int
494
  @ivar prev_log_serial: previous job serial, as passed by the luxi client
495
  @type queue: L{JobQueue}
496
  @ivar queue: job queue (used for a few utility functions)
497
  @type job_path: string
498
  @ivar job_path: absolute path of the job file
499
  @type wm: pyinotify.WatchManager (or None)
500
  @ivar wm: inotify watch manager to watch for changes
501
  @type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
502
  @ivar inotify_handler: single file event handler, used for watching
503
  @type notifier: pyinotify.Notifier
504
  @ivar notifier: inotify single-threaded notifier, used for watching
505

506
  """
507
  def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
508
    self.job_id = job_id
509
    self.fields = fields
510
    self.prev_job_info = prev_job_info
511
    self.prev_log_serial = prev_log_serial
512
    self.queue = queue
513
    # pylint: disable-msg=W0212
514
    self.job_path = self.queue._GetJobPath(self.job_id)
515
    self.wm = None
516
    self.inotify_handler = None
517
    self.notifier = None
518

    
519
  def _SetupInotify(self):
520
    """Create the inotify
521

522
    @raises errors.InotifyError: if the notifier cannot be set up
523

524
    """
525
    if self.wm:
526
      return
527
    self.wm = pyinotify.WatchManager()
528
    self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
529
                                                                self.OnInotify,
530
                                                                self.job_path)
531
    self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
532
    self.inotify_handler.enable()
533

    
534
  def _LoadDiskStatus(self):
535
    job = self.queue.SafeLoadJobFromDisk(self.job_id)
536
    if not job:
537
      raise errors.JobLost()
538
    self.job_status = job.CalcStatus()
539

    
540
    job_info = job.GetInfo(self.fields)
541
    log_entries = job.GetLogEntries(self.prev_log_serial)
542
    # Serializing and deserializing data can cause type changes (e.g. from
543
    # tuple to list) or precision loss. We're doing it here so that we get
544
    # the same modifications as the data received from the client. Without
545
    # this, the comparison afterwards might fail without the data being
546
    # significantly different.
547
    # TODO: we just deserialized from disk, investigate how to make sure that
548
    # the job info and log entries are compatible to avoid this further step.
549
    self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
550
    self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
551

    
552
  def _CheckForChanges(self):
553
    self._LoadDiskStatus()
554
    # Don't even try to wait if the job is no longer running, there will be
555
    # no changes.
556
    if (self.job_status not in (constants.JOB_STATUS_QUEUED,
557
                                constants.JOB_STATUS_RUNNING,
558
                                constants.JOB_STATUS_WAITLOCK) or
559
        self.prev_job_info != self.job_info or
560
        (self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
561
      logging.debug("Job %s changed", self.job_id)
562
      return (self.job_info, self.log_entries)
563

    
564
    raise utils.RetryAgain()
565

    
566
  def OnInotify(self, notifier_enabled):
567
    if not notifier_enabled:
568
      self.inotify_handler.enable()
569

    
570
  def WaitFn(self, timeout):
571
    self._SetupInotify()
572
    if self.notifier.check_events(timeout*1000):
573
      self.notifier.read_events()
574
    self.notifier.process_events()
575

    
576
  def WaitForChanges(self, timeout):
577
    try:
578
      return utils.Retry(self._CheckForChanges,
579
                         utils.RETRY_REMAINING_TIME,
580
                         timeout,
581
                         wait_fn=self.WaitFn)
582
    except (errors.InotifyError, errors.JobLost):
583
      return None
584
    except utils.RetryTimeout:
585
      return constants.JOB_NOTCHANGED
586

    
587
  def Close(self):
588
    if self.wm:
589
      self.notifier.stop()
590

    
591

    
592
class _JobQueueWorker(workerpool.BaseWorker):
593
  """The actual job workers.
594

595
  """
596
  def RunTask(self, job): # pylint: disable-msg=W0221
597
    """Job executor.
598

599
    This functions processes a job. It is closely tied to the _QueuedJob and
600
    _QueuedOpCode classes.
601

602
    @type job: L{_QueuedJob}
603
    @param job: the job to be processed
604

605
    """
606
    logging.info("Processing job %s", job.id)
607
    proc = mcpu.Processor(self.pool.queue.context, job.id)
608
    queue = job.queue
609
    try:
610
      try:
611
        count = len(job.ops)
612
        for idx, op in enumerate(job.ops):
613
          op_summary = op.input.Summary()
614
          if op.status == constants.OP_STATUS_SUCCESS:
615
            # this is a job that was partially completed before master
616
            # daemon shutdown, so it can be expected that some opcodes
617
            # are already completed successfully (if any did error
618
            # out, then the whole job should have been aborted and not
619
            # resubmitted for processing)
620
            logging.info("Op %s/%s: opcode %s already processed, skipping",
621
                         idx + 1, count, op_summary)
622
            continue
623
          try:
624
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
625
                         op_summary)
626

    
627
            queue.acquire(shared=1)
628
            try:
629
              if op.status == constants.OP_STATUS_CANCELED:
630
                raise CancelJob()
631
              assert op.status == constants.OP_STATUS_QUEUED
632
              op.status = constants.OP_STATUS_WAITLOCK
633
              op.result = None
634
              op.start_timestamp = TimeStampNow()
635
              if idx == 0: # first opcode
636
                job.start_timestamp = op.start_timestamp
637
              queue.UpdateJobUnlocked(job)
638

    
639
              input_opcode = op.input
640
            finally:
641
              queue.release()
642

    
643
            # Make sure not to hold queue lock while calling ExecOpCode
644
            result = proc.ExecOpCode(input_opcode,
645
                                     _OpExecCallbacks(queue, job, op))
646

    
647
            queue.acquire(shared=1)
648
            try:
649
              op.status = constants.OP_STATUS_SUCCESS
650
              op.result = result
651
              op.end_timestamp = TimeStampNow()
652
              queue.UpdateJobUnlocked(job)
653
            finally:
654
              queue.release()
655

    
656
            logging.info("Op %s/%s: Successfully finished opcode %s",
657
                         idx + 1, count, op_summary)
658
          except CancelJob:
659
            # Will be handled further up
660
            raise
661
          except Exception, err:
662
            queue.acquire(shared=1)
663
            try:
664
              try:
665
                op.status = constants.OP_STATUS_ERROR
666
                if isinstance(err, errors.GenericError):
667
                  op.result = errors.EncodeException(err)
668
                else:
669
                  op.result = str(err)
670
                op.end_timestamp = TimeStampNow()
671
                logging.info("Op %s/%s: Error in opcode %s: %s",
672
                             idx + 1, count, op_summary, err)
673
              finally:
674
                queue.UpdateJobUnlocked(job)
675
            finally:
676
              queue.release()
677
            raise
678

    
679
      except CancelJob:
680
        queue.acquire(shared=1)
681
        try:
682
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
683
                                "Job canceled by request")
684
        finally:
685
          queue.release()
686
      except errors.GenericError, err:
687
        logging.exception("Ganeti exception")
688
      except:
689
        logging.exception("Unhandled exception")
690
    finally:
691
      queue.acquire(shared=1)
692
      try:
693
        try:
694
          job.lock_status = None
695
          job.end_timestamp = TimeStampNow()
696
          queue.UpdateJobUnlocked(job)
697
        finally:
698
          job_id = job.id
699
          status = job.CalcStatus()
700
      finally:
701
        queue.release()
702

    
703
      logging.info("Finished job %s, status = %s", job_id, status)
704

    
705

    
706
class _JobQueueWorkerPool(workerpool.WorkerPool):
707
  """Simple class implementing a job-processing workerpool.
708

709
  """
710
  def __init__(self, queue):
711
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
712
                                              JOBQUEUE_THREADS,
713
                                              _JobQueueWorker)
714
    self.queue = queue
715

    
716

    
717
def _RequireOpenQueue(fn):
718
  """Decorator for "public" functions.
719

720
  This function should be used for all 'public' functions. That is,
721
  functions usually called from other classes. Note that this should
722
  be applied only to methods (not plain functions), since it expects
723
  that the decorated function is called with a first argument that has
724
    a '_queue_filelock' attribute.
725

726
  @warning: Use this decorator only after locking.ssynchronized
727

728
  Example::
729
    @locking.ssynchronized(_LOCK)
730
    @_RequireOpenQueue
731
    def Example(self):
732
      pass
733

734
  """
735
  def wrapper(self, *args, **kwargs):
736
    # pylint: disable-msg=W0212
737
    assert self._queue_filelock is not None, "Queue should be open"
738
    return fn(self, *args, **kwargs)
739
  return wrapper
740

    
741

    
742
class JobQueue(object):
743
  """Queue used to manage the jobs.
744

745
  @cvar _RE_JOB_FILE: regex matching the valid job file names
746

747
  """
748
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
749

    
750
  def __init__(self, context):
751
    """Constructor for JobQueue.
752

753
    The constructor will initialize the job queue object and then
754
    start loading the current jobs from disk, either for starting them
755
    (if they were queue) or for aborting them (if they were already
756
    running).
757

758
    @type context: GanetiContext
759
    @param context: the context object for access to the configuration
760
        data and other ganeti objects
761

762
    """
763
    self.context = context
764
    self._memcache = weakref.WeakValueDictionary()
765
    self._my_hostname = utils.HostInfo().name
766

    
767
    # The Big JobQueue lock. If a code block or method acquires it in shared
768
    # mode, it must guarantee concurrency with all the code acquiring it in
769
    # shared mode, including itself. Code that does not acquire it at all
770
    # must guarantee concurrency with all code acquiring it in shared mode
771
    # and all code acquiring it exclusively.
772
    self._lock = locking.SharedLock()
773

    
774
    self.acquire = self._lock.acquire
775
    self.release = self._lock.release
776

    
777
    # Initialize the queue, and acquire the filelock.
778
    # This ensures no other process is working on the job queue.
779
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
780

    
781
    # Read serial file
782
    self._last_serial = jstore.ReadSerial()
783
    assert self._last_serial is not None, ("Serial file was modified between"
784
                                           " check in jstore and here")
785

    
786
    # Get initial list of nodes
787
    self._nodes = dict((n.name, n.primary_ip)
788
                       for n in self.context.cfg.GetAllNodesInfo().values()
789
                       if n.master_candidate)
790

    
791
    # Remove master node
792
    self._nodes.pop(self._my_hostname, None)
793

    
794
    # TODO: Check consistency across nodes
795

    
796
    self._queue_size = 0
797
    self._UpdateQueueSizeUnlocked()
798
    self._drained = self._IsQueueMarkedDrain()
799

    
800
    # Setup worker pool
801
    self._wpool = _JobQueueWorkerPool(self)
802
    try:
803
      # We need to lock here because WorkerPool.AddTask() may start a job while
804
      # we're still doing our work.
805
      self.acquire()
806
      try:
807
        logging.info("Inspecting job queue")
808

    
809
        all_job_ids = self._GetJobIDsUnlocked()
810
        jobs_count = len(all_job_ids)
811
        lastinfo = time.time()
812
        for idx, job_id in enumerate(all_job_ids):
813
          # Give an update every 1000 jobs or 10 seconds
814
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
815
              idx == (jobs_count - 1)):
816
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
817
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
818
            lastinfo = time.time()
819

    
820
          job = self._LoadJobUnlocked(job_id)
821

    
822
          # a failure in loading the job can cause 'None' to be returned
823
          if job is None:
824
            continue
825

    
826
          status = job.CalcStatus()
827

    
828
          if status in (constants.JOB_STATUS_QUEUED, ):
829
            self._wpool.AddTask(job)
830

    
831
          elif status in (constants.JOB_STATUS_RUNNING,
832
                          constants.JOB_STATUS_WAITLOCK,
833
                          constants.JOB_STATUS_CANCELING):
834
            logging.warning("Unfinished job %s found: %s", job.id, job)
835
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
836
                                  "Unclean master daemon shutdown")
837

    
838
        logging.info("Job queue inspection finished")
839
      finally:
840
        self.release()
841
    except:
842
      self._wpool.TerminateWorkers()
843
      raise
844

    
845
  @locking.ssynchronized(_LOCK)
846
  @_RequireOpenQueue
847
  def AddNode(self, node):
848
    """Register a new node with the queue.
849

850
    @type node: L{objects.Node}
851
    @param node: the node object to be added
852

853
    """
854
    node_name = node.name
855
    assert node_name != self._my_hostname
856

    
857
    # Clean queue directory on added node
858
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
859
    msg = result.fail_msg
860
    if msg:
861
      logging.warning("Cannot cleanup queue directory on node %s: %s",
862
                      node_name, msg)
863

    
864
    if not node.master_candidate:
865
      # remove if existing, ignoring errors
866
      self._nodes.pop(node_name, None)
867
      # and skip the replication of the job ids
868
      return
869

    
870
    # Upload the whole queue excluding archived jobs
871
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
872

    
873
    # Upload current serial file
874
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
875

    
876
    for file_name in files:
877
      # Read file content
878
      content = utils.ReadFile(file_name)
879

    
880
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
881
                                                  [node.primary_ip],
882
                                                  file_name, content)
883
      msg = result[node_name].fail_msg
884
      if msg:
885
        logging.error("Failed to upload file %s to node %s: %s",
886
                      file_name, node_name, msg)
887

    
888
    self._nodes[node_name] = node.primary_ip
889

    
890
  @locking.ssynchronized(_LOCK)
891
  @_RequireOpenQueue
892
  def RemoveNode(self, node_name):
893
    """Callback called when removing nodes from the cluster.
894

895
    @type node_name: str
896
    @param node_name: the name of the node to remove
897

898
    """
899
    self._nodes.pop(node_name, None)
900

    
901
  @staticmethod
902
  def _CheckRpcResult(result, nodes, failmsg):
903
    """Verifies the status of an RPC call.
904

905
    Since we aim to keep consistency should this node (the current
906
    master) fail, we will log errors if our rpc fail, and especially
907
    log the case when more than half of the nodes fails.
908

909
    @param result: the data as returned from the rpc call
910
    @type nodes: list
911
    @param nodes: the list of nodes we made the call to
912
    @type failmsg: str
913
    @param failmsg: the identifier to be used for logging
914

915
    """
916
    failed = []
917
    success = []
918

    
919
    for node in nodes:
920
      msg = result[node].fail_msg
921
      if msg:
922
        failed.append(node)
923
        logging.error("RPC call %s (%s) failed on node %s: %s",
924
                      result[node].call, failmsg, node, msg)
925
      else:
926
        success.append(node)
927

    
928
    # +1 for the master node
929
    if (len(success) + 1) < len(failed):
930
      # TODO: Handle failing nodes
931
      logging.error("More than half of the nodes failed")
932

    
933
  def _GetNodeIp(self):
934
    """Helper for returning the node name/ip list.
935

936
    @rtype: (list, list)
937
    @return: a tuple of two lists, the first one with the node
938
        names and the second one with the node addresses
939

940
    """
941
    name_list = self._nodes.keys()
942
    addr_list = [self._nodes[name] for name in name_list]
943
    return name_list, addr_list
944

    
945
  def _UpdateJobQueueFile(self, file_name, data, replicate):
946
    """Writes a file locally and then replicates it to all nodes.
947

948
    This function will replace the contents of a file on the local
949
    node and then replicate it to all the other nodes we have.
950

951
    @type file_name: str
952
    @param file_name: the path of the file to be replicated
953
    @type data: str
954
    @param data: the new contents of the file
955
    @type replicate: boolean
956
    @param replicate: whether to spread the changes to the remote nodes
957

958
    """
959
    utils.WriteFile(file_name, data=data)
960

    
961
    if replicate:
962
      names, addrs = self._GetNodeIp()
963
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
964
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
965

    
966
  def _RenameFilesUnlocked(self, rename):
967
    """Renames a file locally and then replicate the change.
968

969
    This function will rename a file in the local queue directory
970
    and then replicate this rename to all the other nodes we have.
971

972
    @type rename: list of (old, new)
973
    @param rename: List containing tuples mapping old to new names
974

975
    """
976
    # Rename them locally
977
    for old, new in rename:
978
      utils.RenameFile(old, new, mkdir=True)
979

    
980
    # ... and on all nodes
981
    names, addrs = self._GetNodeIp()
982
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
983
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
984

    
985
  @staticmethod
986
  def _FormatJobID(job_id):
987
    """Convert a job ID to string format.
988

989
    Currently this just does C{str(job_id)} after performing some
990
    checks, but if we want to change the job id format this will
991
    abstract this change.
992

993
    @type job_id: int or long
994
    @param job_id: the numeric job id
995
    @rtype: str
996
    @return: the formatted job id
997

998
    """
999
    if not isinstance(job_id, (int, long)):
1000
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1001
    if job_id < 0:
1002
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1003

    
1004
    return str(job_id)
1005

    
1006
  @classmethod
1007
  def _GetArchiveDirectory(cls, job_id):
1008
    """Returns the archive directory for a job.
1009

1010
    @type job_id: str
1011
    @param job_id: Job identifier
1012
    @rtype: str
1013
    @return: Directory name
1014

1015
    """
1016
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
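  # E.g. with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job id "12345" maps to
  # archive directory "1" and job id "123" maps to directory "0".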
1017

    
1018
  def _NewSerialsUnlocked(self, count):
1019
    """Generates a new job identifier.
1020

1021
    Job identifiers are unique during the lifetime of a cluster.
1022

1023
    @type count: integer
1024
    @param count: how many serials to return
1025
    @rtype: list of str
1026
    @return: a list of strings representing the new job identifiers.
1027

1028
    """
1029
    assert count > 0
1030
    # New number
1031
    serial = self._last_serial + count
1032

    
1033
    # Write to file
1034
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1035
                             "%s\n" % serial, True)
1036

    
1037
    result = [self._FormatJobID(v)
1038
              for v in range(self._last_serial + 1, serial + 1)]
1039
    # Keep it only if we were able to write the file
1040
    self._last_serial = serial
1041

    
1042
    return result
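  # Example: with _last_serial == 5 and count == 2, the serial file is
  # rewritten as "7\n" and the identifiers ["6", "7"] are returned.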
1043

    
1044
  @staticmethod
1045
  def _GetJobPath(job_id):
1046
    """Returns the job file for a given job id.
1047

1048
    @type job_id: str
1049
    @param job_id: the job identifier
1050
    @rtype: str
1051
    @return: the path to the job file
1052

1053
    """
1054
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1055

    
1056
  @classmethod
1057
  def _GetArchivedJobPath(cls, job_id):
1058
    """Returns the archived job file for a give job id.
1059

1060
    @type job_id: str
1061
    @param job_id: the job identifier
1062
    @rtype: str
1063
    @return: the path to the archived job file
1064

1065
    """
1066
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1067
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1068

    
1069
  def _GetJobIDsUnlocked(self, sort=True):
1070
    """Return all known job IDs.
1071

1072
    The method only looks at disk because it's a requirement that all
1073
    jobs are present on disk (so in the _memcache we don't have any
1074
    extra IDs).
1075

1076
    @type sort: boolean
1077
    @param sort: perform sorting on the returned job ids
1078
    @rtype: list
1079
    @return: the list of job IDs
1080

1081
    """
1082
    jlist = []
1083
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1084
      m = self._RE_JOB_FILE.match(filename)
1085
      if m:
1086
        jlist.append(m.group(1))
1087
    if sort:
1088
      jlist = utils.NiceSort(jlist)
1089
    return jlist
1090

    
1091
  def _LoadJobUnlocked(self, job_id):
1092
    """Loads a job from the disk or memory.
1093

1094
    Given a job id, this will return the cached job object if
1095
    existing, or try to load the job from the disk. If loading from
1096
    disk, it will also add the job to the cache.
1097

1098
    @param job_id: the job id
1099
    @rtype: L{_QueuedJob} or None
1100
    @return: either None or the job object
1101

1102
    """
1103
    job = self._memcache.get(job_id, None)
1104
    if job:
1105
      logging.debug("Found job %s in memcache", job_id)
1106
      return job
1107

    
1108
    try:
1109
      job = self._LoadJobFromDisk(job_id)
1110
    except errors.JobFileCorrupted:
1111
      old_path = self._GetJobPath(job_id)
1112
      new_path = self._GetArchivedJobPath(job_id)
1113
      if old_path == new_path:
1114
        # job already archived (future case)
1115
        logging.exception("Can't parse job %s", job_id)
1116
      else:
1117
        # non-archived case
1118
        logging.exception("Can't parse job %s, will archive.", job_id)
1119
        self._RenameFilesUnlocked([(old_path, new_path)])
1120
      return None
1121

    
1122
    self._memcache[job_id] = job
1123
    logging.debug("Added job %s to the cache", job_id)
1124
    return job
1125

    
1126
  def _LoadJobFromDisk(self, job_id):
1127
    """Load the given job file from disk.
1128

1129
    Given a job file, read, load and restore it in a _QueuedJob format.
1130

1131
    @type job_id: string
1132
    @param job_id: job identifier
1133
    @rtype: L{_QueuedJob} or None
1134
    @return: either None or the job object
1135

1136
    """
1137
    filepath = self._GetJobPath(job_id)
1138
    logging.debug("Loading job from %s", filepath)
1139
    try:
1140
      raw_data = utils.ReadFile(filepath)
1141
    except EnvironmentError, err:
1142
      if err.errno in (errno.ENOENT, ):
1143
        return None
1144
      raise
1145

    
1146
    try:
1147
      data = serializer.LoadJson(raw_data)
1148
      job = _QueuedJob.Restore(self, data)
1149
    except Exception, err: # pylint: disable-msg=W0703
1150
      raise errors.JobFileCorrupted(err)
1151

    
1152
    return job
1153

    
1154
  def SafeLoadJobFromDisk(self, job_id):
1155
    """Load the given job file from disk.
1156

1157
    Given a job file, read, load and restore it in a _QueuedJob format.
1158
    In case of error reading the job, it gets returned as None, and the
1159
    exception is logged.
1160

1161
    @type job_id: string
1162
    @param job_id: job identifier
1163
    @rtype: L{_QueuedJob} or None
1164
    @return: either None or the job object
1165

1166
    """
1167
    try:
1168
      return self._LoadJobFromDisk(job_id)
1169
    except (errors.JobFileCorrupted, EnvironmentError):
1170
      logging.exception("Can't load/parse job %s", job_id)
1171
      return None
1172

    
1173
  @staticmethod
1174
  def _IsQueueMarkedDrain():
1175
    """Check if the queue is marked from drain.
1176

1177
    This currently uses the queue drain file, which makes it a
1178
    per-node flag. In the future this can be moved to the config file.
1179

1180
    @rtype: boolean
1181
    @return: True if the job queue is marked for draining
1182

1183
    """
1184
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1185

    
1186
  def _UpdateQueueSizeUnlocked(self):
1187
    """Update the queue size.
1188

1189
    """
1190
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1191

    
1192
  @locking.ssynchronized(_LOCK)
1193
  @_RequireOpenQueue
1194
  def SetDrainFlag(self, drain_flag):
1195
    """Sets the drain flag for the queue.
1196

1197
    @type drain_flag: boolean
1198
    @param drain_flag: Whether to set or unset the drain flag
1199

1200
    """
1201
    if drain_flag:
1202
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1203
    else:
1204
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1205

    
1206
    self._drained = drain_flag
1207

    
1208
    return True
1209

    
1210
  @_RequireOpenQueue
1211
  def _SubmitJobUnlocked(self, job_id, ops):
1212
    """Create and store a new job.
1213

1214
    This enters the job into our job queue and also puts it on the new
1215
    queue, in order for it to be picked up by the queue processors.
1216

1217
    @type job_id: job ID
1218
    @param job_id: the job ID for the new job
1219
    @type ops: list
1220
    @param ops: The list of OpCodes that will become the new job.
1221
    @rtype: L{_QueuedJob}
1222
    @return: the job object to be queued
1223
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1224
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1225

1226
    """
1227
    # Ok when sharing the big job queue lock, as the drain file is created when
1228
    # the lock is exclusive.
1229
    if self._drained:
1230
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1231

    
1232
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1233
      raise errors.JobQueueFull()
1234

    
1235
    job = _QueuedJob(self, job_id, ops)
1236

    
1237
    # Write to disk
1238
    self.UpdateJobUnlocked(job)
1239

    
1240
    self._queue_size += 1
1241

    
1242
    logging.debug("Adding new job %s to the cache", job_id)
1243
    self._memcache[job_id] = job
1244

    
1245
    return job
1246

    
1247
  @locking.ssynchronized(_LOCK)
1248
  @_RequireOpenQueue
1249
  def SubmitJob(self, ops):
1250
    """Create and store a new job.
1251

1252
    @see: L{_SubmitJobUnlocked}
1253

1254
    """
1255
    job_id = self._NewSerialsUnlocked(1)[0]
1256
    self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops))
1257
    return job_id
1258

    
1259
  @locking.ssynchronized(_LOCK)
1260
  @_RequireOpenQueue
1261
  def SubmitManyJobs(self, jobs):
1262
    """Create and store multiple jobs.
1263

1264
    @see: L{_SubmitJobUnlocked}
1265

1266
    """
1267
    results = []
1268
    tasks = []
1269
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1270
    for job_id, ops in zip(all_job_ids, jobs):
1271
      try:
1272
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1273
        status = True
1274
        data = job_id
1275
      except errors.GenericError, err:
1276
        data = str(err)
1277
        status = False
1278
      results.append((status, data))
1279
    self._wpool.AddManyTasks(tasks)
1280

    
1281
    return results
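  # The returned list mirrors the input, e.g. submitting three jobs could
  # yield [(True, "421"), (True, "422"), (False, "<error message>")], where
  # the second element is the new job id on success or the error text
  # otherwise.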
1282

    
1283
  @_RequireOpenQueue
1284
  def UpdateJobUnlocked(self, job, replicate=True):
1285
    """Update a job's on disk storage.
1286

1287
    After a job has been modified, this function needs to be called in
1288
    order to write the changes to disk and replicate them to the other
1289
    nodes.
1290

1291
    @type job: L{_QueuedJob}
1292
    @param job: the changed job
1293
    @type replicate: boolean
1294
    @param replicate: whether to replicate the change to remote nodes
1295

1296
    """
1297
    filename = self._GetJobPath(job.id)
1298
    data = serializer.DumpJson(job.Serialize(), indent=False)
1299
    logging.debug("Writing job %s to %s", job.id, filename)
1300
    self._UpdateJobQueueFile(filename, data, replicate)
1301

    
1302
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1303
                        timeout):
1304
    """Waits for changes in a job.
1305

1306
    @type job_id: string
1307
    @param job_id: Job identifier
1308
    @type fields: list of strings
1309
    @param fields: Which fields to check for changes
1310
    @type prev_job_info: list or None
1311
    @param prev_job_info: Last job information returned
1312
    @type prev_log_serial: int
1313
    @param prev_log_serial: Last job message serial number
1314
    @type timeout: float
1315
    @param timeout: maximum time to wait
1316
    @rtype: tuple (job info, log entries)
1317
    @return: a tuple of the job information as required via
1318
        the fields parameter, and the log entries as a list
1319

1320
        if the job has not changed and the timeout has expired,
1321
        we instead return a special value,
1322
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1323
        as such by the clients
1324

1325
    """
1326
    helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
1327
                                      prev_log_serial, self)
1328
    try:
1329
      return helper.WaitForChanges(timeout)
1330
    finally:
1331
      helper.Close()
1332

    
1333
  @locking.ssynchronized(_LOCK)
1334
  @_RequireOpenQueue
1335
  def CancelJob(self, job_id):
1336
    """Cancels a job.
1337

1338
    This will only succeed if the job has not started yet.
1339

1340
    @type job_id: string
1341
    @param job_id: job ID of job to be cancelled.
1342

1343
    """
1344
    logging.info("Cancelling job %s", job_id)
1345

    
1346
    job = self._LoadJobUnlocked(job_id)
1347
    if not job:
1348
      logging.debug("Job %s not found", job_id)
1349
      return (False, "Job %s not found" % job_id)
1350

    
1351
    job_status = job.CalcStatus()
1352

    
1353
    if job_status not in (constants.JOB_STATUS_QUEUED,
1354
                          constants.JOB_STATUS_WAITLOCK):
1355
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1356
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1357

    
1358
    if job_status == constants.JOB_STATUS_QUEUED:
1359
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1360
                            "Job canceled by request")
1361
      return (True, "Job %s canceled" % job.id)
1362

    
1363
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1364
      # The worker will notice the new status and cancel the job
1365
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1366
      return (True, "Job %s will be canceled" % job.id)
1367

    
1368
  @_RequireOpenQueue
1369
  def _ArchiveJobsUnlocked(self, jobs):
1370
    """Archives jobs.
1371

1372
    @type jobs: list of L{_QueuedJob}
1373
    @param jobs: Job objects
1374
    @rtype: int
1375
    @return: Number of archived jobs
1376

1377
    """
1378
    archive_jobs = []
1379
    rename_files = []
1380
    for job in jobs:
1381
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1382
                                  constants.JOB_STATUS_SUCCESS,
1383
                                  constants.JOB_STATUS_ERROR):
1384
        logging.debug("Job %s is not yet done", job.id)
1385
        continue
1386

    
1387
      archive_jobs.append(job)
1388

    
1389
      old = self._GetJobPath(job.id)
1390
      new = self._GetArchivedJobPath(job.id)
1391
      rename_files.append((old, new))
1392

    
1393
    # TODO: What if 1..n files fail to rename?
1394
    self._RenameFilesUnlocked(rename_files)
1395

    
1396
    logging.debug("Successfully archived job(s) %s",
1397
                  utils.CommaJoin(job.id for job in archive_jobs))
1398

    
1399
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1400
    # the files, we update the cached queue size from the filesystem. When we
1401
    # get around to fix the TODO: above, we can use the number of actually
1402
    # archived jobs to fix this.
1403
    self._UpdateQueueSizeUnlocked()
1404
    return len(archive_jobs)
1405

    
1406
  @locking.ssynchronized(_LOCK)
1407
  @_RequireOpenQueue
1408
  def ArchiveJob(self, job_id):
1409
    """Archives a job.
1410

1411
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1412

1413
    @type job_id: string
1414
    @param job_id: Job ID of job to be archived.
1415
    @rtype: bool
1416
    @return: Whether job was archived
1417

1418
    """
1419
    logging.info("Archiving job %s", job_id)
1420

    
1421
    job = self._LoadJobUnlocked(job_id)
1422
    if not job:
1423
      logging.debug("Job %s not found", job_id)
1424
      return False
1425

    
1426
    return self._ArchiveJobsUnlocked([job]) == 1
1427

    
1428
  @locking.ssynchronized(_LOCK)
1429
  @_RequireOpenQueue
1430
  def AutoArchiveJobs(self, age, timeout):
1431
    """Archives all jobs based on age.
1432

1433
    The method will archive all jobs which are older than the age
1434
    parameter. For jobs that don't have an end timestamp, the start
1435
    timestamp will be considered. The special '-1' age will cause
1436
    archival of all jobs (that are not running or queued).
1437

1438
    @type age: int
1439
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time (in seconds) to spend archiving
1440

1441
    """
1442
    logging.info("Archiving jobs with age more than %s seconds", age)
1443

    
1444
    now = time.time()
1445
    end_time = now + timeout
1446
    archived_count = 0
1447
    last_touched = 0
1448

    
1449
    all_job_ids = self._GetJobIDsUnlocked()
1450
    pending = []
1451
    for idx, job_id in enumerate(all_job_ids):
1452
      last_touched = idx + 1
1453

    
1454
      # Not optimal because jobs could be pending
1455
      # TODO: Measure average duration for job archival and take number of
1456
      # pending jobs into account.
1457
      if time.time() > end_time:
1458
        break
1459

    
1460
      # Returns None if the job failed to load
1461
      job = self._LoadJobUnlocked(job_id)
1462
      if job:
1463
        if job.end_timestamp is None:
1464
          if job.start_timestamp is None:
1465
            job_age = job.received_timestamp
1466
          else:
1467
            job_age = job.start_timestamp
1468
        else:
1469
          job_age = job.end_timestamp
1470

    
1471
        if age == -1 or now - job_age[0] > age:
1472
          pending.append(job)
1473

    
1474
          # Archive 10 jobs at a time
1475
          if len(pending) >= 10:
1476
            archived_count += self._ArchiveJobsUnlocked(pending)
1477
            pending = []
1478

    
1479
    if pending:
1480
      archived_count += self._ArchiveJobsUnlocked(pending)
1481

    
1482
    return (archived_count, len(all_job_ids) - last_touched)
1483

    
1484
  def QueryJobs(self, job_ids, fields):
1485
    """Returns a list of jobs in queue.
1486

1487
    @type job_ids: list
1488
    @param job_ids: sequence of job identifiers or None for all
1489
    @type fields: list
1490
    @param fields: names of fields to return
1491
    @rtype: list
1492
    @return: a list with one element per job, each element being a list with
1493
        the requested fields
1494

1495
    """
1496
    jobs = []
1497
    list_all = False
1498
    if not job_ids:
1499
      # Since files are added to/removed from the queue atomically, there's no
1500
      # risk of getting the job ids in an inconsistent state.
1501
      job_ids = self._GetJobIDsUnlocked()
1502
      list_all = True
1503

    
1504
    for job_id in job_ids:
1505
      job = self.SafeLoadJobFromDisk(job_id)
1506
      if job is not None:
1507
        jobs.append(job.GetInfo(fields))
1508
      elif not list_all:
1509
        jobs.append(None)
1510

    
1511
    return jobs
1512

    
1513
  @locking.ssynchronized(_LOCK)
1514
  @_RequireOpenQueue
1515
  def Shutdown(self):
1516
    """Stops the job queue.
1517

1518
    This shuts down all the worker threads and closes the queue.
1519

1520
    """
1521
    self._wpool.TerminateWorkers()
1522

    
1523
    self._queue_filelock.Close()
1524
    self._queue_filelock = None