root / lib / jqueue.py @ 99bd4f0a

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import errno
35
import re
36
import time
37
import weakref
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56

    
57

    
58
JOBQUEUE_THREADS = 25
59
JOBS_PER_ARCHIVE_DIRECTORY = 10000
60

    
61
# The Big JobQueue lock. As for all B*Lock conversions, it must be acquired in
62
# shared mode to ensure exclusion with legacy code, which acquires it
63
# exclusively. It may be left out (not acquired at all) only once concurrency
64
# with all new and legacy code is ensured.
65
_big_jqueue_lock = locking.SharedLock()
66

    
67

    
68
class CancelJob(Exception):
69
  """Special exception to cancel a job.
70

71
  """
72

    
73

    
74
def TimeStampNow():
75
  """Returns the current timestamp.
76

77
  @rtype: tuple
78
  @return: the current time in the (seconds, microseconds) format
79

80
  """
81
  return utils.SplitTime(time.time())
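  # Illustrative example (not part of the original source): for a wall-clock
  # time of 1234567890.5 seconds, utils.SplitTime() yields the tuple
  # (1234567890, 500000), i.e. whole seconds and microseconds.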
82

    
83

    
84
class _QueuedOpCode(object):
85
  """Encapsulates an opcode object.
86

87
  @ivar log: holds the execution log and consists of tuples
88
  of the form C{(log_serial, timestamp, level, message)}
89
  @ivar input: the OpCode we encapsulate
90
  @ivar status: the current status
91
  @ivar result: the result of the LU execution
92
  @ivar start_timestamp: timestamp for the start of the execution
93
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94
  @ivar end_timestamp: timestamp for the end of the execution
95

96
  """
97
  __slots__ = ["input", "status", "result", "log",
98
               "start_timestamp", "exec_timestamp", "end_timestamp",
99
               "__weakref__"]
100

    
101
  def __init__(self, op):
102
    """Constructor for the _QuededOpCode.
103

104
    @type op: L{opcodes.OpCode}
105
    @param op: the opcode we encapsulate
106

107
    """
108
    self.input = op
109
    self.status = constants.OP_STATUS_QUEUED
110
    self.result = None
111
    self.log = []
112
    self.start_timestamp = None
113
    self.exec_timestamp = None
114
    self.end_timestamp = None
115

    
116
  @classmethod
117
  def Restore(cls, state):
118
    """Restore the _QueuedOpCode from the serialized form.
119

120
    @type state: dict
121
    @param state: the serialized state
122
    @rtype: _QueuedOpCode
123
    @return: a new _QueuedOpCode instance
124

125
    """
126
    obj = _QueuedOpCode.__new__(cls)
127
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
128
    obj.status = state["status"]
129
    obj.result = state["result"]
130
    obj.log = state["log"]
131
    obj.start_timestamp = state.get("start_timestamp", None)
132
    obj.exec_timestamp = state.get("exec_timestamp", None)
133
    obj.end_timestamp = state.get("end_timestamp", None)
134
    return obj
135

    
136
  def Serialize(self):
137
    """Serializes this _QueuedOpCode.
138

139
    @rtype: dict
140
    @return: the dictionary holding the serialized state
141

142
    """
143
    return {
144
      "input": self.input.__getstate__(),
145
      "status": self.status,
146
      "result": self.result,
147
      "log": self.log,
148
      "start_timestamp": self.start_timestamp,
149
      "exec_timestamp": self.exec_timestamp,
150
      "end_timestamp": self.end_timestamp,
151
      }
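    # A minimal sketch of the dict returned by Serialize() for a freshly
    # queued opcode (illustrative values only, assuming an OP_CLUSTER_VERIFY
    # input):
    #
    #   {"input": {"OP_ID": "OP_CLUSTER_VERIFY"},
    #    "status": "queued",
    #    "result": None,
    #    "log": [],
    #    "start_timestamp": None,
    #    "exec_timestamp": None,
    #    "end_timestamp": None}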
152

    
153

    
154
class _QueuedJob(object):
155
  """In-memory job representation.
156

157
  This is what we use to track the user-submitted jobs. Locking must
158
  be taken care of by users of this class.
159

160
  @type queue: L{JobQueue}
161
  @ivar queue: the parent queue
162
  @ivar id: the job ID
163
  @type ops: list
164
  @ivar ops: the list of _QueuedOpCode that constitute the job
165
  @type log_serial: int
166
  @ivar log_serial: holds the index for the next log entry
167
  @ivar received_timestamp: the timestamp for when the job was received
168
  @ivar start_timestamp: the timestamp for start of execution
169
  @ivar end_timestamp: the timestamp for end of execution
170
  @ivar lock_status: In-memory locking information for debugging
171

172
  """
173
  # pylint: disable-msg=W0212
174
  __slots__ = ["queue", "id", "ops", "log_serial",
175
               "received_timestamp", "start_timestamp", "end_timestamp",
176
               "lock_status", "change",
177
               "__weakref__"]
178

    
179
  def __init__(self, queue, job_id, ops):
180
    """Constructor for the _QueuedJob.
181

182
    @type queue: L{JobQueue}
183
    @param queue: our parent queue
184
    @type job_id: job_id
185
    @param job_id: our job id
186
    @type ops: list
187
    @param ops: the list of opcodes we hold, which will be encapsulated
188
        in _QueuedOpCodes
189

190
    """
191
    if not ops:
192
      raise errors.GenericError("A job needs at least one opcode")
193

    
194
    self.queue = queue
195
    self.id = job_id
196
    self.ops = [_QueuedOpCode(op) for op in ops]
197
    self.log_serial = 0
198
    self.received_timestamp = TimeStampNow()
199
    self.start_timestamp = None
200
    self.end_timestamp = None
201

    
202
    # In-memory attributes
203
    self.lock_status = None
204

    
205
  def __repr__(self):
206
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
207
              "id=%s" % self.id,
208
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
209

    
210
    return "<%s at %#x>" % (" ".join(status), id(self))
211

    
212
  @classmethod
213
  def Restore(cls, queue, state):
214
    """Restore a _QueuedJob from serialized state:
215

216
    @type queue: L{JobQueue}
217
    @param queue: to which queue the restored job belongs
218
    @type state: dict
219
    @param state: the serialized state
220
    @rtype: _QueuedJob
221
    @return: the restored _QueuedJob instance
222

223
    """
224
    obj = _QueuedJob.__new__(cls)
225
    obj.queue = queue
226
    obj.id = state["id"]
227
    obj.received_timestamp = state.get("received_timestamp", None)
228
    obj.start_timestamp = state.get("start_timestamp", None)
229
    obj.end_timestamp = state.get("end_timestamp", None)
230

    
231
    # In-memory attributes
232
    obj.lock_status = None
233

    
234
    obj.ops = []
235
    obj.log_serial = 0
236
    for op_state in state["ops"]:
237
      op = _QueuedOpCode.Restore(op_state)
238
      for log_entry in op.log:
239
        obj.log_serial = max(obj.log_serial, log_entry[0])
240
      obj.ops.append(op)
241

    
242
    return obj
243

    
244
  def Serialize(self):
245
    """Serialize the _JobQueue instance.
246

247
    @rtype: dict
248
    @return: the serialized state
249

250
    """
251
    return {
252
      "id": self.id,
253
      "ops": [op.Serialize() for op in self.ops],
254
      "start_timestamp": self.start_timestamp,
255
      "end_timestamp": self.end_timestamp,
256
      "received_timestamp": self.received_timestamp,
257
      }
258

    
259
  def CalcStatus(self):
260
    """Compute the status of this job.
261

262
    This function iterates over all the _QueuedOpCodes in the job and
263
    based on their status, computes the job status.
264

265
    The algorithm is:
266
      - if we find a cancelled opcode, or one that finished with an error,
267
        the job status will be the same
268
      - otherwise, the last opcode with the status one of:
269
          - waitlock
270
          - canceling
271
          - running
272

273
        will determine the job status
274

275
      - otherwise, it means either all opcodes are queued, or success,
276
        and the job status will be the same
277

278
    @return: the job status
279

280
    """
281
    status = constants.JOB_STATUS_QUEUED
282

    
283
    all_success = True
284
    for op in self.ops:
285
      if op.status == constants.OP_STATUS_SUCCESS:
286
        continue
287

    
288
      all_success = False
289

    
290
      if op.status == constants.OP_STATUS_QUEUED:
291
        pass
292
      elif op.status == constants.OP_STATUS_WAITLOCK:
293
        status = constants.JOB_STATUS_WAITLOCK
294
      elif op.status == constants.OP_STATUS_RUNNING:
295
        status = constants.JOB_STATUS_RUNNING
296
      elif op.status == constants.OP_STATUS_CANCELING:
297
        status = constants.JOB_STATUS_CANCELING
298
        break
299
      elif op.status == constants.OP_STATUS_ERROR:
300
        status = constants.JOB_STATUS_ERROR
301
        # The whole job fails if one opcode failed
302
        break
303
      elif op.status == constants.OP_STATUS_CANCELED:
304
        status = constants.JOB_STATUS_CANCELED
305
        break
306

    
307
    if all_success:
308
      status = constants.JOB_STATUS_SUCCESS
309

    
310
    return status
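    # Illustrative examples of the algorithm above (assumed per-opcode status
    # lists, not real jobs):
    #   [success, success, success]   -> JOB_STATUS_SUCCESS
    #   [success, running, queued]    -> JOB_STATUS_RUNNING
    #   [success, error, queued]      -> JOB_STATUS_ERROR
    #   [success, canceling, queued]  -> JOB_STATUS_CANCELING
    #   [queued, queued]              -> JOB_STATUS_QUEUED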
311

    
312
  def GetLogEntries(self, newer_than):
313
    """Selectively returns the log entries.
314

315
    @type newer_than: None or int
316
    @param newer_than: if this is None, return all log entries,
317
        otherwise return only the log entries with serial higher
318
        than this value
319
    @rtype: list
320
    @return: the list of the log entries selected
321

322
    """
323
    if newer_than is None:
324
      serial = -1
325
    else:
326
      serial = newer_than
327

    
328
    entries = []
329
    for op in self.ops:
330
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
331

    
332
    return entries
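    # Illustrative example (assumed serials): if the opcodes together hold log
    # entries with serials 1..5, GetLogEntries(3) returns only the entries
    # with serials 4 and 5, while GetLogEntries(None) returns all of them.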
333

    
334
  def GetInfo(self, fields):
335
    """Returns information about a job.
336

337
    @type fields: list
338
    @param fields: names of fields to return
339
    @rtype: list
340
    @return: list with one element for each field
341
    @raise errors.OpExecError: when an invalid field
342
        has been passed
343

344
    """
345
    row = []
346
    for fname in fields:
347
      if fname == "id":
348
        row.append(self.id)
349
      elif fname == "status":
350
        row.append(self.CalcStatus())
351
      elif fname == "ops":
352
        row.append([op.input.__getstate__() for op in self.ops])
353
      elif fname == "opresult":
354
        row.append([op.result for op in self.ops])
355
      elif fname == "opstatus":
356
        row.append([op.status for op in self.ops])
357
      elif fname == "oplog":
358
        row.append([op.log for op in self.ops])
359
      elif fname == "opstart":
360
        row.append([op.start_timestamp for op in self.ops])
361
      elif fname == "opexec":
362
        row.append([op.exec_timestamp for op in self.ops])
363
      elif fname == "opend":
364
        row.append([op.end_timestamp for op in self.ops])
365
      elif fname == "received_ts":
366
        row.append(self.received_timestamp)
367
      elif fname == "start_ts":
368
        row.append(self.start_timestamp)
369
      elif fname == "end_ts":
370
        row.append(self.end_timestamp)
371
      elif fname == "lock_status":
372
        row.append(self.lock_status)
373
      elif fname == "summary":
374
        row.append([op.input.Summary() for op in self.ops])
375
      else:
376
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
377
    return row
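    # Illustrative example (assumed values): for a one-opcode cluster-verify
    # job, GetInfo(["id", "status", "summary"]) might return
    # ["1234", "running", ["CLUSTER_VERIFY"]].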
378

    
379
  def MarkUnfinishedOps(self, status, result):
380
    """Mark unfinished opcodes with a given status and result.
381

382
    This is a utility function for marking all running or waiting to
383
    be run opcodes with a given status. Opcodes which are already
384
    finalised are not changed.
385

386
    @param status: a given opcode status
387
    @param result: the opcode result
388

389
    """
390
    try:
391
      not_marked = True
392
      for op in self.ops:
393
        if op.status in constants.OPS_FINALIZED:
394
          assert not_marked, "Finalized opcodes found after non-finalized ones"
395
          continue
396
        op.status = status
397
        op.result = result
398
        not_marked = False
399
    finally:
400
      self.queue.UpdateJobUnlocked(self)
401

    
402

    
403
class _OpExecCallbacks(mcpu.OpExecCbBase):
404
  def __init__(self, queue, job, op):
405
    """Initializes this class.
406

407
    @type queue: L{JobQueue}
408
    @param queue: Job queue
409
    @type job: L{_QueuedJob}
410
    @param job: Job object
411
    @type op: L{_QueuedOpCode}
412
    @param op: OpCode
413

414
    """
415
    assert queue, "Queue is missing"
416
    assert job, "Job is missing"
417
    assert op, "Opcode is missing"
418

    
419
    self._queue = queue
420
    self._job = job
421
    self._op = op
422

    
423
  def NotifyStart(self):
424
    """Mark the opcode as running, not lock-waiting.
425

426
    This is called from the mcpu code as a notifier function, when the LU is
427
    finally about to start the Exec() method. Of course, to have end-user
428
    visible results, the opcode must be initially (before calling into
429
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
430

431
    """
432
    self._queue.acquire()
433
    try:
434
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
435
                                 constants.OP_STATUS_CANCELING)
436

    
437
      # All locks are acquired by now
438
      self._job.lock_status = None
439

    
440
      # Cancel here if we were asked to
441
      if self._op.status == constants.OP_STATUS_CANCELING:
442
        raise CancelJob()
443

    
444
      self._op.status = constants.OP_STATUS_RUNNING
445
      self._op.exec_timestamp = TimeStampNow()
446
    finally:
447
      self._queue.release()
448

    
449
  def Feedback(self, *args):
450
    """Append a log entry.
451

452
    """
453
    assert len(args) < 3
454

    
455
    if len(args) == 1:
456
      log_type = constants.ELOG_MESSAGE
457
      log_msg = args[0]
458
    else:
459
      (log_type, log_msg) = args
460

    
461
    # The time is split to make serialization easier and not lose
462
    # precision.
463
    timestamp = utils.SplitTime(time.time())
464

    
465
    self._queue.acquire()
466
    try:
467
      self._job.log_serial += 1
468
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
469
      self._queue.UpdateJobUnlocked(self._job, replicate=False)
470
    finally:
471
      self._queue.release()
472

    
473
  def ReportLocks(self, msg):
474
    """Write locking information to the job.
475

476
    Called whenever the LU processor is waiting for a lock or has acquired one.
477

478
    """
479
    # Not getting the queue lock because this is a single assignment
480
    self._job.lock_status = msg
481

    
482

    
483
class _WaitForJobChangesHelper(object):
484
  """Helper class using initofy to wait for changes in a job file.
485

486
  This class takes a previous job status and serial, and alerts the client when
487
  the current job status has changed.
488

489
  @type job_id: string
490
  @ivar job_id: id of the job we're watching
491
  @type prev_job_info: string
492
  @ivar prev_job_info: previous job info, as passed by the luxi client
493
  @type prev_log_serial: string
494
  @ivar prev_log_serial: previous job serial, as passed by the luxi client
495
  @type queue: L{JobQueue}
496
  @ivar queue: job queue (used for a few utility functions)
497
  @type job_path: string
498
  @ivar job_path: absolute path of the job file
499
  @type wm: pyinotify.WatchManager (or None)
500
  @ivar wm: inotify watch manager to watch for changes
501
  @type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
502
  @ivar inotify_handler: single file event handler, used for watching
503
  @type notifier: pyinotify.Notifier
504
  @ivar notifier: inotify single-threaded notifier, used for watching
505

506
  """
507
  def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
508
    self.job_id = job_id
509
    self.fields = fields
510
    self.prev_job_info = prev_job_info
511
    self.prev_log_serial = prev_log_serial
512
    self.queue = queue
513
    # pylint: disable-msg=W0212
514
    self.job_path = self.queue._GetJobPath(self.job_id)
515
    self.wm = None
516
    self.inotify_handler = None
517
    self.notifier = None
518

    
519
  def _SetupInotify(self):
520
    """Create the inotify
521

522
    @raises errors.InotifyError: if the notifier cannot be set up
523

524
    """
525
    if self.wm:
526
      return
527
    self.wm = pyinotify.WatchManager()
528
    self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
529
                                                                self.OnInotify,
530
                                                                self.job_path)
531
    self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
532
    self.inotify_handler.enable()
533

    
534
  def _LoadDiskStatus(self):
535
    job = self.queue.SafeLoadJobFromDisk(self.job_id)
536
    if not job:
537
      raise errors.JobLost()
538
    self.job_status = job.CalcStatus()
539

    
540
    job_info = job.GetInfo(self.fields)
541
    log_entries = job.GetLogEntries(self.prev_log_serial)
542
    # Serializing and deserializing data can cause type changes (e.g. from
543
    # tuple to list) or precision loss. We're doing it here so that we get
544
    # the same modifications as the data received from the client. Without
545
    # this, the comparison afterwards might fail without the data being
546
    # significantly different.
547
    # TODO: we just deserialized from disk, investigate how to make sure that
548
    # the job info and log entries are compatible to avoid this further step.
549
    self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
550
    self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
551

    
552
  def _CheckForChanges(self):
553
    self._LoadDiskStatus()
554
    # Don't even try to wait if the job is no longer running, there will be
555
    # no changes.
556
    if (self.job_status not in (constants.JOB_STATUS_QUEUED,
557
                                constants.JOB_STATUS_RUNNING,
558
                                constants.JOB_STATUS_WAITLOCK) or
559
        self.prev_job_info != self.job_info or
560
        (self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
561
      logging.debug("Job %s changed", self.job_id)
562
      return (self.job_info, self.log_entries)
563

    
564
    raise utils.RetryAgain()
565

    
566
  def OnInotify(self, notifier_enabled):
567
    if not notifier_enabled:
568
      self.inotify_handler.enable()
569

    
570
  def WaitFn(self, timeout):
571
    self._SetupInotify()
572
    if self.notifier.check_events(timeout*1000):
573
      self.notifier.read_events()
574
    self.notifier.process_events()
575

    
576
  def WaitForChanges(self, timeout):
577
    try:
578
      return utils.Retry(self._CheckForChanges,
579
                         utils.RETRY_REMAINING_TIME,
580
                         timeout,
581
                         wait_fn=self.WaitFn)
582
    except (errors.InotifyError, errors.JobLost):
583
      return None
584
    except utils.RetryTimeout:
585
      return constants.JOB_NOTCHANGED
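    # Summary of the possible return values (as consumed by
    # JobQueue.WaitForJobChanges below): a (job_info, log_entries) tuple when
    # the job changed, None if the job was lost or inotify failed, and
    # constants.JOB_NOTCHANGED when the timeout expired without changes.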
586

    
587
  def Close(self):
588
    if self.wm:
589
      self.notifier.stop()
590

    
591

    
592
class _JobQueueWorker(workerpool.BaseWorker):
593
  """The actual job workers.
594

595
  """
596
  def RunTask(self, job): # pylint: disable-msg=W0221
597
    """Job executor.
598

599
    This function processes a job. It is closely tied to the _QueuedJob and
600
    _QueuedOpCode classes.
601

602
    @type job: L{_QueuedJob}
603
    @param job: the job to be processed
604

605
    """
606
    logging.info("Processing job %s", job.id)
607
    proc = mcpu.Processor(self.pool.queue.context, job.id)
608
    queue = job.queue
609
    try:
610
      try:
611
        count = len(job.ops)
612
        for idx, op in enumerate(job.ops):
613
          op_summary = op.input.Summary()
614
          if op.status == constants.OP_STATUS_SUCCESS:
615
            # this is a job that was partially completed before master
616
            # daemon shutdown, so it can be expected that some opcodes
617
            # are already completed successfully (if any did error
618
            # out, then the whole job should have been aborted and not
619
            # resubmitted for processing)
620
            logging.info("Op %s/%s: opcode %s already processed, skipping",
621
                         idx + 1, count, op_summary)
622
            continue
623
          try:
624
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
625
                         op_summary)
626

    
627
            queue.acquire()
628
            try:
629
              if op.status == constants.OP_STATUS_CANCELED:
630
                raise CancelJob()
631
              assert op.status == constants.OP_STATUS_QUEUED
632
              op.status = constants.OP_STATUS_WAITLOCK
633
              op.result = None
634
              op.start_timestamp = TimeStampNow()
635
              if idx == 0: # first opcode
636
                job.start_timestamp = op.start_timestamp
637
              queue.UpdateJobUnlocked(job)
638

    
639
              input_opcode = op.input
640
            finally:
641
              queue.release()
642

    
643
            # Make sure not to hold queue lock while calling ExecOpCode
644
            result = proc.ExecOpCode(input_opcode,
645
                                     _OpExecCallbacks(queue, job, op))
646

    
647
            queue.acquire()
648
            try:
649
              op.status = constants.OP_STATUS_SUCCESS
650
              op.result = result
651
              op.end_timestamp = TimeStampNow()
652
              queue.UpdateJobUnlocked(job)
653
            finally:
654
              queue.release()
655

    
656
            logging.info("Op %s/%s: Successfully finished opcode %s",
657
                         idx + 1, count, op_summary)
658
          except CancelJob:
659
            # Will be handled further up
660
            raise
661
          except Exception, err:
662
            queue.acquire()
663
            try:
664
              try:
665
                op.status = constants.OP_STATUS_ERROR
666
                if isinstance(err, errors.GenericError):
667
                  op.result = errors.EncodeException(err)
668
                else:
669
                  op.result = str(err)
670
                op.end_timestamp = TimeStampNow()
671
                logging.info("Op %s/%s: Error in opcode %s: %s",
672
                             idx + 1, count, op_summary, err)
673
              finally:
674
                queue.UpdateJobUnlocked(job)
675
            finally:
676
              queue.release()
677
            raise
678

    
679
      except CancelJob:
680
        queue.acquire()
681
        try:
682
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
683
                                "Job canceled by request")
684
        finally:
685
          queue.release()
686
      except errors.GenericError, err:
687
        logging.exception("Ganeti exception")
688
      except:
689
        logging.exception("Unhandled exception")
690
    finally:
691
      queue.acquire()
692
      try:
693
        try:
694
          job.lock_status = None
695
          job.end_timestamp = TimeStampNow()
696
          queue.UpdateJobUnlocked(job)
697
        finally:
698
          job_id = job.id
699
          status = job.CalcStatus()
700
      finally:
701
        queue.release()
702

    
703
      logging.info("Finished job %s, status = %s", job_id, status)
704

    
705

    
706
class _JobQueueWorkerPool(workerpool.WorkerPool):
707
  """Simple class implementing a job-processing workerpool.
708

709
  """
710
  def __init__(self, queue):
711
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
712
                                              JOBQUEUE_THREADS,
713
                                              _JobQueueWorker)
714
    self.queue = queue
715

    
716

    
717
def _RequireOpenQueue(fn):
718
  """Decorator for "public" functions.
719

720
  This decorator should be used for all 'public' functions. That is,
721
  functions usually called from other classes. Note that this should
722
  be applied only to methods (not plain functions), since it expects
723
  that the decorated function is called with a first argument that has
724
  a '_queue_filelock' attribute.
725

726
  @warning: Use this decorator only after locking.ssynchronized
727

728
  Example::
729
    @locking.ssynchronized(_big_jqueue_lock)
730
    @_RequireOpenQueue
731
    def Example(self):
732
      pass
733

734
  """
735
  def wrapper(self, *args, **kwargs):
736
    # pylint: disable-msg=W0212
737
    assert self._queue_filelock is not None, "Queue should be open"
738
    return fn(self, *args, **kwargs)
739
  return wrapper
740

    
741

    
742
class JobQueue(object):
743
  """Queue used to manage the jobs.
744

745
  @cvar _RE_JOB_FILE: regex matching the valid job file names
746

747
  """
748
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
749

    
750
  def __init__(self, context):
751
    """Constructor for JobQueue.
752

753
    The constructor will initialize the job queue object and then
754
    start loading the current jobs from disk, either for starting them
755
    (if they were queued) or for aborting them (if they were already
756
    running).
757

758
    @type context: GanetiContext
759
    @param context: the context object for access to the configuration
760
        data and other ganeti objects
761

762
    """
763
    self.context = context
764
    self._memcache = weakref.WeakValueDictionary()
765
    self._my_hostname = utils.HostInfo().name
766

    
767
    self.acquire = _big_jqueue_lock.acquire
768
    self.release = _big_jqueue_lock.release
769

    
770
    # Initialize the queue, and acquire the filelock.
771
    # This ensures no other process is working on the job queue.
772
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
773

    
774
    # Read serial file
775
    self._last_serial = jstore.ReadSerial()
776
    assert self._last_serial is not None, ("Serial file was modified between"
777
                                           " check in jstore and here")
778

    
779
    # Get initial list of nodes
780
    self._nodes = dict((n.name, n.primary_ip)
781
                       for n in self.context.cfg.GetAllNodesInfo().values()
782
                       if n.master_candidate)
783

    
784
    # Remove master node
785
    self._nodes.pop(self._my_hostname, None)
786

    
787
    # TODO: Check consistency across nodes
788

    
789
    self._queue_size = 0
790
    self._UpdateQueueSizeUnlocked()
791
    self._drained = self._IsQueueMarkedDrain()
792

    
793
    # Setup worker pool
794
    self._wpool = _JobQueueWorkerPool(self)
795
    try:
796
      # We need to lock here because WorkerPool.AddTask() may start a job while
797
      # we're still doing our work.
798
      self.acquire()
799
      try:
800
        logging.info("Inspecting job queue")
801

    
802
        all_job_ids = self._GetJobIDsUnlocked()
803
        jobs_count = len(all_job_ids)
804
        lastinfo = time.time()
805
        for idx, job_id in enumerate(all_job_ids):
806
          # Give an update every 1000 jobs or 10 seconds
807
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
808
              idx == (jobs_count - 1)):
809
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
810
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
811
            lastinfo = time.time()
812

    
813
          job = self._LoadJobUnlocked(job_id)
814

    
815
          # a failure in loading the job can cause 'None' to be returned
816
          if job is None:
817
            continue
818

    
819
          status = job.CalcStatus()
820

    
821
          if status in (constants.JOB_STATUS_QUEUED, ):
822
            self._wpool.AddTask(job)
823

    
824
          elif status in (constants.JOB_STATUS_RUNNING,
825
                          constants.JOB_STATUS_WAITLOCK,
826
                          constants.JOB_STATUS_CANCELING):
827
            logging.warning("Unfinished job %s found: %s", job.id, job)
828
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
829
                                  "Unclean master daemon shutdown")
830

    
831
        logging.info("Job queue inspection finished")
832
      finally:
833
        self.release()
834
    except:
835
      self._wpool.TerminateWorkers()
836
      raise
837

    
838
  @locking.ssynchronized(_big_jqueue_lock)
839
  @_RequireOpenQueue
840
  def AddNode(self, node):
841
    """Register a new node with the queue.
842

843
    @type node: L{objects.Node}
844
    @param node: the node object to be added
845

846
    """
847
    node_name = node.name
848
    assert node_name != self._my_hostname
849

    
850
    # Clean queue directory on added node
851
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
852
    msg = result.fail_msg
853
    if msg:
854
      logging.warning("Cannot cleanup queue directory on node %s: %s",
855
                      node_name, msg)
856

    
857
    if not node.master_candidate:
858
      # remove if existing, ignoring errors
859
      self._nodes.pop(node_name, None)
860
      # and skip the replication of the job ids
861
      return
862

    
863
    # Upload the whole queue excluding archived jobs
864
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
865

    
866
    # Upload current serial file
867
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
868

    
869
    for file_name in files:
870
      # Read file content
871
      content = utils.ReadFile(file_name)
872

    
873
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
874
                                                  [node.primary_ip],
875
                                                  file_name, content)
876
      msg = result[node_name].fail_msg
877
      if msg:
878
        logging.error("Failed to upload file %s to node %s: %s",
879
                      file_name, node_name, msg)
880

    
881
    self._nodes[node_name] = node.primary_ip
882

    
883
  @locking.ssynchronized(_big_jqueue_lock)
884
  @_RequireOpenQueue
885
  def RemoveNode(self, node_name):
886
    """Callback called when removing nodes from the cluster.
887

888
    @type node_name: str
889
    @param node_name: the name of the node to remove
890

891
    """
892
    self._nodes.pop(node_name, None)
893

    
894
  @staticmethod
895
  def _CheckRpcResult(result, nodes, failmsg):
896
    """Verifies the status of an RPC call.
897

898
    Since we aim to keep consistency should this node (the current
899
    master) fail, we will log errors if our rpc calls fail, and especially
900
    log the case when more than half of the nodes fail.
901

902
    @param result: the data as returned from the rpc call
903
    @type nodes: list
904
    @param nodes: the list of nodes we made the call to
905
    @type failmsg: str
906
    @param failmsg: the identifier to be used for logging
907

908
    """
909
    failed = []
910
    success = []
911

    
912
    for node in nodes:
913
      msg = result[node].fail_msg
914
      if msg:
915
        failed.append(node)
916
        logging.error("RPC call %s (%s) failed on node %s: %s",
917
                      result[node].call, failmsg, node, msg)
918
      else:
919
        success.append(node)
920

    
921
    # +1 for the master node
922
    if (len(success) + 1) < len(failed):
923
      # TODO: Handle failing nodes
924
      logging.error("More than half of the nodes failed")
925

    
926
  def _GetNodeIp(self):
927
    """Helper for returning the node name/ip list.
928

929
    @rtype: (list, list)
930
    @return: a tuple of two lists, the first one with the node
931
        names and the second one with the node addresses
932

933
    """
934
    name_list = self._nodes.keys()
935
    addr_list = [self._nodes[name] for name in name_list]
936
    return name_list, addr_list
937

    
938
  def _UpdateJobQueueFile(self, file_name, data, replicate):
939
    """Writes a file locally and then replicates it to all nodes.
940

941
    This function will replace the contents of a file on the local
942
    node and then replicate it to all the other nodes we have.
943

944
    @type file_name: str
945
    @param file_name: the path of the file to be replicated
946
    @type data: str
947
    @param data: the new contents of the file
948
    @type replicate: boolean
949
    @param replicate: whether to spread the changes to the remote nodes
950

951
    """
952
    utils.WriteFile(file_name, data=data)
953

    
954
    if replicate:
955
      names, addrs = self._GetNodeIp()
956
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
957
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
958

    
959
  def _RenameFilesUnlocked(self, rename):
960
    """Renames a file locally and then replicate the change.
961

962
    This function will rename a file in the local queue directory
963
    and then replicate this rename to all the other nodes we have.
964

965
    @type rename: list of (old, new)
966
    @param rename: List containing tuples mapping old to new names
967

968
    """
969
    # Rename them locally
970
    for old, new in rename:
971
      utils.RenameFile(old, new, mkdir=True)
972

    
973
    # ... and on all nodes
974
    names, addrs = self._GetNodeIp()
975
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
976
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
977

    
978
  @staticmethod
979
  def _FormatJobID(job_id):
980
    """Convert a job ID to string format.
981

982
    Currently this just does C{str(job_id)} after performing some
983
    checks, but if we want to change the job id format this will
984
    abstract this change.
985

986
    @type job_id: int or long
987
    @param job_id: the numeric job id
988
    @rtype: str
989
    @return: the formatted job id
990

991
    """
992
    if not isinstance(job_id, (int, long)):
993
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
994
    if job_id < 0:
995
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
996

    
997
    return str(job_id)
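    # Illustrative example: _FormatJobID(42) returns "42", while
    # _FormatJobID("42") and _FormatJobID(-1) raise errors.ProgrammerError.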
998

    
999
  @classmethod
1000
  def _GetArchiveDirectory(cls, job_id):
1001
    """Returns the archive directory for a job.
1002

1003
    @type job_id: str
1004
    @param job_id: Job identifier
1005
    @rtype: str
1006
    @return: Directory name
1007

1008
    """
1009
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
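    # Illustrative example: with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job
    # "9999" maps to archive directory "0" and job "12345" to directory "1".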
1010

    
1011
  def _NewSerialsUnlocked(self, count):
1012
    """Generates a new job identifier.
1013

1014
    Job identifiers are unique during the lifetime of a cluster.
1015

1016
    @type count: integer
1017
    @param count: how many serials to return
1018
    @rtype: list of str
1019
    @return: a list of job identifiers
1020

1021
    """
1022
    assert count > 0
1023
    # New number
1024
    serial = self._last_serial + count
1025

    
1026
    # Write to file
1027
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1028
                             "%s\n" % serial, True)
1029

    
1030
    result = [self._FormatJobID(v)
1031
              for v in range(self._last_serial + 1, serial + 1)]
1032
    # Keep it only if we were able to write the file
1033
    self._last_serial = serial
1034

    
1035
    return result
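    # Illustrative example (assumed state): with _last_serial == 3,
    # _NewSerialsUnlocked(2) writes "5" to the serial file and returns
    # ["4", "5"].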
1036

    
1037
  @staticmethod
1038
  def _GetJobPath(job_id):
1039
    """Returns the job file for a given job id.
1040

1041
    @type job_id: str
1042
    @param job_id: the job identifier
1043
    @rtype: str
1044
    @return: the path to the job file
1045

1046
    """
1047
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1048

    
1049
  @classmethod
1050
  def _GetArchivedJobPath(cls, job_id):
1051
    """Returns the archived job file for a give job id.
1052

1053
    @type job_id: str
1054
    @param job_id: the job identifier
1055
    @rtype: str
1056
    @return: the path to the archived job file
1057

1058
    """
1059
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1060
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1061

    
1062
  def _GetJobIDsUnlocked(self, sort=True):
1063
    """Return all known job IDs.
1064

1065
    The method only looks at disk because it's a requirement that all
1066
    jobs are present on disk (so in the _memcache we don't have any
1067
    extra IDs).
1068

1069
    @type sort: boolean
1070
    @param sort: perform sorting on the returned job ids
1071
    @rtype: list
1072
    @return: the list of job IDs
1073

1074
    """
1075
    jlist = []
1076
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1077
      m = self._RE_JOB_FILE.match(filename)
1078
      if m:
1079
        jlist.append(m.group(1))
1080
    if sort:
1081
      jlist = utils.NiceSort(jlist)
1082
    return jlist
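    # Illustrative example (assumed directory contents): queue files "job-1",
    # "job-10" and "job-2" yield ["1", "2", "10"] after the NiceSort above,
    # whereas a plain lexicographic sort would give ["1", "10", "2"].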
1083

    
1084
  def _LoadJobUnlocked(self, job_id):
1085
    """Loads a job from the disk or memory.
1086

1087
    Given a job id, this will return the cached job object if
1088
    existing, or try to load the job from the disk. If loading from
1089
    disk, it will also add the job to the cache.
1090

1091
    @param job_id: the job id
1092
    @rtype: L{_QueuedJob} or None
1093
    @return: either None or the job object
1094

1095
    """
1096
    job = self._memcache.get(job_id, None)
1097
    if job:
1098
      logging.debug("Found job %s in memcache", job_id)
1099
      return job
1100

    
1101
    try:
1102
      job = self._LoadJobFromDisk(job_id)
1103
    except errors.JobFileCorrupted:
1104
      old_path = self._GetJobPath(job_id)
1105
      new_path = self._GetArchivedJobPath(job_id)
1106
      if old_path == new_path:
1107
        # job already archived (future case)
1108
        logging.exception("Can't parse job %s", job_id)
1109
      else:
1110
        # non-archived case
1111
        logging.exception("Can't parse job %s, will archive.", job_id)
1112
        self._RenameFilesUnlocked([(old_path, new_path)])
1113
      return None
1114

    
1115
    self._memcache[job_id] = job
1116
    logging.debug("Added job %s to the cache", job_id)
1117
    return job
1118

    
1119
  def _LoadJobFromDisk(self, job_id):
1120
    """Load the given job file from disk.
1121

1122
    Given a job file, read, load and restore it in a _QueuedJob format.
1123

1124
    @type job_id: string
1125
    @param job_id: job identifier
1126
    @rtype: L{_QueuedJob} or None
1127
    @return: either None or the job object
1128

1129
    """
1130
    filepath = self._GetJobPath(job_id)
1131
    logging.debug("Loading job from %s", filepath)
1132
    try:
1133
      raw_data = utils.ReadFile(filepath)
1134
    except EnvironmentError, err:
1135
      if err.errno in (errno.ENOENT, ):
1136
        return None
1137
      raise
1138

    
1139
    try:
1140
      data = serializer.LoadJson(raw_data)
1141
      job = _QueuedJob.Restore(self, data)
1142
    except Exception, err: # pylint: disable-msg=W0703
1143
      raise errors.JobFileCorrupted(err)
1144

    
1145
    return job
1146

    
1147
  def SafeLoadJobFromDisk(self, job_id):
1148
    """Load the given job file from disk.
1149

1150
    Given a job file, read, load and restore it in a _QueuedJob format.
1151
    In case of error reading the job, it gets returned as None, and the
1152
    exception is logged.
1153

1154
    @type job_id: string
1155
    @param job_id: job identifier
1156
    @rtype: L{_QueuedJob} or None
1157
    @return: either None or the job object
1158

1159
    """
1160
    try:
1161
      return self._LoadJobFromDisk(job_id)
1162
    except (errors.JobFileCorrupted, EnvironmentError):
1163
      logging.exception("Can't load/parse job %s", job_id)
1164
      return None
1165

    
1166
  @staticmethod
1167
  def _IsQueueMarkedDrain():
1168
    """Check if the queue is marked from drain.
1169

1170
    This currently uses the queue drain file, which makes it a
1171
    per-node flag. In the future this can be moved to the config file.
1172

1173
    @rtype: boolean
1174
    @return: True if the job queue is marked for draining
1175

1176
    """
1177
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1178

    
1179
  def _UpdateQueueSizeUnlocked(self):
1180
    """Update the queue size.
1181

1182
    """
1183
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1184

    
1185
  @locking.ssynchronized(_big_jqueue_lock)
1186
  @_RequireOpenQueue
1187
  def SetDrainFlag(self, drain_flag):
1188
    """Sets the drain flag for the queue.
1189

1190
    @type drain_flag: boolean
1191
    @param drain_flag: Whether to set or unset the drain flag
1192

1193
    """
1194
    if drain_flag:
1195
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1196
    else:
1197
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1198

    
1199
    self._drained = drain_flag
1200

    
1201
    return True
1202

    
1203
  @_RequireOpenQueue
1204
  def _SubmitJobUnlocked(self, job_id, ops):
1205
    """Create and store a new job.
1206

1207
    This enters the job into our job queue and also puts it on the new
1208
    queue, in order for it to be picked up by the queue processors.
1209

1210
    @type job_id: job ID
1211
    @param job_id: the job ID for the new job
1212
    @type ops: list
1213
    @param ops: The list of OpCodes that will become the new job.
1214
    @rtype: L{_QueuedJob}
1215
    @return: the job object to be queued
1216
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1217
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1218

1219
    """
1220
    # Ok when sharing the big job queue lock, as the drain file is created when
1221
    # the lock is exclusive.
1222
    if self._drained:
1223
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1224

    
1225
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1226
      raise errors.JobQueueFull()
1227

    
1228
    job = _QueuedJob(self, job_id, ops)
1229

    
1230
    # Write to disk
1231
    self.UpdateJobUnlocked(job)
1232

    
1233
    self._queue_size += 1
1234

    
1235
    logging.debug("Adding new job %s to the cache", job_id)
1236
    self._memcache[job_id] = job
1237

    
1238
    return job
1239

    
1240
  @locking.ssynchronized(_big_jqueue_lock)
1241
  @_RequireOpenQueue
1242
  def SubmitJob(self, ops):
1243
    """Create and store a new job.
1244

1245
    @see: L{_SubmitJobUnlocked}
1246

1247
    """
1248
    job_id = self._NewSerialsUnlocked(1)[0]
1249
    self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops))
1250
    return job_id
1251

    
1252
  @locking.ssynchronized(_big_jqueue_lock)
1253
  @_RequireOpenQueue
1254
  def SubmitManyJobs(self, jobs):
1255
    """Create and store multiple jobs.
1256

1257
    @see: L{_SubmitJobUnlocked}
1258

1259
    """
1260
    results = []
1261
    tasks = []
1262
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1263
    for job_id, ops in zip(all_job_ids, jobs):
1264
      try:
1265
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1266
        status = True
1267
        data = job_id
1268
      except errors.GenericError, err:
1269
        data = str(err)
1270
        status = False
1271
      results.append((status, data))
1272
    self._wpool.AddManyTasks(tasks)
1273

    
1274
    return results
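    # Illustrative example (assumed outcome): submitting two jobs of which the
    # second has an empty opcode list might return
    # [(True, "1234"), (False, "A job needs at least one opcode")], with only
    # the first job handed to the worker pool.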
1275

    
1276
  @_RequireOpenQueue
1277
  def UpdateJobUnlocked(self, job, replicate=True):
1278
    """Update a job's on disk storage.
1279

1280
    After a job has been modified, this function needs to be called in
1281
    order to write the changes to disk and replicate them to the other
1282
    nodes.
1283

1284
    @type job: L{_QueuedJob}
1285
    @param job: the changed job
1286
    @type replicate: boolean
1287
    @param replicate: whether to replicate the change to remote nodes
1288

1289
    """
1290
    filename = self._GetJobPath(job.id)
1291
    data = serializer.DumpJson(job.Serialize(), indent=False)
1292
    logging.debug("Writing job %s to %s", job.id, filename)
1293
    self._UpdateJobQueueFile(filename, data, replicate)
1294

    
1295
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1296
                        timeout):
1297
    """Waits for changes in a job.
1298

1299
    @type job_id: string
1300
    @param job_id: Job identifier
1301
    @type fields: list of strings
1302
    @param fields: Which fields to check for changes
1303
    @type prev_job_info: list or None
1304
    @param prev_job_info: Last job information returned
1305
    @type prev_log_serial: int
1306
    @param prev_log_serial: Last job message serial number
1307
    @type timeout: float
1308
    @param timeout: maximum time to wait
1309
    @rtype: tuple (job info, log entries)
1310
    @return: a tuple of the job information as required via
1311
        the fields parameter, and the log entries as a list
1312

1313
        if the job has not changed and the timeout has expired,
1314
        we instead return a special value,
1315
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1316
        as such by the clients
1317

1318
    """
1319
    helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
1320
                                      prev_log_serial, self)
1321
    try:
1322
      return helper.WaitForChanges(timeout)
1323
    finally:
1324
      helper.Close()
1325

    
1326
  @locking.ssynchronized(_big_jqueue_lock)
1327
  @_RequireOpenQueue
1328
  def CancelJob(self, job_id):
1329
    """Cancels a job.
1330

1331
    This will only succeed if the job has not started yet.
1332

1333
    @type job_id: string
1334
    @param job_id: job ID of job to be cancelled.
1335

1336
    """
1337
    logging.info("Cancelling job %s", job_id)
1338

    
1339
    job = self._LoadJobUnlocked(job_id)
1340
    if not job:
1341
      logging.debug("Job %s not found", job_id)
1342
      return (False, "Job %s not found" % job_id)
1343

    
1344
    job_status = job.CalcStatus()
1345

    
1346
    if job_status not in (constants.JOB_STATUS_QUEUED,
1347
                          constants.JOB_STATUS_WAITLOCK):
1348
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1349
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1350

    
1351
    if job_status == constants.JOB_STATUS_QUEUED:
1352
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1353
                            "Job canceled by request")
1354
      return (True, "Job %s canceled" % job.id)
1355

    
1356
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1357
      # The worker will notice the new status and cancel the job
1358
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1359
      return (True, "Job %s will be canceled" % job.id)
1360

    
1361
  @_RequireOpenQueue
1362
  def _ArchiveJobsUnlocked(self, jobs):
1363
    """Archives jobs.
1364

1365
    @type jobs: list of L{_QueuedJob}
1366
    @param jobs: Job objects
1367
    @rtype: int
1368
    @return: Number of archived jobs
1369

1370
    """
1371
    archive_jobs = []
1372
    rename_files = []
1373
    for job in jobs:
1374
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1375
                                  constants.JOB_STATUS_SUCCESS,
1376
                                  constants.JOB_STATUS_ERROR):
1377
        logging.debug("Job %s is not yet done", job.id)
1378
        continue
1379

    
1380
      archive_jobs.append(job)
1381

    
1382
      old = self._GetJobPath(job.id)
1383
      new = self._GetArchivedJobPath(job.id)
1384
      rename_files.append((old, new))
1385

    
1386
    # TODO: What if 1..n files fail to rename?
1387
    self._RenameFilesUnlocked(rename_files)
1388

    
1389
    logging.debug("Successfully archived job(s) %s",
1390
                  utils.CommaJoin(job.id for job in archive_jobs))
1391

    
1392
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1393
    # the files, we update the cached queue size from the filesystem. When we
1394
    # get around to fix the TODO: above, we can use the number of actually
1395
    # archived jobs to fix this.
1396
    self._UpdateQueueSizeUnlocked()
1397
    return len(archive_jobs)
1398

    
1399
  @locking.ssynchronized(_big_jqueue_lock)
1400
  @_RequireOpenQueue
1401
  def ArchiveJob(self, job_id):
1402
    """Archives a job.
1403

1404
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1405

1406
    @type job_id: string
1407
    @param job_id: Job ID of job to be archived.
1408
    @rtype: bool
1409
    @return: Whether job was archived
1410

1411
    """
1412
    logging.info("Archiving job %s", job_id)
1413

    
1414
    job = self._LoadJobUnlocked(job_id)
1415
    if not job:
1416
      logging.debug("Job %s not found", job_id)
1417
      return False
1418

    
1419
    return self._ArchiveJobsUnlocked([job]) == 1
1420

    
1421
  @locking.ssynchronized(_big_jqueue_lock)
1422
  @_RequireOpenQueue
1423
  def AutoArchiveJobs(self, age, timeout):
1424
    """Archives all jobs based on age.
1425

1426
    The method will archive all jobs which are older than the age
1427
    parameter. For jobs that don't have an end timestamp, the start
1428
    timestamp will be considered. The special '-1' age will cause
1429
    archival of all jobs (that are not running or queued).
1430

1431
    @type age: int
1432
    @param age: the minimum age in seconds
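    @type timeout: float
    @param timeout: the maximum amount of time to spend archiving, in seconds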
1433

1434
    """
1435
    logging.info("Archiving jobs with age more than %s seconds", age)
1436

    
1437
    now = time.time()
1438
    end_time = now + timeout
1439
    archived_count = 0
1440
    last_touched = 0
1441

    
1442
    all_job_ids = self._GetJobIDsUnlocked()
1443
    pending = []
1444
    for idx, job_id in enumerate(all_job_ids):
1445
      last_touched = idx + 1
1446

    
1447
      # Not optimal because jobs could be pending
1448
      # TODO: Measure average duration for job archival and take number of
1449
      # pending jobs into account.
1450
      if time.time() > end_time:
1451
        break
1452

    
1453
      # Returns None if the job failed to load
1454
      job = self._LoadJobUnlocked(job_id)
1455
      if job:
1456
        if job.end_timestamp is None:
1457
          if job.start_timestamp is None:
1458
            job_age = job.received_timestamp
1459
          else:
1460
            job_age = job.start_timestamp
1461
        else:
1462
          job_age = job.end_timestamp
1463

    
1464
        if age == -1 or now - job_age[0] > age:
1465
          pending.append(job)
1466

    
1467
          # Archive 10 jobs at a time
1468
          if len(pending) >= 10:
1469
            archived_count += self._ArchiveJobsUnlocked(pending)
1470
            pending = []
1471

    
1472
    if pending:
1473
      archived_count += self._ArchiveJobsUnlocked(pending)
1474

    
1475
    return (archived_count, len(all_job_ids) - last_touched)
1476

    
1477
  def QueryJobs(self, job_ids, fields):
1478
    """Returns a list of jobs in queue.
1479

1480
    @type job_ids: list
1481
    @param job_ids: sequence of job identifiers or None for all
1482
    @type fields: list
1483
    @param fields: names of fields to return
1484
    @rtype: list
1485
    @return: a list with one element per job, each element being a list with
1486
        the requested fields
1487

1488
    """
1489
    jobs = []
1490
    list_all = False
1491
    if not job_ids:
1492
      # Since files are added to/removed from the queue atomically, there's no
1493
      # risk of getting the job ids in an inconsistent state.
1494
      job_ids = self._GetJobIDsUnlocked()
1495
      list_all = True
1496

    
1497
    for job_id in job_ids:
1498
      job = self.SafeLoadJobFromDisk(job_id)
1499
      if job is not None:
1500
        jobs.append(job.GetInfo(fields))
1501
      elif not list_all:
1502
        jobs.append(None)
1503

    
1504
    return jobs
1505

    
1506
  @locking.ssynchronized(_big_jqueue_lock)
1507
  @_RequireOpenQueue
1508
  def Shutdown(self):
1509
    """Stops the job queue.
1510

1511
    This shuts down all the worker threads and closes the queue.
1512

1513
    """
1514
    self._wpool.TerminateWorkers()
1515

    
1516
    self._queue_filelock.Close()
1517
    self._queue_filelock = None