
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import errno
35
import re
36
import time
37
import weakref
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import netutils
57

    
58

    
59
JOBQUEUE_THREADS = 25
60
JOBS_PER_ARCHIVE_DIRECTORY = 10000
61

    
62
# member lock names to be passed to @ssynchronized decorator
63
_LOCK = "_lock"
64
_QUEUE = "_queue"
65

    
66

    
67
class CancelJob(Exception):
68
  """Special exception to cancel a job.
69

70
  """
71

    
72

    
73
def TimeStampNow():
74
  """Returns the current timestamp.
75

76
  @rtype: tuple
77
  @return: the current time in the (seconds, microseconds) format
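      (for instance, the float 1234567890.123456 corresponds to the
      tuple C{(1234567890, 123456)})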
78

79
  """
80
  return utils.SplitTime(time.time())
81

    
82

    
83
class _QueuedOpCode(object):
84
  """Encapsulates an opcode object.
85

86
  @ivar log: holds the execution log and consists of tuples
87
  of the form C{(log_serial, timestamp, level, message)}
88
  @ivar input: the OpCode we encapsulate
89
  @ivar status: the current status
90
  @ivar result: the result of the LU execution
91
  @ivar start_timestamp: timestamp for the start of the execution
92
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
93
  @ivar end_timestamp: timestamp for the end of the execution
94

95
  """
96
  __slots__ = ["input", "status", "result", "log",
97
               "start_timestamp", "exec_timestamp", "end_timestamp",
98
               "__weakref__"]
99

    
100
  def __init__(self, op):
101
    """Constructor for the _QuededOpCode.
102

103
    @type op: L{opcodes.OpCode}
104
    @param op: the opcode we encapsulate
105

106
    """
107
    self.input = op
108
    self.status = constants.OP_STATUS_QUEUED
109
    self.result = None
110
    self.log = []
111
    self.start_timestamp = None
112
    self.exec_timestamp = None
113
    self.end_timestamp = None
114

    
115
  @classmethod
116
  def Restore(cls, state):
117
    """Restore the _QueuedOpCode from the serialized form.
118

119
    @type state: dict
120
    @param state: the serialized state
121
    @rtype: _QueuedOpCode
122
    @return: a new _QueuedOpCode instance
123

124
    """
125
    obj = _QueuedOpCode.__new__(cls)
126
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
127
    obj.status = state["status"]
128
    obj.result = state["result"]
129
    obj.log = state["log"]
130
    obj.start_timestamp = state.get("start_timestamp", None)
131
    obj.exec_timestamp = state.get("exec_timestamp", None)
132
    obj.end_timestamp = state.get("end_timestamp", None)
133
    return obj
134

    
135
  def Serialize(self):
136
    """Serializes this _QueuedOpCode.
137

138
    @rtype: dict
139
    @return: the dictionary holding the serialized state
140

141
    """
142
    return {
143
      "input": self.input.__getstate__(),
144
      "status": self.status,
145
      "result": self.result,
146
      "log": self.log,
147
      "start_timestamp": self.start_timestamp,
148
      "exec_timestamp": self.exec_timestamp,
149
      "end_timestamp": self.end_timestamp,
150
      }
151

    
152

    
153
class _QueuedJob(object):
154
  """In-memory job representation.
155

156
  This is what we use to track the user-submitted jobs. Locking must
157
  be taken care of by users of this class.
158

159
  @type queue: L{JobQueue}
160
  @ivar queue: the parent queue
161
  @ivar id: the job ID
162
  @type ops: list
163
  @ivar ops: the list of _QueuedOpCode that constitute the job
164
  @type log_serial: int
165
  @ivar log_serial: holds the index for the next log entry
166
  @ivar received_timestamp: the timestamp for when the job was received
167
  @ivar start_timestamp: the timestamp for start of execution
168
  @ivar end_timestamp: the timestamp for end of execution
169
  @ivar lock_status: In-memory locking information for debugging
170

171
  """
172
  # pylint: disable-msg=W0212
173
  __slots__ = ["queue", "id", "ops", "log_serial",
174
               "received_timestamp", "start_timestamp", "end_timestamp",
175
               "lock_status", "change",
176
               "__weakref__"]
177

    
178
  def __init__(self, queue, job_id, ops):
179
    """Constructor for the _QueuedJob.
180

181
    @type queue: L{JobQueue}
182
    @param queue: our parent queue
183
    @type job_id: job_id
184
    @param job_id: our job id
185
    @type ops: list
186
    @param ops: the list of opcodes we hold, which will be encapsulated
187
        in _QueuedOpCodes
188

189
    """
190
    if not ops:
191
      raise errors.GenericError("A job needs at least one opcode")
192

    
193
    self.queue = queue
194
    self.id = job_id
195
    self.ops = [_QueuedOpCode(op) for op in ops]
196
    self.log_serial = 0
197
    self.received_timestamp = TimeStampNow()
198
    self.start_timestamp = None
199
    self.end_timestamp = None
200

    
201
    # In-memory attributes
202
    self.lock_status = None
203

    
204
  def __repr__(self):
205
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
206
              "id=%s" % self.id,
207
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
208

    
209
    return "<%s at %#x>" % (" ".join(status), id(self))
210

    
211
  @classmethod
212
  def Restore(cls, queue, state):
213
    """Restore a _QueuedJob from serialized state:
214

215
    @type queue: L{JobQueue}
216
    @param queue: to which queue the restored job belongs
217
    @type state: dict
218
    @param state: the serialized state
219
    @rtype: _QueuedJob
220
    @return: the restored _QueuedJob instance
221

222
    """
223
    obj = _QueuedJob.__new__(cls)
224
    obj.queue = queue
225
    obj.id = state["id"]
226
    obj.received_timestamp = state.get("received_timestamp", None)
227
    obj.start_timestamp = state.get("start_timestamp", None)
228
    obj.end_timestamp = state.get("end_timestamp", None)
229

    
230
    # In-memory attributes
231
    obj.lock_status = None
232

    
233
    obj.ops = []
234
    obj.log_serial = 0
235
    for op_state in state["ops"]:
236
      op = _QueuedOpCode.Restore(op_state)
237
      for log_entry in op.log:
238
        obj.log_serial = max(obj.log_serial, log_entry[0])
239
      obj.ops.append(op)
240

    
241
    return obj
242

    
243
  def Serialize(self):
244
    """Serialize the _JobQueue instance.
245

246
    @rtype: dict
247
    @return: the serialized state
248

249
    """
250
    return {
251
      "id": self.id,
252
      "ops": [op.Serialize() for op in self.ops],
253
      "start_timestamp": self.start_timestamp,
254
      "end_timestamp": self.end_timestamp,
255
      "received_timestamp": self.received_timestamp,
256
      }
257

    
258
  def CalcStatus(self):
259
    """Compute the status of this job.
260

261
    This function iterates over all the _QueuedOpCodes in the job and
262
    based on their status, computes the job status.
263

264
    The algorithm is:
265
      - if we find a canceled opcode, or one finished with error, the job
266
        status will be the same
267
      - otherwise, the last opcode with the status one of:
268
          - waitlock
269
          - canceling
270
          - running
271

272
        will determine the job status
273

274
      - otherwise, it means either all opcodes are queued or all succeeded,
275
        and the job status will be the same
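
    An illustrative sketch (statuses abbreviated; not exhaustive)::

      # opcode statuses [success, running, queued] -> job status "running"
      # opcode statuses [error, queued]            -> job status "error"
      # opcode statuses [success, success]         -> job status "success"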
276

277
    @return: the job status
278

279
    """
280
    status = constants.JOB_STATUS_QUEUED
281

    
282
    all_success = True
283
    for op in self.ops:
284
      if op.status == constants.OP_STATUS_SUCCESS:
285
        continue
286

    
287
      all_success = False
288

    
289
      if op.status == constants.OP_STATUS_QUEUED:
290
        pass
291
      elif op.status == constants.OP_STATUS_WAITLOCK:
292
        status = constants.JOB_STATUS_WAITLOCK
293
      elif op.status == constants.OP_STATUS_RUNNING:
294
        status = constants.JOB_STATUS_RUNNING
295
      elif op.status == constants.OP_STATUS_CANCELING:
296
        status = constants.JOB_STATUS_CANCELING
297
        break
298
      elif op.status == constants.OP_STATUS_ERROR:
299
        status = constants.JOB_STATUS_ERROR
300
        # The whole job fails if one opcode failed
301
        break
302
      elif op.status == constants.OP_STATUS_CANCELED:
303
        status = constants.JOB_STATUS_CANCELED
304
        break
305

    
306
    if all_success:
307
      status = constants.JOB_STATUS_SUCCESS
308

    
309
    return status
310

    
311
  def GetLogEntries(self, newer_than):
312
    """Selectively returns the log entries.
313

314
    @type newer_than: None or int
315
    @param newer_than: if this is None, return all log entries,
316
        otherwise return only the log entries with serial higher
317
        than this value
318
    @rtype: list
319
    @return: the list of the log entries selected
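
    Each entry follows the C{(log_serial, timestamp, level, message)}
    layout used by L{_QueuedOpCode.log}; for instance, an illustrative
    C{job.GetLogEntries(3)} call returns only the entries whose serial
    is 4 or higher.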
320

321
    """
322
    if newer_than is None:
323
      serial = -1
324
    else:
325
      serial = newer_than
326

    
327
    entries = []
328
    for op in self.ops:
329
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
330

    
331
    return entries
332

    
333
  def GetInfo(self, fields):
334
    """Returns information about a job.
335

336
    @type fields: list
337
    @param fields: names of fields to return
338
    @rtype: list
339
    @return: list with one element for each field
340
    @raise errors.OpExecError: when an invalid field
341
        has been passed
342

343
    """
344
    row = []
345
    for fname in fields:
346
      if fname == "id":
347
        row.append(self.id)
348
      elif fname == "status":
349
        row.append(self.CalcStatus())
350
      elif fname == "ops":
351
        row.append([op.input.__getstate__() for op in self.ops])
352
      elif fname == "opresult":
353
        row.append([op.result for op in self.ops])
354
      elif fname == "opstatus":
355
        row.append([op.status for op in self.ops])
356
      elif fname == "oplog":
357
        row.append([op.log for op in self.ops])
358
      elif fname == "opstart":
359
        row.append([op.start_timestamp for op in self.ops])
360
      elif fname == "opexec":
361
        row.append([op.exec_timestamp for op in self.ops])
362
      elif fname == "opend":
363
        row.append([op.end_timestamp for op in self.ops])
364
      elif fname == "received_ts":
365
        row.append(self.received_timestamp)
366
      elif fname == "start_ts":
367
        row.append(self.start_timestamp)
368
      elif fname == "end_ts":
369
        row.append(self.end_timestamp)
370
      elif fname == "lock_status":
371
        row.append(self.lock_status)
372
      elif fname == "summary":
373
        row.append([op.input.Summary() for op in self.ops])
374
      else:
375
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
376
    return row
377

    
378
  def MarkUnfinishedOps(self, status, result):
379
    """Mark unfinished opcodes with a given status and result.
380

381
    This is a utility function for marking all running or waiting to
382
    be run opcodes with a given status. Opcodes which are already
383
    finalised are not changed.
384

385
    @param status: a given opcode status
386
    @param result: the opcode result
387

388
    """
389
    try:
390
      not_marked = True
391
      for op in self.ops:
392
        if op.status in constants.OPS_FINALIZED:
393
          assert not_marked, "Finalized opcodes found after non-finalized ones"
394
          continue
395
        op.status = status
396
        op.result = result
397
        not_marked = False
398
    finally:
399
      self.queue.UpdateJobUnlocked(self)
400

    
401

    
402
class _OpExecCallbacks(mcpu.OpExecCbBase):
403
  def __init__(self, queue, job, op):
404
    """Initializes this class.
405

406
    @type queue: L{JobQueue}
407
    @param queue: Job queue
408
    @type job: L{_QueuedJob}
409
    @param job: Job object
410
    @type op: L{_QueuedOpCode}
411
    @param op: OpCode
412

413
    """
414
    assert queue, "Queue is missing"
415
    assert job, "Job is missing"
416
    assert op, "Opcode is missing"
417

    
418
    self._queue = queue
419
    self._job = job
420
    self._op = op
421

    
422
  @locking.ssynchronized(_QUEUE, shared=1)
423
  def NotifyStart(self):
424
    """Mark the opcode as running, not lock-waiting.
425

426
    This is called from the mcpu code as a notifier function, when the LU is
427
    finally about to start the Exec() method. Of course, to have end-user
428
    visible results, the opcode must be initially (before calling into
429
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
430

431
    """
432
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
433
                               constants.OP_STATUS_CANCELING)
434

    
435
    # All locks are acquired by now
436
    self._job.lock_status = None
437

    
438
    # Cancel here if we were asked to
439
    if self._op.status == constants.OP_STATUS_CANCELING:
440
      raise CancelJob()
441

    
442
    self._op.status = constants.OP_STATUS_RUNNING
443
    self._op.exec_timestamp = TimeStampNow()
444

    
445
    # And finally replicate the job status
446
    self._queue.UpdateJobUnlocked(self._job)
447

    
448
  @locking.ssynchronized(_QUEUE, shared=1)
449
  def _AppendFeedback(self, timestamp, log_type, log_msg):
450
    """Internal feedback append function, with locks
451

452
    """
453
    self._job.log_serial += 1
454
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
455
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
456

    
457
  def Feedback(self, *args):
458
    """Append a log entry.
459

460
    """
461
    assert len(args) < 3
462

    
463
    if len(args) == 1:
464
      log_type = constants.ELOG_MESSAGE
465
      log_msg = args[0]
466
    else:
467
      (log_type, log_msg) = args
468

    
469
    # The time is split to make serialization easier and not lose
470
    # precision.
471
    timestamp = utils.SplitTime(time.time())
472
    self._AppendFeedback(timestamp, log_type, log_msg)
473

    
474
  def ReportLocks(self, msg):
475
    """Write locking information to the job.
476

477
    Called whenever the LU processor is waiting for a lock or has acquired one.
478

479
    """
480
    # Not getting the queue lock because this is a single assignment
481
    self._job.lock_status = msg
482

    
483

    
484
class _WaitForJobChangesHelper(object):
485
  """Helper class using initofy to wait for changes in a job file.
486

487
  This class takes a previous job status and serial, and alerts the client when
488
  the current job status has changed.
489

490
  @type job_id: string
491
  @ivar job_id: id of the job we're watching
492
  @type prev_job_info: string
493
  @ivar prev_job_info: previous job info, as passed by the luxi client
494
  @type prev_log_serial: int
495
  @ivar prev_log_serial: previous job serial, as passed by the luxi client
496
  @type queue: L{JobQueue}
497
  @ivar queue: job queue (used for a few utility functions)
498
  @type job_path: string
499
  @ivar job_path: absolute path of the job file
500
  @type wm: pyinotify.WatchManager (or None)
501
  @ivar wm: inotify watch manager to watch for changes
502
  @type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
503
  @ivar inotify_handler: single file event handler, used for watching
504
  @type notifier: pyinotify.Notifier
505
  @ivar notifier: inotify single-threaded notifier, used for watching
506

507
  """
508
  def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
509
    self.job_id = job_id
510
    self.fields = fields
511
    self.prev_job_info = prev_job_info
512
    self.prev_log_serial = prev_log_serial
513
    self.queue = queue
514
    # pylint: disable-msg=W0212
515
    self.job_path = self.queue._GetJobPath(self.job_id)
516
    self.wm = None
517
    self.inotify_handler = None
518
    self.notifier = None
519

    
520
  def _SetupInotify(self):
521
    """Create the inotify
522

523
    @raise errors.InotifyError: if the notifier cannot be set up
524

525
    """
526
    if self.wm:
527
      return
528
    self.wm = pyinotify.WatchManager()
529
    self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
530
                                                                self.OnInotify,
531
                                                                self.job_path)
532
    self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
533
    self.inotify_handler.enable()
534

    
535
  def _LoadDiskStatus(self):
536
    job = self.queue.SafeLoadJobFromDisk(self.job_id)
537
    if not job:
538
      raise errors.JobLost()
539
    self.job_status = job.CalcStatus()
540

    
541
    job_info = job.GetInfo(self.fields)
542
    log_entries = job.GetLogEntries(self.prev_log_serial)
543
    # Serializing and deserializing data can cause type changes (e.g. from
544
    # tuple to list) or precision loss. We're doing it here so that we get
545
    # the same modifications as the data received from the client. Without
546
    # this, the comparison afterwards might fail without the data being
547
    # significantly different.
548
    # TODO: we just deserialized from disk, investigate how to make sure that
549
    # the job info and log entries are compatible to avoid this further step.
550
    self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
551
    self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
552

    
553
  def _CheckForChanges(self):
554
    self._LoadDiskStatus()
555
    # Don't even try to wait if the job is no longer running, there will be
556
    # no changes.
557
    if (self.job_status not in (constants.JOB_STATUS_QUEUED,
558
                                constants.JOB_STATUS_RUNNING,
559
                                constants.JOB_STATUS_WAITLOCK) or
560
        self.prev_job_info != self.job_info or
561
        (self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
562
      logging.debug("Job %s changed", self.job_id)
563
      return (self.job_info, self.log_entries)
564

    
565
    raise utils.RetryAgain()
566

    
567
  def OnInotify(self, notifier_enabled):
568
    if not notifier_enabled:
569
      self.inotify_handler.enable()
570

    
571
  def WaitFn(self, timeout):
572
    self._SetupInotify()
573
    if self.notifier.check_events(timeout*1000):
574
      self.notifier.read_events()
575
    self.notifier.process_events()
576

    
577
  def WaitForChanges(self, timeout):
578
    self._SetupInotify()
579
    try:
580
      return utils.Retry(self._CheckForChanges,
581
                         utils.RETRY_REMAINING_TIME,
582
                         timeout,
583
                         wait_fn=self.WaitFn)
584
    except (errors.InotifyError, errors.JobLost):
585
      return None
586
    except utils.RetryTimeout:
587
      return constants.JOB_NOTCHANGED
588

    
589
  def Close(self):
590
    if self.wm:
591
      self.notifier.stop()
592

    
593

    
594
class _JobQueueWorker(workerpool.BaseWorker):
595
  """The actual job workers.
596

597
  """
598
  def RunTask(self, job): # pylint: disable-msg=W0221
599
    """Job executor.
600

601
    This function processes a job. It is closely tied to the _QueuedJob and
602
    _QueuedOpCode classes.
603

604
    @type job: L{_QueuedJob}
605
    @param job: the job to be processed
606

607
    """
608
    logging.info("Processing job %s", job.id)
609
    proc = mcpu.Processor(self.pool.queue.context, job.id)
610
    queue = job.queue
611
    try:
612
      try:
613
        count = len(job.ops)
614
        for idx, op in enumerate(job.ops):
615
          op_summary = op.input.Summary()
616
          if op.status == constants.OP_STATUS_SUCCESS:
617
            # this is a job that was partially completed before master
618
            # daemon shutdown, so it can be expected that some opcodes
619
            # are already completed successfully (if any did error
620
            # out, then the whole job should have been aborted and not
621
            # resubmitted for processing)
622
            logging.info("Op %s/%s: opcode %s already processed, skipping",
623
                         idx + 1, count, op_summary)
624
            continue
625
          try:
626
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
627
                         op_summary)
628

    
629
            queue.acquire(shared=1)
630
            try:
631
              if op.status == constants.OP_STATUS_CANCELED:
632
                raise CancelJob()
633
              assert op.status == constants.OP_STATUS_QUEUED
634
              op.status = constants.OP_STATUS_WAITLOCK
635
              op.result = None
636
              op.start_timestamp = TimeStampNow()
637
              if idx == 0: # first opcode
638
                job.start_timestamp = op.start_timestamp
639
              queue.UpdateJobUnlocked(job)
640

    
641
              input_opcode = op.input
642
            finally:
643
              queue.release()
644

    
645
            # Make sure not to hold queue lock while calling ExecOpCode
646
            result = proc.ExecOpCode(input_opcode,
647
                                     _OpExecCallbacks(queue, job, op))
648

    
649
            queue.acquire(shared=1)
650
            try:
651
              op.status = constants.OP_STATUS_SUCCESS
652
              op.result = result
653
              op.end_timestamp = TimeStampNow()
654
              queue.UpdateJobUnlocked(job)
655
            finally:
656
              queue.release()
657

    
658
            logging.info("Op %s/%s: Successfully finished opcode %s",
659
                         idx + 1, count, op_summary)
660
          except CancelJob:
661
            # Will be handled further up
662
            raise
663
          except Exception, err:
664
            queue.acquire(shared=1)
665
            try:
666
              try:
667
                op.status = constants.OP_STATUS_ERROR
668
                if isinstance(err, errors.GenericError):
669
                  op.result = errors.EncodeException(err)
670
                else:
671
                  op.result = str(err)
672
                op.end_timestamp = TimeStampNow()
673
                logging.info("Op %s/%s: Error in opcode %s: %s",
674
                             idx + 1, count, op_summary, err)
675
              finally:
676
                queue.UpdateJobUnlocked(job)
677
            finally:
678
              queue.release()
679
            raise
680

    
681
      except CancelJob:
682
        queue.acquire(shared=1)
683
        try:
684
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
685
                                "Job canceled by request")
686
        finally:
687
          queue.release()
688
      except errors.GenericError, err:
689
        logging.exception("Ganeti exception")
690
      except:
691
        logging.exception("Unhandled exception")
692
    finally:
693
      queue.acquire(shared=1)
694
      try:
695
        try:
696
          job.lock_status = None
697
          job.end_timestamp = TimeStampNow()
698
          queue.UpdateJobUnlocked(job)
699
        finally:
700
          job_id = job.id
701
          status = job.CalcStatus()
702
      finally:
703
        queue.release()
704

    
705
      logging.info("Finished job %s, status = %s", job_id, status)
706

    
707

    
708
class _JobQueueWorkerPool(workerpool.WorkerPool):
709
  """Simple class implementing a job-processing workerpool.
710

711
  """
712
  def __init__(self, queue):
713
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
714
                                              JOBQUEUE_THREADS,
715
                                              _JobQueueWorker)
716
    self.queue = queue
717

    
718

    
719
def _RequireOpenQueue(fn):
720
  """Decorator for "public" functions.
721

722
  This function should be used for all 'public' functions. That is,
723
  functions usually called from other classes. Note that this should
724
  be applied only to methods (not plain functions), since it expects
725
  that the decorated function is called with a first argument that has
726
  a '_queue_filelock' attribute.
727

728
  @warning: Use this decorator only after locking.ssynchronized
729

730
  Example::
731
    @locking.ssynchronized(_LOCK)
732
    @_RequireOpenQueue
733
    def Example(self):
734
      pass
735

736
  """
737
  def wrapper(self, *args, **kwargs):
738
    # pylint: disable-msg=W0212
739
    assert self._queue_filelock is not None, "Queue should be open"
740
    return fn(self, *args, **kwargs)
741
  return wrapper
742

    
743

    
744
class JobQueue(object):
745
  """Queue used to manage the jobs.
746

747
  @cvar _RE_JOB_FILE: regex matching the valid job file names
748

749
  """
750
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
751

    
752
  def __init__(self, context):
753
    """Constructor for JobQueue.
754

755
    The constructor will initialize the job queue object and then
756
    start loading the current jobs from disk, either for starting them
757
    (if they were queued) or for aborting them (if they were already
758
    running).
759

760
    @type context: GanetiContext
761
    @param context: the context object for access to the configuration
762
        data and other ganeti objects
763

764
    """
765
    self.context = context
766
    self._memcache = weakref.WeakValueDictionary()
767
    self._my_hostname = netutils.HostInfo().name
768

    
769
    # The Big JobQueue lock. If a code block or method acquires it in shared
770
    # mode, it must guarantee concurrency with all the code acquiring it in
771
    # shared mode, including itself. In order not to acquire it at all,
772
    # concurrency must be guaranteed with all code acquiring it in shared mode
773
    # and all code acquiring it exclusively.
774
    self._lock = locking.SharedLock()
775

    
776
    self.acquire = self._lock.acquire
777
    self.release = self._lock.release
778

    
779
    # Initialize the queue, and acquire the filelock.
780
    # This ensures no other process is working on the job queue.
781
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
782

    
783
    # Read serial file
784
    self._last_serial = jstore.ReadSerial()
785
    assert self._last_serial is not None, ("Serial file was modified between"
786
                                           " check in jstore and here")
787

    
788
    # Get initial list of nodes
789
    self._nodes = dict((n.name, n.primary_ip)
790
                       for n in self.context.cfg.GetAllNodesInfo().values()
791
                       if n.master_candidate)
792

    
793
    # Remove master node
794
    self._nodes.pop(self._my_hostname, None)
795

    
796
    # TODO: Check consistency across nodes
797

    
798
    self._queue_size = 0
799
    self._UpdateQueueSizeUnlocked()
800
    self._drained = self._IsQueueMarkedDrain()
801

    
802
    # Setup worker pool
803
    self._wpool = _JobQueueWorkerPool(self)
804
    try:
805
      # We need to lock here because WorkerPool.AddTask() may start a job while
806
      # we're still doing our work.
807
      self.acquire()
808
      try:
809
        logging.info("Inspecting job queue")
810

    
811
        all_job_ids = self._GetJobIDsUnlocked()
812
        jobs_count = len(all_job_ids)
813
        lastinfo = time.time()
814
        for idx, job_id in enumerate(all_job_ids):
815
          # Give an update every 1000 jobs or 10 seconds
816
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
817
              idx == (jobs_count - 1)):
818
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
819
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
820
            lastinfo = time.time()
821

    
822
          job = self._LoadJobUnlocked(job_id)
823

    
824
          # a failure in loading the job can cause 'None' to be returned
825
          if job is None:
826
            continue
827

    
828
          status = job.CalcStatus()
829

    
830
          if status in (constants.JOB_STATUS_QUEUED, ):
831
            self._wpool.AddTask(job)
832

    
833
          elif status in (constants.JOB_STATUS_RUNNING,
834
                          constants.JOB_STATUS_WAITLOCK,
835
                          constants.JOB_STATUS_CANCELING):
836
            logging.warning("Unfinished job %s found: %s", job.id, job)
837
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
838
                                  "Unclean master daemon shutdown")
839

    
840
        logging.info("Job queue inspection finished")
841
      finally:
842
        self.release()
843
    except:
844
      self._wpool.TerminateWorkers()
845
      raise
846

    
847
  @locking.ssynchronized(_LOCK)
848
  @_RequireOpenQueue
849
  def AddNode(self, node):
850
    """Register a new node with the queue.
851

852
    @type node: L{objects.Node}
853
    @param node: the node object to be added
854

855
    """
856
    node_name = node.name
857
    assert node_name != self._my_hostname
858

    
859
    # Clean queue directory on added node
860
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
861
    msg = result.fail_msg
862
    if msg:
863
      logging.warning("Cannot cleanup queue directory on node %s: %s",
864
                      node_name, msg)
865

    
866
    if not node.master_candidate:
867
      # remove if existing, ignoring errors
868
      self._nodes.pop(node_name, None)
869
      # and skip the replication of the job ids
870
      return
871

    
872
    # Upload the whole queue excluding archived jobs
873
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
874

    
875
    # Upload current serial file
876
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
877

    
878
    for file_name in files:
879
      # Read file content
880
      content = utils.ReadFile(file_name)
881

    
882
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
883
                                                  [node.primary_ip],
884
                                                  file_name, content)
885
      msg = result[node_name].fail_msg
886
      if msg:
887
        logging.error("Failed to upload file %s to node %s: %s",
888
                      file_name, node_name, msg)
889

    
890
    self._nodes[node_name] = node.primary_ip
891

    
892
  @locking.ssynchronized(_LOCK)
893
  @_RequireOpenQueue
894
  def RemoveNode(self, node_name):
895
    """Callback called when removing nodes from the cluster.
896

897
    @type node_name: str
898
    @param node_name: the name of the node to remove
899

900
    """
901
    self._nodes.pop(node_name, None)
902

    
903
  @staticmethod
904
  def _CheckRpcResult(result, nodes, failmsg):
905
    """Verifies the status of an RPC call.
906

907
    Since we aim to keep consistency should this node (the current
908
    master) fail, we will log errors if our rpc calls fail, and especially
909
    log the case when more than half of the nodes fail.
910

911
    @param result: the data as returned from the rpc call
912
    @type nodes: list
913
    @param nodes: the list of nodes we made the call to
914
    @type failmsg: str
915
    @param failmsg: the identifier to be used for logging
916

917
    """
918
    failed = []
919
    success = []
920

    
921
    for node in nodes:
922
      msg = result[node].fail_msg
923
      if msg:
924
        failed.append(node)
925
        logging.error("RPC call %s (%s) failed on node %s: %s",
926
                      result[node].call, failmsg, node, msg)
927
      else:
928
        success.append(node)
929

    
930
    # +1 for the master node
931
    if (len(success) + 1) < len(failed):
932
      # TODO: Handle failing nodes
933
      logging.error("More than half of the nodes failed")
934

    
935
  def _GetNodeIp(self):
936
    """Helper for returning the node name/ip list.
937

938
    @rtype: (list, list)
939
    @return: a tuple of two lists, the first one with the node
940
        names and the second one with the node addresses
941

942
    """
943
    name_list = self._nodes.keys()
944
    addr_list = [self._nodes[name] for name in name_list]
945
    return name_list, addr_list
946

    
947
  def _UpdateJobQueueFile(self, file_name, data, replicate):
948
    """Writes a file locally and then replicates it to all nodes.
949

950
    This function will replace the contents of a file on the local
951
    node and then replicate it to all the other nodes we have.
952

953
    @type file_name: str
954
    @param file_name: the path of the file to be replicated
955
    @type data: str
956
    @param data: the new contents of the file
957
    @type replicate: boolean
958
    @param replicate: whether to spread the changes to the remote nodes
959

960
    """
961
    utils.WriteFile(file_name, data=data)
962

    
963
    if replicate:
964
      names, addrs = self._GetNodeIp()
965
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
966
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
967

    
968
  def _RenameFilesUnlocked(self, rename):
969
    """Renames a file locally and then replicate the change.
970

971
    This function will rename a file in the local queue directory
972
    and then replicate this rename to all the other nodes we have.
973

974
    @type rename: list of (old, new)
975
    @param rename: List containing tuples mapping old to new names
976

977
    """
978
    # Rename them locally
979
    for old, new in rename:
980
      utils.RenameFile(old, new, mkdir=True)
981

    
982
    # ... and on all nodes
983
    names, addrs = self._GetNodeIp()
984
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
985
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
986

    
987
  @staticmethod
988
  def _FormatJobID(job_id):
989
    """Convert a job ID to string format.
990

991
    Currently this just does C{str(job_id)} after performing some
992
    checks, but if we want to change the job id format this will
993
    abstract away that change.
994

995
    @type job_id: int or long
996
    @param job_id: the numeric job id
997
    @rtype: str
998
    @return: the formatted job id
999

1000
    """
1001
    if not isinstance(job_id, (int, long)):
1002
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1003
    if job_id < 0:
1004
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1005

    
1006
    return str(job_id)
1007

    
1008
  @classmethod
1009
  def _GetArchiveDirectory(cls, job_id):
1010
    """Returns the archive directory for a job.
1011

1012
    @type job_id: str
1013
    @param job_id: Job identifier
1014
    @rtype: str
1015
    @return: Directory name
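        (for instance, with the default C{JOBS_PER_ARCHIVE_DIRECTORY} of
        10000, job "54321" lands in directory "5")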
1016

1017
    """
1018
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1019

    
1020
  def _NewSerialsUnlocked(self, count):
1021
    """Generates a new job identifier.
1022

1023
    Job identifiers are unique during the lifetime of a cluster.
1024

1025
    @type count: integer
1026
    @param count: how many serials to return
1027
    @rtype: list of str
1028
    @return: a list of strings representing the job identifiers.
1029

1030
    """
1031
    assert count > 0
1032
    # New number
1033
    serial = self._last_serial + count
1034

    
1035
    # Write to file
1036
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1037
                             "%s\n" % serial, True)
1038

    
1039
    result = [self._FormatJobID(v)
1040
              for v in range(self._last_serial + 1, serial + 1)]
1041
    # Keep it only if we were able to write the file
1042
    self._last_serial = serial
1043

    
1044
    return result
1045

    
1046
  @staticmethod
1047
  def _GetJobPath(job_id):
1048
    """Returns the job file for a given job id.
1049

1050
    @type job_id: str
1051
    @param job_id: the job identifier
1052
    @rtype: str
1053
    @return: the path to the job file
1054

1055
    """
1056
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1057

    
1058
  @classmethod
1059
  def _GetArchivedJobPath(cls, job_id):
1060
    """Returns the archived job file for a give job id.
1061

1062
    @type job_id: str
1063
    @param job_id: the job identifier
1064
    @rtype: str
1065
    @return: the path to the archived job file
1066

1067
    """
1068
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1069
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1070

    
1071
  def _GetJobIDsUnlocked(self, sort=True):
1072
    """Return all known job IDs.
1073

1074
    The method only looks at disk because it's a requirement that all
1075
    jobs are present on disk (so in the _memcache we don't have any
1076
    extra IDs).
1077

1078
    @type sort: boolean
1079
    @param sort: perform sorting on the returned job ids
1080
    @rtype: list
1081
    @return: the list of job IDs
1082

1083
    """
1084
    jlist = []
1085
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1086
      m = self._RE_JOB_FILE.match(filename)
1087
      if m:
1088
        jlist.append(m.group(1))
1089
    if sort:
1090
      jlist = utils.NiceSort(jlist)
1091
    return jlist
1092

    
1093
  def _LoadJobUnlocked(self, job_id):
1094
    """Loads a job from the disk or memory.
1095

1096
    Given a job id, this will return the cached job object if
1097
    existing, or try to load the job from the disk. If loading from
1098
    disk, it will also add the job to the cache.
1099

1100
    @param job_id: the job id
1101
    @rtype: L{_QueuedJob} or None
1102
    @return: either None or the job object
1103

1104
    """
1105
    job = self._memcache.get(job_id, None)
1106
    if job:
1107
      logging.debug("Found job %s in memcache", job_id)
1108
      return job
1109

    
1110
    try:
1111
      job = self._LoadJobFromDisk(job_id)
1112
    except errors.JobFileCorrupted:
1113
      old_path = self._GetJobPath(job_id)
1114
      new_path = self._GetArchivedJobPath(job_id)
1115
      if old_path == new_path:
1116
        # job already archived (future case)
1117
        logging.exception("Can't parse job %s", job_id)
1118
      else:
1119
        # non-archived case
1120
        logging.exception("Can't parse job %s, will archive.", job_id)
1121
        self._RenameFilesUnlocked([(old_path, new_path)])
1122
      return None
1123

    
1124
    self._memcache[job_id] = job
1125
    logging.debug("Added job %s to the cache", job_id)
1126
    return job
1127

    
1128
  def _LoadJobFromDisk(self, job_id):
1129
    """Load the given job file from disk.
1130

1131
    Given a job file, read, load and restore it in a _QueuedJob format.
1132

1133
    @type job_id: string
1134
    @param job_id: job identifier
1135
    @rtype: L{_QueuedJob} or None
1136
    @return: either None or the job object
1137

1138
    """
1139
    filepath = self._GetJobPath(job_id)
1140
    logging.debug("Loading job from %s", filepath)
1141
    try:
1142
      raw_data = utils.ReadFile(filepath)
1143
    except EnvironmentError, err:
1144
      if err.errno in (errno.ENOENT, ):
1145
        return None
1146
      raise
1147

    
1148
    try:
1149
      data = serializer.LoadJson(raw_data)
1150
      job = _QueuedJob.Restore(self, data)
1151
    except Exception, err: # pylint: disable-msg=W0703
1152
      raise errors.JobFileCorrupted(err)
1153

    
1154
    return job
1155

    
1156
  def SafeLoadJobFromDisk(self, job_id):
1157
    """Load the given job file from disk.
1158

1159
    Given a job file, read, load and restore it in a _QueuedJob format.
1160
    In case of error reading the job, it gets returned as None, and the
1161
    exception is logged.
1162

1163
    @type job_id: string
1164
    @param job_id: job identifier
1165
    @rtype: L{_QueuedJob} or None
1166
    @return: either None or the job object
1167

1168
    """
1169
    try:
1170
      return self._LoadJobFromDisk(job_id)
1171
    except (errors.JobFileCorrupted, EnvironmentError):
1172
      logging.exception("Can't load/parse job %s", job_id)
1173
      return None
1174

    
1175
  @staticmethod
1176
  def _IsQueueMarkedDrain():
1177
    """Check if the queue is marked from drain.
1178

1179
    This currently uses the queue drain file, which makes it a
1180
    per-node flag. In the future this can be moved to the config file.
1181

1182
    @rtype: boolean
1183
    @return: True if the job queue is marked for draining
1184

1185
    """
1186
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1187

    
1188
  def _UpdateQueueSizeUnlocked(self):
1189
    """Update the queue size.
1190

1191
    """
1192
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1193

    
1194
  @locking.ssynchronized(_LOCK)
1195
  @_RequireOpenQueue
1196
  def SetDrainFlag(self, drain_flag):
1197
    """Sets the drain flag for the queue.
1198

1199
    @type drain_flag: boolean
1200
    @param drain_flag: Whether to set or unset the drain flag
1201

1202
    """
1203
    if drain_flag:
1204
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1205
    else:
1206
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1207

    
1208
    self._drained = drain_flag
1209

    
1210
    return True
1211

    
1212
  @_RequireOpenQueue
1213
  def _SubmitJobUnlocked(self, job_id, ops):
1214
    """Create and store a new job.
1215

1216
    This enters the job into our job queue and also puts it on the new
1217
    queue, in order for it to be picked up by the queue processors.
1218

1219
    @type job_id: job ID
1220
    @param job_id: the job ID for the new job
1221
    @type ops: list
1222
    @param ops: The list of OpCodes that will become the new job.
1223
    @rtype: L{_QueuedJob}
1224
    @return: the job object to be queued
1225
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1226
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1227

1228
    """
1229
    # Ok when sharing the big job queue lock, as the drain file is created when
1230
    # the lock is exclusive.
1231
    if self._drained:
1232
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1233

    
1234
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1235
      raise errors.JobQueueFull()
1236

    
1237
    job = _QueuedJob(self, job_id, ops)
1238

    
1239
    # Write to disk
1240
    self.UpdateJobUnlocked(job)
1241

    
1242
    self._queue_size += 1
1243

    
1244
    logging.debug("Adding new job %s to the cache", job_id)
1245
    self._memcache[job_id] = job
1246

    
1247
    return job
1248

    
1249
  @locking.ssynchronized(_LOCK)
1250
  @_RequireOpenQueue
1251
  def SubmitJob(self, ops):
1252
    """Create and store a new job.
1253

1254
    @see: L{_SubmitJobUnlocked}
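
    An illustrative sketch (C{queue} and C{ops} are assumed to be a
    L{JobQueue} instance and a list of L{opcodes.OpCode} objects)::

      job_id = queue.SubmitJob(ops)
      # job_id is the new job's serial, as formatted by _FormatJobID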
1255

1256
    """
1257
    job_id = self._NewSerialsUnlocked(1)[0]
1258
    self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops))
1259
    return job_id
1260

    
1261
  @locking.ssynchronized(_LOCK)
1262
  @_RequireOpenQueue
1263
  def SubmitManyJobs(self, jobs):
1264
    """Create and store multiple jobs.
1265

1266
    @see: L{_SubmitJobUnlocked}
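
    The result holds one C{(status, data)} pair per requested job, where
    C{data} is either the new job id or an error message. An illustrative
    sketch of a return value (ids and message assumed)::

      [(True, "12"), (True, "13"),
       (False, "Job queue is drained, refusing job")]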
1267

1268
    """
1269
    results = []
1270
    tasks = []
1271
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1272
    for job_id, ops in zip(all_job_ids, jobs):
1273
      try:
1274
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1275
        status = True
1276
        data = job_id
1277
      except errors.GenericError, err:
1278
        data = str(err)
1279
        status = False
1280
      results.append((status, data))
1281
    self._wpool.AddManyTasks(tasks)
1282

    
1283
    return results
1284

    
1285
  @_RequireOpenQueue
1286
  def UpdateJobUnlocked(self, job, replicate=True):
1287
    """Update a job's on disk storage.
1288

1289
    After a job has been modified, this function needs to be called in
1290
    order to write the changes to disk and replicate them to the other
1291
    nodes.
1292

1293
    @type job: L{_QueuedJob}
1294
    @param job: the changed job
1295
    @type replicate: boolean
1296
    @param replicate: whether to replicate the change to remote nodes
1297

1298
    """
1299
    filename = self._GetJobPath(job.id)
1300
    data = serializer.DumpJson(job.Serialize(), indent=False)
1301
    logging.debug("Writing job %s to %s", job.id, filename)
1302
    self._UpdateJobQueueFile(filename, data, replicate)
1303

    
1304
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1305
                        timeout):
1306
    """Waits for changes in a job.
1307

1308
    @type job_id: string
1309
    @param job_id: Job identifier
1310
    @type fields: list of strings
1311
    @param fields: Which fields to check for changes
1312
    @type prev_job_info: list or None
1313
    @param prev_job_info: Last job information returned
1314
    @type prev_log_serial: int
1315
    @param prev_log_serial: Last job message serial number
1316
    @type timeout: float
1317
    @param timeout: maximum time to wait
1318
    @rtype: tuple (job info, log entries)
1319
    @return: a tuple of the job information as required via
1320
        the fields parameter, and the log entries as a list
1321

1322
        if the job has not changed and the timeout has expired,
1323
        we instead return a special value,
1324
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1325
        as such by the clients
1326

1327
    """
1328
    helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
1329
                                      prev_log_serial, self)
1330
    try:
1331
      return helper.WaitForChanges(timeout)
1332
    finally:
1333
      helper.Close()
1334

    
1335
  @locking.ssynchronized(_LOCK)
1336
  @_RequireOpenQueue
1337
  def CancelJob(self, job_id):
1338
    """Cancels a job.
1339

1340
    This will only succeed if the job has not started yet.
1341

1342
    @type job_id: string
1343
    @param job_id: job ID of job to be cancelled.
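    @rtype: tuple
    @return: a C{(success, message)} pair, for instance
        C{(True, "Job 42 canceled")} (the job id here is made up)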
1344

1345
    """
1346
    logging.info("Cancelling job %s", job_id)
1347

    
1348
    job = self._LoadJobUnlocked(job_id)
1349
    if not job:
1350
      logging.debug("Job %s not found", job_id)
1351
      return (False, "Job %s not found" % job_id)
1352

    
1353
    job_status = job.CalcStatus()
1354

    
1355
    if job_status not in (constants.JOB_STATUS_QUEUED,
1356
                          constants.JOB_STATUS_WAITLOCK):
1357
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1358
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1359

    
1360
    if job_status == constants.JOB_STATUS_QUEUED:
1361
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1362
                            "Job canceled by request")
1363
      return (True, "Job %s canceled" % job.id)
1364

    
1365
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1366
      # The worker will notice the new status and cancel the job
1367
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1368
      return (True, "Job %s will be canceled" % job.id)
1369

    
1370
  @_RequireOpenQueue
1371
  def _ArchiveJobsUnlocked(self, jobs):
1372
    """Archives jobs.
1373

1374
    @type jobs: list of L{_QueuedJob}
1375
    @param jobs: Job objects
1376
    @rtype: int
1377
    @return: Number of archived jobs
1378

1379
    """
1380
    archive_jobs = []
1381
    rename_files = []
1382
    for job in jobs:
1383
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1384
                                  constants.JOB_STATUS_SUCCESS,
1385
                                  constants.JOB_STATUS_ERROR):
1386
        logging.debug("Job %s is not yet done", job.id)
1387
        continue
1388

    
1389
      archive_jobs.append(job)
1390

    
1391
      old = self._GetJobPath(job.id)
1392
      new = self._GetArchivedJobPath(job.id)
1393
      rename_files.append((old, new))
1394

    
1395
    # TODO: What if 1..n files fail to rename?
1396
    self._RenameFilesUnlocked(rename_files)
1397

    
1398
    logging.debug("Successfully archived job(s) %s",
1399
                  utils.CommaJoin(job.id for job in archive_jobs))
1400

    
1401
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1402
    # the files, we update the cached queue size from the filesystem. When we
1403
    # get around to fix the TODO: above, we can use the number of actually
1404
    # archived jobs to fix this.
1405
    self._UpdateQueueSizeUnlocked()
1406
    return len(archive_jobs)
1407

    
1408
  @locking.ssynchronized(_LOCK)
1409
  @_RequireOpenQueue
1410
  def ArchiveJob(self, job_id):
1411
    """Archives a job.
1412

1413
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1414

1415
    @type job_id: string
1416
    @param job_id: Job ID of job to be archived.
1417
    @rtype: bool
1418
    @return: Whether job was archived
1419

1420
    """
1421
    logging.info("Archiving job %s", job_id)
1422

    
1423
    job = self._LoadJobUnlocked(job_id)
1424
    if not job:
1425
      logging.debug("Job %s not found", job_id)
1426
      return False
1427

    
1428
    return self._ArchiveJobsUnlocked([job]) == 1
1429

    
1430
  @locking.ssynchronized(_LOCK)
1431
  @_RequireOpenQueue
1432
  def AutoArchiveJobs(self, age, timeout):
1433
    """Archives all jobs based on age.
1434

1435
    The method will archive all jobs which are older than the age
1436
    parameter. For jobs that don't have an end timestamp, the start
1437
    timestamp will be considered. The special '-1' age will cause
1438
    archival of all jobs (that are not running or queued).
1439

1440
    @type age: int
1441
    @param age: the minimum age in seconds
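    @param timeout: how long, in seconds, this call may spend archiving
        jobs before leaving the remaining ones for a later run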
1442

1443
    """
1444
    logging.info("Archiving jobs with age more than %s seconds", age)
1445

    
1446
    now = time.time()
1447
    end_time = now + timeout
1448
    archived_count = 0
1449
    last_touched = 0
1450

    
1451
    all_job_ids = self._GetJobIDsUnlocked()
1452
    pending = []
1453
    for idx, job_id in enumerate(all_job_ids):
1454
      last_touched = idx + 1
1455

    
1456
      # Not optimal because jobs could be pending
1457
      # TODO: Measure average duration for job archival and take number of
1458
      # pending jobs into account.
1459
      if time.time() > end_time:
1460
        break
1461

    
1462
      # Returns None if the job failed to load
1463
      job = self._LoadJobUnlocked(job_id)
1464
      if job:
1465
        if job.end_timestamp is None:
1466
          if job.start_timestamp is None:
1467
            job_age = job.received_timestamp
1468
          else:
1469
            job_age = job.start_timestamp
1470
        else:
1471
          job_age = job.end_timestamp
1472

    
1473
        if age == -1 or now - job_age[0] > age:
1474
          pending.append(job)
1475

    
1476
          # Archive 10 jobs at a time
1477
          if len(pending) >= 10:
1478
            archived_count += self._ArchiveJobsUnlocked(pending)
1479
            pending = []
1480

    
1481
    if pending:
1482
      archived_count += self._ArchiveJobsUnlocked(pending)
1483

    
1484
    return (archived_count, len(all_job_ids) - last_touched)
1485

    
1486
  def QueryJobs(self, job_ids, fields):
1487
    """Returns a list of jobs in queue.
1488

1489
    @type job_ids: list
1490
    @param job_ids: sequence of job identifiers or None for all
1491
    @type fields: list
1492
    @param fields: names of fields to return
1493
    @rtype: list
1494
    @return: a list with one element per job, each element being a list with
1495
        the requested fields
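
    An illustrative sketch (job ids and field values assumed)::

      queue.QueryJobs(["1", "5"], ["id", "status"])
      # -> [["1", "success"], ["5", "running"]]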
1496

1497
    """
1498
    jobs = []
1499
    list_all = False
1500
    if not job_ids:
1501
      # Since files are added to/removed from the queue atomically, there's no
1502
      # risk of getting the job ids in an inconsistent state.
1503
      job_ids = self._GetJobIDsUnlocked()
1504
      list_all = True
1505

    
1506
    for job_id in job_ids:
1507
      job = self.SafeLoadJobFromDisk(job_id)
1508
      if job is not None:
1509
        jobs.append(job.GetInfo(fields))
1510
      elif not list_all:
1511
        jobs.append(None)
1512

    
1513
    return jobs
1514

    
1515
  @locking.ssynchronized(_LOCK)
1516
  @_RequireOpenQueue
1517
  def Shutdown(self):
1518
    """Stops the job queue.
1519

1520
    This shuts down all the worker threads and closes the queue.
1521

1522
    """
1523
    self._wpool.TerminateWorkers()
1524

    
1525
    self._queue_filelock.Close()
1526
    self._queue_filelock = None