root / lib / jqueue.py @ 8044bf65


#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import errno
import re
import time
import weakref

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import netutils
from ganeti import compat


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
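
# For illustration only (hypothetical values): utils.SplitTime() splits the
# wall-clock time into whole seconds and microseconds, so a call made at
# 1291231200.123456 seconds since the epoch would give:
#
#   >>> TimeStampNow()
#   (1291231200, 123456)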


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      }
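
  # Serialize() and Restore() are meant to be inverses of each other; an
  # illustrative (hypothetical) round trip would be:
  #
  #   state = op.Serialize()                # plain, JSON-serializable dict
  #   clone = _QueuedOpCode.Restore(state)  # equivalent _QueuedOpCode object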


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an error,
        the job status will be the same
      - otherwise, the last opcode whose status is one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
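
  # Illustrative application of the rules above (hypothetical opcode statuses):
  # per-opcode statuses [success, running, queued] yield JOB_STATUS_RUNNING,
  # [success, error, queued] yields JOB_STATUS_ERROR, and an all-success list
  # yields JOB_STATUS_SUCCESS.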

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
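
  # For illustration: each log entry is a (serial, timestamp, type, message)
  # tuple, so a hypothetical GetLogEntries(3) call returns only the entries
  # with serial 4 or higher, while GetLogEntries(None) returns all of them.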

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    try:
      not_marked = True
      for op in self.ops:
        if op.status in constants.OPS_FINALIZED:
          assert not_marked, "Finalized opcodes found after non-finalized ones"
          continue
        op.status = status
        op.result = result
        not_marked = False
    finally:
      self.queue.UpdateJobUnlocked(self)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
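
  # Feedback() accepts either a bare message or an explicit (type, message)
  # pair; both of the following hypothetical calls append one log entry:
  #
  #   callbacks.Feedback("resyncing disks")
  #   callbacks.Feedback(constants.ELOG_MESSAGE, "resyncing disks")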

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire(shared=1)
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                logging.debug("Canceling opcode")
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              logging.debug("Opcode %s/%s waiting for locks",
                            idx + 1, count)
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire(shared=1)
            try:
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              if idx == count - 1:
                job.end_timestamp = TimeStampNow()

                # Consistency check
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops)

              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire(shared=1)
            try:
              try:
                logging.debug("Opcode %s/%s failed", idx + 1, count)
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  to_encode = err
                else:
                  to_encode = errors.OpExecError(str(err))
                op.result = errors.EncodeException(to_encode)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)

                to_encode = errors.OpExecError("Preceding opcode failed")
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                      errors.EncodeException(to_encode))

                # Consistency check
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops[:idx])
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
                                  errors.GetEncodedError(i.result)
                                  for i in job.ops[idx:])
              finally:
                job.end_timestamp = TimeStampNow()
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire(shared=1)
        try:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      status = job.CalcStatus()
      logging.info("Finished job %s, status = %s", job.id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.HostInfo().name

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared
    # mode and with all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask((job, ))

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                  "Unclean master daemon shutdown")

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    utils.WriteFile(file_name, data=data)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
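
  # For example, with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job "123456" is
  # archived under directory "12" and job "42" under directory "0".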

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of strings representing the job identifiers.

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for draining.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    tasks = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))
    self._wpool.AddManyTasks(tasks)

    return results
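
  # For illustration, a hypothetical return value for three submitted jobs
  # could be [(True, "17"), (True, "18"), (False, "<error message>")]; the
  # second element is the new job ID on success or the error text otherwise.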

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
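
  # Illustrative polling loop a LUXI-level caller might use (hypothetical
  # sketch, error handling omitted); the previous answer is fed back into
  # the next call:
  #
  #   prev_info, prev_serial = None, -1
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 60.0)
  #     if result is None:
  #       break                                  # job file disappeared
  #     if result == constants.JOB_NOTCHANGED:
  #       continue                               # timeout, ask again
  #     (prev_info, log_entries) = result
  #     if log_entries:
  #       prev_serial = max(entry[0] for entry in log_entries)
  #     if prev_info[0] in constants.JOBS_FINALIZED:
  #       break                                  # job reached a final status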

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time (in seconds) to spend archiving

    """
1546
    logging.info("Archiving jobs with age more than %s seconds", age)
1547

    
1548
    now = time.time()
1549
    end_time = now + timeout
1550
    archived_count = 0
1551
    last_touched = 0
1552

    
1553
    all_job_ids = self._GetJobIDsUnlocked()
1554
    pending = []
1555
    for idx, job_id in enumerate(all_job_ids):
1556
      last_touched = idx + 1
1557

    
1558
      # Not optimal because jobs could be pending
1559
      # TODO: Measure average duration for job archival and take number of
1560
      # pending jobs into account.
1561
      if time.time() > end_time:
1562
        break
1563

    
1564
      # Returns None if the job failed to load
1565
      job = self._LoadJobUnlocked(job_id)
1566
      if job:
1567
        if job.end_timestamp is None:
1568
          if job.start_timestamp is None:
1569
            job_age = job.received_timestamp
1570
          else:
1571
            job_age = job.start_timestamp
1572
        else:
1573
          job_age = job.end_timestamp
1574

    
1575
        if age == -1 or now - job_age[0] > age:
1576
          pending.append(job)
1577

    
1578
          # Archive 10 jobs at a time
1579
          if len(pending) >= 10:
1580
            archived_count += self._ArchiveJobsUnlocked(pending)
1581
            pending = []
1582

    
1583
    if pending:
1584
      archived_count += self._ArchiveJobsUnlocked(pending)
1585

    
1586
    return (archived_count, len(all_job_ids) - last_touched)
1587

    
1588
  def QueryJobs(self, job_ids, fields):
1589
    """Returns a list of jobs in queue.
1590

1591
    @type job_ids: list
1592
    @param job_ids: sequence of job identifiers or None for all
1593
    @type fields: list
1594
    @param fields: names of fields to return
1595
    @rtype: list
1596
    @return: list one element per job, each element being list with
1597
        the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None