1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import errno
35
import re
36
import time
37
import weakref
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import netutils
57
from ganeti import compat
58

    
59

    
60
JOBQUEUE_THREADS = 25
61
JOBS_PER_ARCHIVE_DIRECTORY = 10000
62

    
63
# member lock names to be passed to @ssynchronized decorator
64
_LOCK = "_lock"
65
_QUEUE = "_queue"
66

    
67

    
68
class CancelJob(Exception):
69
  """Special exception to cancel a job.
70

71
  """
72

    
73

    
74
def TimeStampNow():
75
  """Returns the current timestamp.
76

77
  @rtype: tuple
78
  @return: the current time in the (seconds, microseconds) format
79

80
  """
81
  return utils.SplitTime(time.time())
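  # Illustrative example (values are made up): for a wall-clock time of
  # 1288797751.7282 seconds, utils.SplitTime() yields (1288797751, 728200),
  # i.e. the (seconds, microseconds) tuple documented above.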
82

    
83

    
84
class _QueuedOpCode(object):
85
  """Encapsulates an opcode object.
86

87
  @ivar log: holds the execution log and consists of tuples
88
  of the form C{(log_serial, timestamp, level, message)}
89
  @ivar input: the OpCode we encapsulate
90
  @ivar status: the current status
91
  @ivar result: the result of the LU execution
92
  @ivar start_timestamp: timestamp for the start of the execution
93
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94
  @ivar end_timestamp: timestamp for the end of the execution
95

96
  """
97
  __slots__ = ["input", "status", "result", "log",
98
               "start_timestamp", "exec_timestamp", "end_timestamp",
99
               "__weakref__"]
100

    
101
  def __init__(self, op):
102
    """Constructor for the _QuededOpCode.
103

104
    @type op: L{opcodes.OpCode}
105
    @param op: the opcode we encapsulate
106

107
    """
108
    self.input = op
109
    self.status = constants.OP_STATUS_QUEUED
110
    self.result = None
111
    self.log = []
112
    self.start_timestamp = None
113
    self.exec_timestamp = None
114
    self.end_timestamp = None
115

    
116
  @classmethod
117
  def Restore(cls, state):
118
    """Restore the _QueuedOpCode from the serialized form.
119

120
    @type state: dict
121
    @param state: the serialized state
122
    @rtype: _QueuedOpCode
123
    @return: a new _QueuedOpCode instance
124

125
    """
126
    obj = _QueuedOpCode.__new__(cls)
127
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
128
    obj.status = state["status"]
129
    obj.result = state["result"]
130
    obj.log = state["log"]
131
    obj.start_timestamp = state.get("start_timestamp", None)
132
    obj.exec_timestamp = state.get("exec_timestamp", None)
133
    obj.end_timestamp = state.get("end_timestamp", None)
134
    return obj
135

    
136
  def Serialize(self):
137
    """Serializes this _QueuedOpCode.
138

139
    @rtype: dict
140
    @return: the dictionary holding the serialized state
141

142
    """
143
    return {
144
      "input": self.input.__getstate__(),
145
      "status": self.status,
146
      "result": self.result,
147
      "log": self.log,
148
      "start_timestamp": self.start_timestamp,
149
      "exec_timestamp": self.exec_timestamp,
150
      "end_timestamp": self.end_timestamp,
151
      }
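    # Illustrative sketch of the round-trip (field values are made up):
    #   state = op.Serialize()
    #   # state == {"input": {...}, "status": "queued", "result": None,
    #   #           "log": [], "start_timestamp": None, ...}
    #   op2 = _QueuedOpCode.Restore(state)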
152

    
153

    
154
class _QueuedJob(object):
155
  """In-memory job representation.
156

157
  This is what we use to track the user-submitted jobs. Locking must
158
  be taken care of by users of this class.
159

160
  @type queue: L{JobQueue}
161
  @ivar queue: the parent queue
162
  @ivar id: the job ID
163
  @type ops: list
164
  @ivar ops: the list of _QueuedOpCode that constitute the job
165
  @type log_serial: int
166
  @ivar log_serial: holds the index for the next log entry
167
  @ivar received_timestamp: the timestamp for when the job was received
168
  @ivar start_timestamp: the timestamp for the start of execution
169
  @ivar end_timestamp: the timestamp for end of execution
170

171
  """
172
  # pylint: disable-msg=W0212
173
  __slots__ = ["queue", "id", "ops", "log_serial",
174
               "received_timestamp", "start_timestamp", "end_timestamp",
175
               "__weakref__"]
176

    
177
  def __init__(self, queue, job_id, ops):
178
    """Constructor for the _QueuedJob.
179

180
    @type queue: L{JobQueue}
181
    @param queue: our parent queue
182
    @type job_id: job_id
183
    @param job_id: our job id
184
    @type ops: list
185
    @param ops: the list of opcodes we hold, which will be encapsulated
186
        in _QueuedOpCodes
187

188
    """
189
    if not ops:
190
      raise errors.GenericError("A job needs at least one opcode")
191

    
192
    self.queue = queue
193
    self.id = job_id
194
    self.ops = [_QueuedOpCode(op) for op in ops]
195
    self.log_serial = 0
196
    self.received_timestamp = TimeStampNow()
197
    self.start_timestamp = None
198
    self.end_timestamp = None
199

    
200
  def __repr__(self):
201
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
202
              "id=%s" % self.id,
203
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
204

    
205
    return "<%s at %#x>" % (" ".join(status), id(self))
206

    
207
  @classmethod
208
  def Restore(cls, queue, state):
209
    """Restore a _QueuedJob from serialized state:
210

211
    @type queue: L{JobQueue}
212
    @param queue: to which queue the restored job belongs
213
    @type state: dict
214
    @param state: the serialized state
215
    @rtype: _QueuedJob
216
    @return: the restored _QueuedJob instance
217

218
    """
219
    obj = _QueuedJob.__new__(cls)
220
    obj.queue = queue
221
    obj.id = state["id"]
222
    obj.received_timestamp = state.get("received_timestamp", None)
223
    obj.start_timestamp = state.get("start_timestamp", None)
224
    obj.end_timestamp = state.get("end_timestamp", None)
225

    
226
    obj.ops = []
227
    obj.log_serial = 0
228
    for op_state in state["ops"]:
229
      op = _QueuedOpCode.Restore(op_state)
230
      for log_entry in op.log:
231
        obj.log_serial = max(obj.log_serial, log_entry[0])
232
      obj.ops.append(op)
233

    
234
    return obj
235

    
236
  def Serialize(self):
237
    """Serialize the _JobQueue instance.
238

239
    @rtype: dict
240
    @return: the serialized state
241

242
    """
243
    return {
244
      "id": self.id,
245
      "ops": [op.Serialize() for op in self.ops],
246
      "start_timestamp": self.start_timestamp,
247
      "end_timestamp": self.end_timestamp,
248
      "received_timestamp": self.received_timestamp,
249
      }
250

    
251
  def CalcStatus(self):
252
    """Compute the status of this job.
253

254
    This function iterates over all the _QueuedOpCodes in the job and
255
    based on their status, computes the job status.
256

257
    The algorithm is:
258
      - if we find a cancelled or failed opcode, the job
259
        status will be the same
260
      - otherwise, the last opcode with the status one of:
261
          - waitlock
262
          - canceling
263
          - running
264

265
        will determine the job status
266

267
      - otherwise, all opcodes are either queued or successful,
268
        and the job status will be the same
269

270
    @return: the job status
271

272
    """
273
    status = constants.JOB_STATUS_QUEUED
274

    
275
    all_success = True
276
    for op in self.ops:
277
      if op.status == constants.OP_STATUS_SUCCESS:
278
        continue
279

    
280
      all_success = False
281

    
282
      if op.status == constants.OP_STATUS_QUEUED:
283
        pass
284
      elif op.status == constants.OP_STATUS_WAITLOCK:
285
        status = constants.JOB_STATUS_WAITLOCK
286
      elif op.status == constants.OP_STATUS_RUNNING:
287
        status = constants.JOB_STATUS_RUNNING
288
      elif op.status == constants.OP_STATUS_CANCELING:
289
        status = constants.JOB_STATUS_CANCELING
290
        break
291
      elif op.status == constants.OP_STATUS_ERROR:
292
        status = constants.JOB_STATUS_ERROR
293
        # The whole job fails if one opcode failed
294
        break
295
      elif op.status == constants.OP_STATUS_CANCELED:
296
        status = constants.JOB_STATUS_CANCELED
297
        break
298

    
299
    if all_success:
300
      status = constants.JOB_STATUS_SUCCESS
301

    
302
    return status
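    # Illustrative examples of the algorithm above (statuses abbreviated):
    #   [success, success, success]  -> JOB_STATUS_SUCCESS
    #   [success, running, queued]   -> JOB_STATUS_RUNNING
    #   [success, error, queued]     -> JOB_STATUS_ERROR (first error wins)
    #   [queued, queued, queued]     -> JOB_STATUS_QUEUED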
303

    
304
  def GetLogEntries(self, newer_than):
305
    """Selectively returns the log entries.
306

307
    @type newer_than: None or int
308
    @param newer_than: if this is None, return all log entries,
309
        otherwise return only the log entries with serial higher
310
        than this value
311
    @rtype: list
312
    @return: the list of the log entries selected
313

314
    """
315
    if newer_than is None:
316
      serial = -1
317
    else:
318
      serial = newer_than
319

    
320
    entries = []
321
    for op in self.ops:
322
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
323

    
324
    return entries
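    # Illustrative example: with opcode logs holding entries with serials
    # 1..5, GetLogEntries(3) returns only the entries whose serial is 4 or 5,
    # while GetLogEntries(None) returns all of them.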
325

    
326
  def GetInfo(self, fields):
327
    """Returns information about a job.
328

329
    @type fields: list
330
    @param fields: names of fields to return
331
    @rtype: list
332
    @return: list with one element for each field
333
    @raise errors.OpExecError: when an invalid field
334
        has been passed
335

336
    """
337
    row = []
338
    for fname in fields:
339
      if fname == "id":
340
        row.append(self.id)
341
      elif fname == "status":
342
        row.append(self.CalcStatus())
343
      elif fname == "ops":
344
        row.append([op.input.__getstate__() for op in self.ops])
345
      elif fname == "opresult":
346
        row.append([op.result for op in self.ops])
347
      elif fname == "opstatus":
348
        row.append([op.status for op in self.ops])
349
      elif fname == "oplog":
350
        row.append([op.log for op in self.ops])
351
      elif fname == "opstart":
352
        row.append([op.start_timestamp for op in self.ops])
353
      elif fname == "opexec":
354
        row.append([op.exec_timestamp for op in self.ops])
355
      elif fname == "opend":
356
        row.append([op.end_timestamp for op in self.ops])
357
      elif fname == "received_ts":
358
        row.append(self.received_timestamp)
359
      elif fname == "start_ts":
360
        row.append(self.start_timestamp)
361
      elif fname == "end_ts":
362
        row.append(self.end_timestamp)
363
      elif fname == "summary":
364
        row.append([op.input.Summary() for op in self.ops])
365
      else:
366
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
367
    return row
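    # Illustrative example (values are made up):
    #   job.GetInfo(["id", "status", "summary"])
    #   could return something like ["42", "running", ["INSTANCE_STARTUP(x)"]]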
368

    
369
  def MarkUnfinishedOps(self, status, result):
370
    """Mark unfinished opcodes with a given status and result.
371

372
    This is a utility function for marking all running or waiting to
373
    be run opcodes with a given status. Opcodes which are already
374
    finalised are not changed.
375

376
    @param status: a given opcode status
377
    @param result: the opcode result
378

379
    """
380
    try:
381
      not_marked = True
382
      for op in self.ops:
383
        if op.status in constants.OPS_FINALIZED:
384
          assert not_marked, "Finalized opcodes found after non-finalized ones"
385
          continue
386
        op.status = status
387
        op.result = result
388
        not_marked = False
389
    finally:
390
      self.queue.UpdateJobUnlocked(self)
391

    
392

    
393
class _OpExecCallbacks(mcpu.OpExecCbBase):
394
  def __init__(self, queue, job, op):
395
    """Initializes this class.
396

397
    @type queue: L{JobQueue}
398
    @param queue: Job queue
399
    @type job: L{_QueuedJob}
400
    @param job: Job object
401
    @type op: L{_QueuedOpCode}
402
    @param op: OpCode
403

404
    """
405
    assert queue, "Queue is missing"
406
    assert job, "Job is missing"
407
    assert op, "Opcode is missing"
408

    
409
    self._queue = queue
410
    self._job = job
411
    self._op = op
412

    
413
  def _CheckCancel(self):
414
    """Raises an exception to cancel the job if asked to.
415

416
    """
417
    # Cancel here if we were asked to
418
    if self._op.status == constants.OP_STATUS_CANCELING:
419
      logging.debug("Canceling opcode")
420
      raise CancelJob()
421

    
422
  @locking.ssynchronized(_QUEUE, shared=1)
423
  def NotifyStart(self):
424
    """Mark the opcode as running, not lock-waiting.
425

426
    This is called from the mcpu code as a notifier function, when the LU is
427
    finally about to start the Exec() method. Of course, to have end-user
428
    visible results, the opcode must be initially (before calling into
429
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
430

431
    """
432
    assert self._op in self._job.ops
433
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
434
                               constants.OP_STATUS_CANCELING)
435

    
436
    # Cancel here if we were asked to
437
    self._CheckCancel()
438

    
439
    logging.debug("Opcode is now running")
440

    
441
    self._op.status = constants.OP_STATUS_RUNNING
442
    self._op.exec_timestamp = TimeStampNow()
443

    
444
    # And finally replicate the job status
445
    self._queue.UpdateJobUnlocked(self._job)
446

    
447
  @locking.ssynchronized(_QUEUE, shared=1)
448
  def _AppendFeedback(self, timestamp, log_type, log_msg):
449
    """Internal feedback append function, with locks
450

451
    """
452
    self._job.log_serial += 1
453
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
454
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
455

    
456
  def Feedback(self, *args):
457
    """Append a log entry.
458

459
    """
460
    assert len(args) < 3
461

    
462
    if len(args) == 1:
463
      log_type = constants.ELOG_MESSAGE
464
      log_msg = args[0]
465
    else:
466
      (log_type, log_msg) = args
467

    
468
    # The time is split to make serialization easier and not lose
469
    # precision.
470
    timestamp = utils.SplitTime(time.time())
471
    self._AppendFeedback(timestamp, log_type, log_msg)
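    # Illustrative calls: Feedback("starting export") appends a plain
    # ELOG_MESSAGE entry, while Feedback(log_type, msg) passes an explicit
    # log type; more than two arguments trips the assertion above.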
472

    
473
  def ReportLocks(self, msg):
474
    """Write locking information to the job.
475

476
    Called whenever the LU processor is waiting for a lock or has acquired one.
477

478
    """
479
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
480
                               constants.OP_STATUS_CANCELING)
481

    
482
    # Cancel here if we were asked to
483
    self._CheckCancel()
484

    
485

    
486
class _JobChangesChecker(object):
487
  def __init__(self, fields, prev_job_info, prev_log_serial):
488
    """Initializes this class.
489

490
    @type fields: list of strings
491
    @param fields: Fields requested by LUXI client
492
    @type prev_job_info: string
493
    @param prev_job_info: previous job info, as passed by the LUXI client
494
    @type prev_log_serial: string
495
    @param prev_log_serial: previous log serial, as passed by the LUXI client
496

497
    """
498
    self._fields = fields
499
    self._prev_job_info = prev_job_info
500
    self._prev_log_serial = prev_log_serial
501

    
502
  def __call__(self, job):
503
    """Checks whether job has changed.
504

505
    @type job: L{_QueuedJob}
506
    @param job: Job object
507

508
    """
509
    status = job.CalcStatus()
510
    job_info = job.GetInfo(self._fields)
511
    log_entries = job.GetLogEntries(self._prev_log_serial)
512

    
513
    # Serializing and deserializing data can cause type changes (e.g. from
514
    # tuple to list) or precision loss. We're doing it here so that we get
515
    # the same modifications as the data received from the client. Without
516
    # this, the comparison afterwards might fail without the data being
517
    # significantly different.
518
    # TODO: we just deserialized from disk, investigate how to make sure that
519
    # the job info and log entries are compatible to avoid this further step.
520
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
521
    # efficient, though floats will be tricky
522
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
523
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
524

    
525
    # Don't even try to wait if the job is no longer running, there will be
526
    # no changes.
527
    if (status not in (constants.JOB_STATUS_QUEUED,
528
                       constants.JOB_STATUS_RUNNING,
529
                       constants.JOB_STATUS_WAITLOCK) or
530
        job_info != self._prev_job_info or
531
        (log_entries and self._prev_log_serial != log_entries[0][0])):
532
      logging.debug("Job %s changed", job.id)
533
      return (job_info, log_entries)
534

    
535
    return None
536

    
537

    
538
class _JobFileChangesWaiter(object):
539
  def __init__(self, filename):
540
    """Initializes this class.
541

542
    @type filename: string
543
    @param filename: Path to job file
544
    @raise errors.InotifyError: if the notifier cannot be set up
545

546
    """
547
    self._wm = pyinotify.WatchManager()
548
    self._inotify_handler = \
549
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
550
    self._notifier = \
551
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
552
    try:
553
      self._inotify_handler.enable()
554
    except Exception:
555
      # pyinotify doesn't close file descriptors automatically
556
      self._notifier.stop()
557
      raise
558

    
559
  def _OnInotify(self, notifier_enabled):
560
    """Callback for inotify.
561

562
    """
563
    if not notifier_enabled:
564
      self._inotify_handler.enable()
565

    
566
  def Wait(self, timeout):
567
    """Waits for the job file to change.
568

569
    @type timeout: float
570
    @param timeout: Timeout in seconds
571
    @return: Whether there have been events
572

573
    """
574
    assert timeout >= 0
575
    have_events = self._notifier.check_events(timeout * 1000)
576
    if have_events:
577
      self._notifier.read_events()
578
    self._notifier.process_events()
579
    return have_events
580

    
581
  def Close(self):
582
    """Closes underlying notifier and its file descriptor.
583

584
    """
585
    self._notifier.stop()
586

    
587

    
588
class _JobChangesWaiter(object):
589
  def __init__(self, filename):
590
    """Initializes this class.
591

592
    @type filename: string
593
    @param filename: Path to job file
594

595
    """
596
    self._filewaiter = None
597
    self._filename = filename
598

    
599
  def Wait(self, timeout):
600
    """Waits for a job to change.
601

602
    @type timeout: float
603
    @param timeout: Timeout in seconds
604
    @return: Whether there have been events
605

606
    """
607
    if self._filewaiter:
608
      return self._filewaiter.Wait(timeout)
609

    
610
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
611
    # If this point is reached, return immediately and let caller check the job
612
    # file again in case there were changes since the last check. This avoids a
613
    # race condition.
614
    self._filewaiter = _JobFileChangesWaiter(self._filename)
615

    
616
    return True
617

    
618
  def Close(self):
619
    """Closes underlying waiter.
620

621
    """
622
    if self._filewaiter:
623
      self._filewaiter.Close()
624

    
625

    
626
class _WaitForJobChangesHelper(object):
627
  """Helper class using inotify to wait for changes in a job file.
628

629
  This class takes a previous job status and serial, and alerts the client when
630
  the current job status has changed.
631

632
  """
633
  @staticmethod
634
  def _CheckForChanges(job_load_fn, check_fn):
635
    job = job_load_fn()
636
    if not job:
637
      raise errors.JobLost()
638

    
639
    result = check_fn(job)
640
    if result is None:
641
      raise utils.RetryAgain()
642

    
643
    return result
644

    
645
  def __call__(self, filename, job_load_fn,
646
               fields, prev_job_info, prev_log_serial, timeout):
647
    """Waits for changes on a job.
648

649
    @type filename: string
650
    @param filename: File on which to wait for changes
651
    @type job_load_fn: callable
652
    @param job_load_fn: Function to load job
653
    @type fields: list of strings
654
    @param fields: Which fields to check for changes
655
    @type prev_job_info: list or None
656
    @param prev_job_info: Last job information returned
657
    @type prev_log_serial: int
658
    @param prev_log_serial: Last job message serial number
659
    @type timeout: float
660
    @param timeout: maximum time to wait in seconds
661

662
    """
663
    try:
664
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
665
      waiter = _JobChangesWaiter(filename)
666
      try:
667
        return utils.Retry(compat.partial(self._CheckForChanges,
668
                                          job_load_fn, check_fn),
669
                           utils.RETRY_REMAINING_TIME, timeout,
670
                           wait_fn=waiter.Wait)
671
      finally:
672
        waiter.Close()
673
    except (errors.InotifyError, errors.JobLost):
674
      return None
675
    except utils.RetryTimeout:
676
      return constants.JOB_NOTCHANGED
677

    
678

    
679
def _EncodeOpError(err):
680
  """Encodes an error which occurred while processing an opcode.
681

682
  """
683
  if isinstance(err, errors.GenericError):
684
    to_encode = err
685
  else:
686
    to_encode = errors.OpExecError(str(err))
687

    
688
  return errors.EncodeException(to_encode)
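  # Illustrative behaviour: a GenericError subclass is encoded as-is, while
  # e.g. a ValueError("boom") is first wrapped into errors.OpExecError("boom")
  # and then encoded for storage in the job file.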
689

    
690

    
691
class _JobQueueWorker(workerpool.BaseWorker):
692
  """The actual job workers.
693

694
  """
695
  def RunTask(self, job): # pylint: disable-msg=W0221
696
    """Job executor.
697

698
    This function processes a job. It is closely tied to the _QueuedJob and
699
    _QueuedOpCode classes.
700

701
    @type job: L{_QueuedJob}
702
    @param job: the job to be processed
703

704
    """
705
    self.SetTaskName("Job%s" % job.id)
706

    
707
    logging.info("Processing job %s", job.id)
708
    proc = mcpu.Processor(self.pool.queue.context, job.id)
709
    queue = job.queue
710
    try:
711
      try:
712
        count = len(job.ops)
713
        for idx, op in enumerate(job.ops):
714
          op_summary = op.input.Summary()
715
          if op.status == constants.OP_STATUS_SUCCESS:
716
            # this is a job that was partially completed before master
717
            # daemon shutdown, so it can be expected that some opcodes
718
            # are already completed successfully (if any did error
719
            # out, then the whole job should have been aborted and not
720
            # resubmitted for processing)
721
            logging.info("Op %s/%s: opcode %s already processed, skipping",
722
                         idx + 1, count, op_summary)
723
            continue
724
          try:
725
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
726
                         op_summary)
727

    
728
            queue.acquire(shared=1)
729
            try:
730
              if op.status == constants.OP_STATUS_CANCELED:
731
                logging.debug("Canceling opcode")
732
                raise CancelJob()
733
              assert op.status == constants.OP_STATUS_QUEUED
734
              logging.debug("Opcode %s/%s waiting for locks",
735
                            idx + 1, count)
736
              op.status = constants.OP_STATUS_WAITLOCK
737
              op.result = None
738
              op.start_timestamp = TimeStampNow()
739
              if idx == 0: # first opcode
740
                job.start_timestamp = op.start_timestamp
741
              queue.UpdateJobUnlocked(job)
742

    
743
              input_opcode = op.input
744
            finally:
745
              queue.release()
746

    
747
            # Make sure not to hold queue lock while calling ExecOpCode
748
            result = proc.ExecOpCode(input_opcode,
749
                                     _OpExecCallbacks(queue, job, op))
750

    
751
            queue.acquire(shared=1)
752
            try:
753
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
754
              op.status = constants.OP_STATUS_SUCCESS
755
              op.result = result
756
              op.end_timestamp = TimeStampNow()
757
              if idx == count - 1:
758
                job.end_timestamp = TimeStampNow()
759

    
760
                # Consistency check
761
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
762
                                  for i in job.ops)
763

    
764
              queue.UpdateJobUnlocked(job)
765
            finally:
766
              queue.release()
767

    
768
            logging.info("Op %s/%s: Successfully finished opcode %s",
769
                         idx + 1, count, op_summary)
770
          except CancelJob:
771
            # Will be handled further up
772
            raise
773
          except Exception, err:
774
            queue.acquire(shared=1)
775
            try:
776
              try:
777
                logging.debug("Opcode %s/%s failed", idx + 1, count)
778
                op.status = constants.OP_STATUS_ERROR
779
                op.result = _EncodeOpError(err)
780
                op.end_timestamp = TimeStampNow()
781
                logging.info("Op %s/%s: Error in opcode %s: %s",
782
                             idx + 1, count, op_summary, err)
783

    
784
                to_encode = errors.OpExecError("Preceding opcode failed")
785
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
786
                                      _EncodeOpError(to_encode))
787

    
788
                # Consistency check
789
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
790
                                  for i in job.ops[:idx])
791
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
792
                                  errors.GetEncodedError(i.result)
793
                                  for i in job.ops[idx:])
794
              finally:
795
                job.end_timestamp = TimeStampNow()
796
                queue.UpdateJobUnlocked(job)
797
            finally:
798
              queue.release()
799
            raise
800

    
801
      except CancelJob:
802
        queue.acquire(shared=1)
803
        try:
804
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
805
                                "Job canceled by request")
806
          job.end_timestamp = TimeStampNow()
807
          queue.UpdateJobUnlocked(job)
808
        finally:
809
          queue.release()
810
      except errors.GenericError, err:
811
        logging.exception("Ganeti exception")
812
      except:
813
        logging.exception("Unhandled exception")
814
    finally:
815
      status = job.CalcStatus()
816
      logging.info("Finished job %s, status = %s", job.id, status)
817

    
818

    
819
class _JobQueueWorkerPool(workerpool.WorkerPool):
820
  """Simple class implementing a job-processing workerpool.
821

822
  """
823
  def __init__(self, queue):
824
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
825
                                              JOBQUEUE_THREADS,
826
                                              _JobQueueWorker)
827
    self.queue = queue
828

    
829

    
830
def _RequireOpenQueue(fn):
831
  """Decorator for "public" functions.
832

833
  This function should be used for all 'public' functions. That is,
834
  functions usually called from other classes. Note that this should
835
  be applied only to methods (not plain functions), since it expects
836
  that the decorated function is called with a first argument that has
837
  a '_queue_filelock' attribute.
838

839
  @warning: Use this decorator only after locking.ssynchronized
840

841
  Example::
842
    @locking.ssynchronized(_LOCK)
843
    @_RequireOpenQueue
844
    def Example(self):
845
      pass
846

847
  """
848
  def wrapper(self, *args, **kwargs):
849
    # pylint: disable-msg=W0212
850
    assert self._queue_filelock is not None, "Queue should be open"
851
    return fn(self, *args, **kwargs)
852
  return wrapper
853

    
854

    
855
class JobQueue(object):
856
  """Queue used to manage the jobs.
857

858
  @cvar _RE_JOB_FILE: regex matching the valid job file names
859

860
  """
861
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
862

    
863
  def __init__(self, context):
864
    """Constructor for JobQueue.
865

866
    The constructor will initialize the job queue object and then
867
    start loading the current jobs from disk, either for starting them
868
    (if they were queued) or for aborting them (if they were already
869
    running).
870

871
    @type context: GanetiContext
872
    @param context: the context object for access to the configuration
873
        data and other ganeti objects
874

875
    """
876
    self.context = context
877
    self._memcache = weakref.WeakValueDictionary()
878
    self._my_hostname = netutils.HostInfo().name
879

    
880
    # The Big JobQueue lock. If a code block or method acquires it in shared
881
    # mode, it must guarantee concurrency with all the code acquiring it in
882
    # shared mode, including itself. To avoid acquiring it at all,
883
    # concurrency must be guaranteed with all code acquiring it in shared mode
884
    # and with all code acquiring it exclusively.
885
    self._lock = locking.SharedLock("JobQueue")
886

    
887
    self.acquire = self._lock.acquire
888
    self.release = self._lock.release
889

    
890
    # Initialize the queue, and acquire the filelock.
891
    # This ensures no other process is working on the job queue.
892
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
893

    
894
    # Read serial file
895
    self._last_serial = jstore.ReadSerial()
896
    assert self._last_serial is not None, ("Serial file was modified between"
897
                                           " check in jstore and here")
898

    
899
    # Get initial list of nodes
900
    self._nodes = dict((n.name, n.primary_ip)
901
                       for n in self.context.cfg.GetAllNodesInfo().values()
902
                       if n.master_candidate)
903

    
904
    # Remove master node
905
    self._nodes.pop(self._my_hostname, None)
906

    
907
    # TODO: Check consistency across nodes
908

    
909
    self._queue_size = 0
910
    self._UpdateQueueSizeUnlocked()
911
    self._drained = self._IsQueueMarkedDrain()
912

    
913
    # Setup worker pool
914
    self._wpool = _JobQueueWorkerPool(self)
915
    try:
916
      # We need to lock here because WorkerPool.AddTask() may start a job while
917
      # we're still doing our work.
918
      self.acquire()
919
      try:
920
        logging.info("Inspecting job queue")
921

    
922
        all_job_ids = self._GetJobIDsUnlocked()
923
        jobs_count = len(all_job_ids)
924
        lastinfo = time.time()
925
        for idx, job_id in enumerate(all_job_ids):
926
          # Give an update every 1000 jobs or 10 seconds
927
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
928
              idx == (jobs_count - 1)):
929
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
930
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
931
            lastinfo = time.time()
932

    
933
          job = self._LoadJobUnlocked(job_id)
934

    
935
          # a failure in loading the job can cause 'None' to be returned
936
          if job is None:
937
            continue
938

    
939
          status = job.CalcStatus()
940

    
941
          if status in (constants.JOB_STATUS_QUEUED, ):
942
            self._wpool.AddTask((job, ))
943

    
944
          elif status in (constants.JOB_STATUS_RUNNING,
945
                          constants.JOB_STATUS_WAITLOCK,
946
                          constants.JOB_STATUS_CANCELING):
947
            logging.warning("Unfinished job %s found: %s", job.id, job)
948
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
949
                                  "Unclean master daemon shutdown")
950

    
951
        logging.info("Job queue inspection finished")
952
      finally:
953
        self.release()
954
    except:
955
      self._wpool.TerminateWorkers()
956
      raise
957

    
958
  @locking.ssynchronized(_LOCK)
959
  @_RequireOpenQueue
960
  def AddNode(self, node):
961
    """Register a new node with the queue.
962

963
    @type node: L{objects.Node}
964
    @param node: the node object to be added
965

966
    """
967
    node_name = node.name
968
    assert node_name != self._my_hostname
969

    
970
    # Clean queue directory on added node
971
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
972
    msg = result.fail_msg
973
    if msg:
974
      logging.warning("Cannot cleanup queue directory on node %s: %s",
975
                      node_name, msg)
976

    
977
    if not node.master_candidate:
978
      # remove if existing, ignoring errors
979
      self._nodes.pop(node_name, None)
980
      # and skip the replication of the job ids
981
      return
982

    
983
    # Upload the whole queue excluding archived jobs
984
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
985

    
986
    # Upload current serial file
987
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
988

    
989
    for file_name in files:
990
      # Read file content
991
      content = utils.ReadFile(file_name)
992

    
993
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
994
                                                  [node.primary_ip],
995
                                                  file_name, content)
996
      msg = result[node_name].fail_msg
997
      if msg:
998
        logging.error("Failed to upload file %s to node %s: %s",
999
                      file_name, node_name, msg)
1000

    
1001
    self._nodes[node_name] = node.primary_ip
1002

    
1003
  @locking.ssynchronized(_LOCK)
1004
  @_RequireOpenQueue
1005
  def RemoveNode(self, node_name):
1006
    """Callback called when removing nodes from the cluster.
1007

1008
    @type node_name: str
1009
    @param node_name: the name of the node to remove
1010

1011
    """
1012
    self._nodes.pop(node_name, None)
1013

    
1014
  @staticmethod
1015
  def _CheckRpcResult(result, nodes, failmsg):
1016
    """Verifies the status of an RPC call.
1017

1018
    Since we aim to keep consistency should this node (the current
1019
    master) fail, we will log errors if our rpc calls fail, and especially
1020
    log the case when more than half of the nodes fail.
1021

1022
    @param result: the data as returned from the rpc call
1023
    @type nodes: list
1024
    @param nodes: the list of nodes we made the call to
1025
    @type failmsg: str
1026
    @param failmsg: the identifier to be used for logging
1027

1028
    """
1029
    failed = []
1030
    success = []
1031

    
1032
    for node in nodes:
1033
      msg = result[node].fail_msg
1034
      if msg:
1035
        failed.append(node)
1036
        logging.error("RPC call %s (%s) failed on node %s: %s",
1037
                      result[node].call, failmsg, node, msg)
1038
      else:
1039
        success.append(node)
1040

    
1041
    # +1 for the master node
1042
    if (len(success) + 1) < len(failed):
1043
      # TODO: Handle failing nodes
1044
      logging.error("More than half of the nodes failed")
1045

    
1046
  def _GetNodeIp(self):
1047
    """Helper for returning the node name/ip list.
1048

1049
    @rtype: (list, list)
1050
    @return: a tuple of two lists, the first one with the node
1051
        names and the second one with the node addresses
1052

1053
    """
1054
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1055
    name_list = self._nodes.keys()
1056
    addr_list = [self._nodes[name] for name in name_list]
1057
    return name_list, addr_list
1058

    
1059
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1060
    """Writes a file locally and then replicates it to all nodes.
1061

1062
    This function will replace the contents of a file on the local
1063
    node and then replicate it to all the other nodes we have.
1064

1065
    @type file_name: str
1066
    @param file_name: the path of the file to be replicated
1067
    @type data: str
1068
    @param data: the new contents of the file
1069
    @type replicate: boolean
1070
    @param replicate: whether to spread the changes to the remote nodes
1071

1072
    """
1073
    utils.WriteFile(file_name, data=data)
1074

    
1075
    if replicate:
1076
      names, addrs = self._GetNodeIp()
1077
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1078
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1079

    
1080
  def _RenameFilesUnlocked(self, rename):
1081
    """Renames a file locally and then replicate the change.
1082

1083
    This function will rename a file in the local queue directory
1084
    and then replicate this rename to all the other nodes we have.
1085

1086
    @type rename: list of (old, new)
1087
    @param rename: List containing tuples mapping old to new names
1088

1089
    """
1090
    # Rename them locally
1091
    for old, new in rename:
1092
      utils.RenameFile(old, new, mkdir=True)
1093

    
1094
    # ... and on all nodes
1095
    names, addrs = self._GetNodeIp()
1096
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1097
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1098

    
1099
  @staticmethod
1100
  def _FormatJobID(job_id):
1101
    """Convert a job ID to string format.
1102

1103
    Currently this just does C{str(job_id)} after performing some
1104
    checks, but if we want to change the job id format this will
1105
    abstract this change.
1106

1107
    @type job_id: int or long
1108
    @param job_id: the numeric job id
1109
    @rtype: str
1110
    @return: the formatted job id
1111

1112
    """
1113
    if not isinstance(job_id, (int, long)):
1114
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1115
    if job_id < 0:
1116
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1117

    
1118
    return str(job_id)
1119

    
1120
  @classmethod
1121
  def _GetArchiveDirectory(cls, job_id):
1122
    """Returns the archive directory for a job.
1123

1124
    @type job_id: str
1125
    @param job_id: Job identifier
1126
    @rtype: str
1127
    @return: Directory name
1128

1129
    """
1130
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
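    # Illustrative example: with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job
    # "12345" is archived under directory "1" and job "42" under directory "0".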
1131

    
1132
  def _NewSerialsUnlocked(self, count):
1133
    """Generates a new job identifier.
1134

1135
    Job identifiers are unique during the lifetime of a cluster.
1136

1137
    @type count: integer
1138
    @param count: how many serials to return
1139
    @rtype: list of str
1140
    @return: a list of strings representing the new job identifiers.
1141

1142
    """
1143
    assert count > 0
1144
    # New number
1145
    serial = self._last_serial + count
1146

    
1147
    # Write to file
1148
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1149
                             "%s\n" % serial, True)
1150

    
1151
    result = [self._FormatJobID(v)
1152
              for v in range(self._last_serial + 1, serial + 1)]
1153
    # Keep it only if we were able to write the file
1154
    self._last_serial = serial
1155

    
1156
    return result
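    # Illustrative example: with self._last_serial == 5 and count == 3, the
    # serial file is rewritten with "8" and the returned identifiers are
    # ["6", "7", "8"].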
1157

    
1158
  @staticmethod
1159
  def _GetJobPath(job_id):
1160
    """Returns the job file for a given job id.
1161

1162
    @type job_id: str
1163
    @param job_id: the job identifier
1164
    @rtype: str
1165
    @return: the path to the job file
1166

1167
    """
1168
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1169

    
1170
  @classmethod
1171
  def _GetArchivedJobPath(cls, job_id):
1172
    """Returns the archived job file for a give job id.
1173

1174
    @type job_id: str
1175
    @param job_id: the job identifier
1176
    @rtype: str
1177
    @return: the path to the archived job file
1178

1179
    """
1180
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1181
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1182

    
1183
  def _GetJobIDsUnlocked(self, sort=True):
1184
    """Return all known job IDs.
1185

1186
    The method only looks at disk because it's a requirement that all
1187
    jobs are present on disk (so in the _memcache we don't have any
1188
    extra IDs).
1189

1190
    @type sort: boolean
1191
    @param sort: perform sorting on the returned job ids
1192
    @rtype: list
1193
    @return: the list of job IDs
1194

1195
    """
1196
    jlist = []
1197
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1198
      m = self._RE_JOB_FILE.match(filename)
1199
      if m:
1200
        jlist.append(m.group(1))
1201
    if sort:
1202
      jlist = utils.NiceSort(jlist)
1203
    return jlist
1204

    
1205
  def _LoadJobUnlocked(self, job_id):
1206
    """Loads a job from the disk or memory.
1207

1208
    Given a job id, this will return the cached job object if
1209
    existing, or try to load the job from the disk. If loading from
1210
    disk, it will also add the job to the cache.
1211

1212
    @param job_id: the job id
1213
    @rtype: L{_QueuedJob} or None
1214
    @return: either None or the job object
1215

1216
    """
1217
    job = self._memcache.get(job_id, None)
1218
    if job:
1219
      logging.debug("Found job %s in memcache", job_id)
1220
      return job
1221

    
1222
    try:
1223
      job = self._LoadJobFromDisk(job_id)
1224
      if job is None:
1225
        return job
1226
    except errors.JobFileCorrupted:
1227
      old_path = self._GetJobPath(job_id)
1228
      new_path = self._GetArchivedJobPath(job_id)
1229
      if old_path == new_path:
1230
        # job already archived (future case)
1231
        logging.exception("Can't parse job %s", job_id)
1232
      else:
1233
        # non-archived case
1234
        logging.exception("Can't parse job %s, will archive.", job_id)
1235
        self._RenameFilesUnlocked([(old_path, new_path)])
1236
      return None
1237

    
1238
    self._memcache[job_id] = job
1239
    logging.debug("Added job %s to the cache", job_id)
1240
    return job
1241

    
1242
  def _LoadJobFromDisk(self, job_id):
1243
    """Load the given job file from disk.
1244

1245
    Given a job file, read, load and restore it in a _QueuedJob format.
1246

1247
    @type job_id: string
1248
    @param job_id: job identifier
1249
    @rtype: L{_QueuedJob} or None
1250
    @return: either None or the job object
1251

1252
    """
1253
    filepath = self._GetJobPath(job_id)
1254
    logging.debug("Loading job from %s", filepath)
1255
    try:
1256
      raw_data = utils.ReadFile(filepath)
1257
    except EnvironmentError, err:
1258
      if err.errno in (errno.ENOENT, ):
1259
        return None
1260
      raise
1261

    
1262
    try:
1263
      data = serializer.LoadJson(raw_data)
1264
      job = _QueuedJob.Restore(self, data)
1265
    except Exception, err: # pylint: disable-msg=W0703
1266
      raise errors.JobFileCorrupted(err)
1267

    
1268
    return job
1269

    
1270
  def SafeLoadJobFromDisk(self, job_id):
1271
    """Load the given job file from disk.
1272

1273
    Given a job file, read, load and restore it in a _QueuedJob format.
1274
    In case of error reading the job, it gets returned as None, and the
1275
    exception is logged.
1276

1277
    @type job_id: string
1278
    @param job_id: job identifier
1279
    @rtype: L{_QueuedJob} or None
1280
    @return: either None or the job object
1281

1282
    """
1283
    try:
1284
      return self._LoadJobFromDisk(job_id)
1285
    except (errors.JobFileCorrupted, EnvironmentError):
1286
      logging.exception("Can't load/parse job %s", job_id)
1287
      return None
1288

    
1289
  @staticmethod
1290
  def _IsQueueMarkedDrain():
1291
    """Check if the queue is marked from drain.
1292

1293
    This currently uses the queue drain file, which makes it a
1294
    per-node flag. In the future this can be moved to the config file.
1295

1296
    @rtype: boolean
1297
    @return: True if the job queue is marked for draining
1298

1299
    """
1300
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1301

    
1302
  def _UpdateQueueSizeUnlocked(self):
1303
    """Update the queue size.
1304

1305
    """
1306
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1307

    
1308
  @locking.ssynchronized(_LOCK)
1309
  @_RequireOpenQueue
1310
  def SetDrainFlag(self, drain_flag):
1311
    """Sets the drain flag for the queue.
1312

1313
    @type drain_flag: boolean
1314
    @param drain_flag: Whether to set or unset the drain flag
1315

1316
    """
1317
    if drain_flag:
1318
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1319
    else:
1320
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1321

    
1322
    self._drained = drain_flag
1323

    
1324
    return True
1325

    
1326
  @_RequireOpenQueue
1327
  def _SubmitJobUnlocked(self, job_id, ops):
1328
    """Create and store a new job.
1329

1330
    This enters the job into our job queue and also puts it on the new
1331
    queue, in order for it to be picked up by the queue processors.
1332

1333
    @type job_id: job ID
1334
    @param job_id: the job ID for the new job
1335
    @type ops: list
1336
    @param ops: The list of OpCodes that will become the new job.
1337
    @rtype: L{_QueuedJob}
1338
    @return: the job object to be queued
1339
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1340
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1341

1342
    """
1343
    # Ok when sharing the big job queue lock, as the drain file is created when
1344
    # the lock is exclusive.
1345
    if self._drained:
1346
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1347

    
1348
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1349
      raise errors.JobQueueFull()
1350

    
1351
    job = _QueuedJob(self, job_id, ops)
1352

    
1353
    # Write to disk
1354
    self.UpdateJobUnlocked(job)
1355

    
1356
    self._queue_size += 1
1357

    
1358
    logging.debug("Adding new job %s to the cache", job_id)
1359
    self._memcache[job_id] = job
1360

    
1361
    return job
1362

    
1363
  @locking.ssynchronized(_LOCK)
1364
  @_RequireOpenQueue
1365
  def SubmitJob(self, ops):
1366
    """Create and store a new job.
1367

1368
    @see: L{_SubmitJobUnlocked}
1369

1370
    """
1371
    job_id = self._NewSerialsUnlocked(1)[0]
1372
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
1373
    return job_id
1374

    
1375
  @locking.ssynchronized(_LOCK)
1376
  @_RequireOpenQueue
1377
  def SubmitManyJobs(self, jobs):
1378
    """Create and store multiple jobs.
1379

1380
    @see: L{_SubmitJobUnlocked}
1381

1382
    """
1383
    results = []
1384
    tasks = []
1385
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1386
    for job_id, ops in zip(all_job_ids, jobs):
1387
      try:
1388
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1389
        status = True
1390
        data = job_id
1391
      except errors.GenericError, err:
1392
        data = str(err)
1393
        status = False
1394
      results.append((status, data))
1395
    self._wpool.AddManyTasks(tasks)
1396

    
1397
    return results
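    # Illustrative result (values are made up): submitting two jobs could
    # return [(True, "123"), (False, "Job queue is drained, refusing job")],
    # i.e. a (success, job id or error message) pair per requested job.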
1398

    
1399
  @_RequireOpenQueue
1400
  def UpdateJobUnlocked(self, job, replicate=True):
1401
    """Update a job's on disk storage.
1402

1403
    After a job has been modified, this function needs to be called in
1404
    order to write the changes to disk and replicate them to the other
1405
    nodes.
1406

1407
    @type job: L{_QueuedJob}
1408
    @param job: the changed job
1409
    @type replicate: boolean
1410
    @param replicate: whether to replicate the change to remote nodes
1411

1412
    """
1413
    filename = self._GetJobPath(job.id)
1414
    data = serializer.DumpJson(job.Serialize(), indent=False)
1415
    logging.debug("Writing job %s to %s", job.id, filename)
1416
    self._UpdateJobQueueFile(filename, data, replicate)
1417

    
1418
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1419
                        timeout):
1420
    """Waits for changes in a job.
1421

1422
    @type job_id: string
1423
    @param job_id: Job identifier
1424
    @type fields: list of strings
1425
    @param fields: Which fields to check for changes
1426
    @type prev_job_info: list or None
1427
    @param prev_job_info: Last job information returned
1428
    @type prev_log_serial: int
1429
    @param prev_log_serial: Last job message serial number
1430
    @type timeout: float
1431
    @param timeout: maximum time to wait in seconds
1432
    @rtype: tuple (job info, log entries)
1433
    @return: a tuple of the job information as required via
1434
        the fields parameter, and the log entries as a list
1435

1436
        if the job has not changed and the timeout has expired,
1437
        we instead return a special value,
1438
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1439
        as such by the clients
1440

1441
    """
1442
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1443

    
1444
    helper = _WaitForJobChangesHelper()
1445

    
1446
    return helper(self._GetJobPath(job_id), load_fn,
1447
                  fields, prev_job_info, prev_log_serial, timeout)
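    # Possible outcomes, as implemented by _WaitForJobChangesHelper: a
    # (job_info, log_entries) tuple on change, constants.JOB_NOTCHANGED on
    # timeout, or None if the job file disappeared or cannot be watched.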
1448

    
1449
  @locking.ssynchronized(_LOCK)
1450
  @_RequireOpenQueue
1451
  def CancelJob(self, job_id):
1452
    """Cancels a job.
1453

1454
    This will only succeed if the job has not started yet.
1455

1456
    @type job_id: string
1457
    @param job_id: job ID of job to be cancelled.
1458

1459
    """
1460
    logging.info("Cancelling job %s", job_id)
1461

    
1462
    job = self._LoadJobUnlocked(job_id)
1463
    if not job:
1464
      logging.debug("Job %s not found", job_id)
1465
      return (False, "Job %s not found" % job_id)
1466

    
1467
    job_status = job.CalcStatus()
1468

    
1469
    if job_status not in (constants.JOB_STATUS_QUEUED,
1470
                          constants.JOB_STATUS_WAITLOCK):
1471
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1472
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1473

    
1474
    if job_status == constants.JOB_STATUS_QUEUED:
1475
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1476
                            "Job canceled by request")
1477
      return (True, "Job %s canceled" % job.id)
1478

    
1479
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1480
      # The worker will notice the new status and cancel the job
1481
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1482
      return (True, "Job %s will be canceled" % job.id)
1483

    
1484
  @_RequireOpenQueue
1485
  def _ArchiveJobsUnlocked(self, jobs):
1486
    """Archives jobs.
1487

1488
    @type jobs: list of L{_QueuedJob}
1489
    @param jobs: Job objects
1490
    @rtype: int
1491
    @return: Number of archived jobs
1492

1493
    """
1494
    archive_jobs = []
1495
    rename_files = []
1496
    for job in jobs:
1497
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
1498
        logging.debug("Job %s is not yet done", job.id)
1499
        continue
1500

    
1501
      archive_jobs.append(job)
1502

    
1503
      old = self._GetJobPath(job.id)
1504
      new = self._GetArchivedJobPath(job.id)
1505
      rename_files.append((old, new))
1506

    
1507
    # TODO: What if 1..n files fail to rename?
1508
    self._RenameFilesUnlocked(rename_files)
1509

    
1510
    logging.debug("Successfully archived job(s) %s",
1511
                  utils.CommaJoin(job.id for job in archive_jobs))
1512

    
1513
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1514
    # the files, we update the cached queue size from the filesystem. When we
1515
    # get around to fix the TODO: above, we can use the number of actually
1516
    # archived jobs to fix this.
1517
    self._UpdateQueueSizeUnlocked()
1518
    return len(archive_jobs)
1519

    
1520
  @locking.ssynchronized(_LOCK)
1521
  @_RequireOpenQueue
1522
  def ArchiveJob(self, job_id):
1523
    """Archives a job.
1524

1525
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1526

1527
    @type job_id: string
1528
    @param job_id: Job ID of job to be archived.
1529
    @rtype: bool
1530
    @return: Whether job was archived
1531

1532
    """
1533
    logging.info("Archiving job %s", job_id)
1534

    
1535
    job = self._LoadJobUnlocked(job_id)
1536
    if not job:
1537
      logging.debug("Job %s not found", job_id)
1538
      return False
1539

    
1540
    return self._ArchiveJobsUnlocked([job]) == 1
1541

    
1542
  @locking.ssynchronized(_LOCK)
1543
  @_RequireOpenQueue
1544
  def AutoArchiveJobs(self, age, timeout):
1545
    """Archives all jobs based on age.
1546

1547
    The method will archive all jobs which are older than the age
1548
    parameter. For jobs that don't have an end timestamp, the start
1549
    timestamp will be considered. The special '-1' age will cause
1550
    archival of all jobs (that are not running or queued).
1551

1552
    @type age: int
1553
    @param age: the minimum age in seconds
1554

1555
    """
1556
    logging.info("Archiving jobs with age more than %s seconds", age)
1557

    
1558
    now = time.time()
1559
    end_time = now + timeout
1560
    archived_count = 0
1561
    last_touched = 0
1562

    
1563
    all_job_ids = self._GetJobIDsUnlocked()
1564
    pending = []
1565
    for idx, job_id in enumerate(all_job_ids):
1566
      last_touched = idx + 1
1567

    
1568
      # Not optimal because jobs could be pending
1569
      # TODO: Measure average duration for job archival and take number of
1570
      # pending jobs into account.
1571
      if time.time() > end_time:
1572
        break
1573

    
1574
      # Returns None if the job failed to load
1575
      job = self._LoadJobUnlocked(job_id)
1576
      if job:
1577
        if job.end_timestamp is None:
1578
          if job.start_timestamp is None:
1579
            job_age = job.received_timestamp
1580
          else:
1581
            job_age = job.start_timestamp
1582
        else:
1583
          job_age = job.end_timestamp
1584

    
1585
        if age == -1 or now - job_age[0] > age:
1586
          pending.append(job)
1587

    
1588
          # Archive 10 jobs at a time
1589
          if len(pending) >= 10:
1590
            archived_count += self._ArchiveJobsUnlocked(pending)
1591
            pending = []
1592

    
1593
    if pending:
1594
      archived_count += self._ArchiveJobsUnlocked(pending)
1595

    
1596
    return (archived_count, len(all_job_ids) - last_touched)
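    # Illustrative example: AutoArchiveJobs(3600, 30) spends roughly at most
    # 30 seconds archiving finalized jobs older than one hour, and returns
    # e.g. (12, 40): 12 jobs archived, 40 job files not examined in time.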
1597

    
1598
  def QueryJobs(self, job_ids, fields):
1599
    """Returns a list of jobs in queue.
1600

1601
    @type job_ids: list
1602
    @param job_ids: sequence of job identifiers or None for all
1603
    @type fields: list
1604
    @param fields: names of fields to return
1605
    @rtype: list
1606
    @return: list one element per job, each element being list with
1607
        the requested fields
1608

1609
    """
1610
    jobs = []
1611
    list_all = False
1612
    if not job_ids:
1613
      # Since files are added to/removed from the queue atomically, there's no
1614
      # risk of getting the job ids in an inconsistent state.
1615
      job_ids = self._GetJobIDsUnlocked()
1616
      list_all = True
1617

    
1618
    for job_id in job_ids:
1619
      job = self.SafeLoadJobFromDisk(job_id)
1620
      if job is not None:
1621
        jobs.append(job.GetInfo(fields))
1622
      elif not list_all:
1623
        jobs.append(None)
1624

    
1625
    return jobs
1626

    
1627
  @locking.ssynchronized(_LOCK)
1628
  @_RequireOpenQueue
1629
  def Shutdown(self):
1630
    """Stops the job queue.
1631

1632
    This shuts down all the worker threads and closes the queue.
1633

1634
    """
1635
    self._wpool.TerminateWorkers()
1636

    
1637
    self._queue_filelock.Close()
1638
    self._queue_filelock = None