root / lib / jqueue.py @ 4008c8ed


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import errno
35
import re
36
import time
37
import weakref
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import netutils
57
from ganeti import compat
58

    
59

    
60
JOBQUEUE_THREADS = 25
61
JOBS_PER_ARCHIVE_DIRECTORY = 10000
62

    
63
# member lock names to be passed to @ssynchronized decorator
64
_LOCK = "_lock"
65
_QUEUE = "_queue"
66

    
67

    
68
class CancelJob(Exception):
69
  """Special exception to cancel a job.
70

71
  """
72

    
73

    
74
def TimeStampNow():
75
  """Returns the current timestamp.
76

77
  @rtype: tuple
78
  @return: the current time in the (seconds, microseconds) format
79

80
  """
81
  return utils.SplitTime(time.time())
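# For illustration only: utils.SplitTime() turns a float UNIX timestamp into an
# integer (seconds, microseconds) pair, which keeps timestamps JSON-serializable
# without losing sub-second precision. A sketch with a made-up timestamp:
#
#   now = TimeStampNow()                # e.g. (1288617596, 250000)
#   assert utils.SplitTime(1288617596.25) == (1288617596, 250000)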
82

    
83

    
84
class _QueuedOpCode(object):
85
  """Encapsulates an opcode object.
86

87
  @ivar log: holds the execution log and consists of tuples
88
  of the form C{(log_serial, timestamp, level, message)}
89
  @ivar input: the OpCode we encapsulate
90
  @ivar status: the current status
91
  @ivar result: the result of the LU execution
92
  @ivar start_timestamp: timestamp for the start of the execution
93
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94
  @ivar end_timestamp: timestamp for the end of the execution
95

96
  """
97
  __slots__ = ["input", "status", "result", "log",
98
               "start_timestamp", "exec_timestamp", "end_timestamp",
99
               "__weakref__"]
100

    
101
  def __init__(self, op):
102
    """Constructor for the _QuededOpCode.
103

104
    @type op: L{opcodes.OpCode}
105
    @param op: the opcode we encapsulate
106

107
    """
108
    self.input = op
109
    self.status = constants.OP_STATUS_QUEUED
110
    self.result = None
111
    self.log = []
112
    self.start_timestamp = None
113
    self.exec_timestamp = None
114
    self.end_timestamp = None
115

    
116
  @classmethod
117
  def Restore(cls, state):
118
    """Restore the _QueuedOpCode from the serialized form.
119

120
    @type state: dict
121
    @param state: the serialized state
122
    @rtype: _QueuedOpCode
123
    @return: a new _QueuedOpCode instance
124

125
    """
126
    obj = _QueuedOpCode.__new__(cls)
127
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
128
    obj.status = state["status"]
129
    obj.result = state["result"]
130
    obj.log = state["log"]
131
    obj.start_timestamp = state.get("start_timestamp", None)
132
    obj.exec_timestamp = state.get("exec_timestamp", None)
133
    obj.end_timestamp = state.get("end_timestamp", None)
134
    return obj
135

    
136
  def Serialize(self):
137
    """Serializes this _QueuedOpCode.
138

139
    @rtype: dict
140
    @return: the dictionary holding the serialized state
141

142
    """
143
    return {
144
      "input": self.input.__getstate__(),
145
      "status": self.status,
146
      "result": self.result,
147
      "log": self.log,
148
      "start_timestamp": self.start_timestamp,
149
      "exec_timestamp": self.exec_timestamp,
150
      "end_timestamp": self.end_timestamp,
151
      }
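  # For illustration only, a sketch of the serialization round-trip ("op" stands
  # for any opcodes.OpCode instance):
  #
  #   qop = _QueuedOpCode(op)
  #   state = qop.Serialize()                  # plain dict, JSON-serializable
  #   clone = _QueuedOpCode.Restore(state)
  #   assert clone.status == qop.status == constants.OP_STATUS_QUEUED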
152

    
153

    
154
class _QueuedJob(object):
155
  """In-memory job representation.
156

157
  This is what we use to track the user-submitted jobs. Locking must
158
  be taken care of by users of this class.
159

160
  @type queue: L{JobQueue}
161
  @ivar queue: the parent queue
162
  @ivar id: the job ID
163
  @type ops: list
164
  @ivar ops: the list of _QueuedOpCode that constitute the job
165
  @type log_serial: int
166
  @ivar log_serial: holds the index for the next log entry
167
  @ivar received_timestamp: the timestamp for when the job was received
168
  @ivar start_timestamp: the timestamp for start of execution
169
  @ivar end_timestamp: the timestamp for end of execution
170

171
  """
172
  # pylint: disable-msg=W0212
173
  __slots__ = ["queue", "id", "ops", "log_serial",
174
               "received_timestamp", "start_timestamp", "end_timestamp",
175
               "__weakref__"]
176

    
177
  def __init__(self, queue, job_id, ops):
178
    """Constructor for the _QueuedJob.
179

180
    @type queue: L{JobQueue}
181
    @param queue: our parent queue
182
    @type job_id: job_id
183
    @param job_id: our job id
184
    @type ops: list
185
    @param ops: the list of opcodes we hold, which will be encapsulated
186
        in _QueuedOpCodes
187

188
    """
189
    if not ops:
190
      raise errors.GenericError("A job needs at least one opcode")
191

    
192
    self.queue = queue
193
    self.id = job_id
194
    self.ops = [_QueuedOpCode(op) for op in ops]
195
    self.log_serial = 0
196
    self.received_timestamp = TimeStampNow()
197
    self.start_timestamp = None
198
    self.end_timestamp = None
199

    
200
  def __repr__(self):
201
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
202
              "id=%s" % self.id,
203
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
204

    
205
    return "<%s at %#x>" % (" ".join(status), id(self))
206

    
207
  @classmethod
208
  def Restore(cls, queue, state):
209
    """Restore a _QueuedJob from serialized state:
210

211
    @type queue: L{JobQueue}
212
    @param queue: to which queue the restored job belongs
213
    @type state: dict
214
    @param state: the serialized state
215
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance
217

218
    """
219
    obj = _QueuedJob.__new__(cls)
220
    obj.queue = queue
221
    obj.id = state["id"]
222
    obj.received_timestamp = state.get("received_timestamp", None)
223
    obj.start_timestamp = state.get("start_timestamp", None)
224
    obj.end_timestamp = state.get("end_timestamp", None)
225

    
226
    obj.ops = []
227
    obj.log_serial = 0
228
    for op_state in state["ops"]:
229
      op = _QueuedOpCode.Restore(op_state)
230
      for log_entry in op.log:
231
        obj.log_serial = max(obj.log_serial, log_entry[0])
232
      obj.ops.append(op)
233

    
234
    return obj
235

    
236
  def Serialize(self):
237
    """Serialize the _JobQueue instance.
238

239
    @rtype: dict
240
    @return: the serialized state
241

242
    """
243
    return {
244
      "id": self.id,
245
      "ops": [op.Serialize() for op in self.ops],
246
      "start_timestamp": self.start_timestamp,
247
      "end_timestamp": self.end_timestamp,
248
      "received_timestamp": self.received_timestamp,
249
      }
250

    
251
  def CalcStatus(self):
252
    """Compute the status of this job.
253

254
    This function iterates over all the _QueuedOpCodes in the job and
255
    based on their status, computes the job status.
256

257
    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an
        error, the job status will be the same
      - otherwise, the last opcode with a status of waitlock, canceling
        or running determines the job status
      - otherwise, all opcodes are either queued or successful, and the
        job status will be the same
269

270
    @return: the job status
271

272
    """
273
    status = constants.JOB_STATUS_QUEUED
274

    
275
    all_success = True
276
    for op in self.ops:
277
      if op.status == constants.OP_STATUS_SUCCESS:
278
        continue
279

    
280
      all_success = False
281

    
282
      if op.status == constants.OP_STATUS_QUEUED:
283
        pass
284
      elif op.status == constants.OP_STATUS_WAITLOCK:
285
        status = constants.JOB_STATUS_WAITLOCK
286
      elif op.status == constants.OP_STATUS_RUNNING:
287
        status = constants.JOB_STATUS_RUNNING
288
      elif op.status == constants.OP_STATUS_CANCELING:
289
        status = constants.JOB_STATUS_CANCELING
290
        break
291
      elif op.status == constants.OP_STATUS_ERROR:
292
        status = constants.JOB_STATUS_ERROR
293
        # The whole job fails if one opcode failed
294
        break
295
      elif op.status == constants.OP_STATUS_CANCELED:
296
        status = constants.JOB_STATUS_CANCELED
297
        break
298

    
299
    if all_success:
300
      status = constants.JOB_STATUS_SUCCESS
301

    
302
    return status
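  # Illustrative mappings for the rules above (per-opcode statuses on the left,
  # resulting job status on the right):
  #
  #   [success, success, success]  -> JOB_STATUS_SUCCESS
  #   [success, running, queued]   -> JOB_STATUS_RUNNING
  #   [success, error, queued]     -> JOB_STATUS_ERROR
  #   [canceling, queued]          -> JOB_STATUS_CANCELING
  #   [queued, queued]             -> JOB_STATUS_QUEUED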
303

    
304
  def GetLogEntries(self, newer_than):
305
    """Selectively returns the log entries.
306

307
    @type newer_than: None or int
308
    @param newer_than: if this is None, return all log entries,
309
        otherwise return only the log entries with serial higher
310
        than this value
311
    @rtype: list
312
    @return: the list of the log entries selected
313

314
    """
315
    if newer_than is None:
316
      serial = -1
317
    else:
318
      serial = newer_than
319

    
320
    entries = []
321
    for op in self.ops:
322
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
323

    
324
    return entries
325

    
326
  def GetInfo(self, fields):
327
    """Returns information about a job.
328

329
    @type fields: list
330
    @param fields: names of fields to return
331
    @rtype: list
332
    @return: list with one element for each field
333
    @raise errors.OpExecError: when an invalid field
334
        has been passed
335

336
    """
337
    row = []
338
    for fname in fields:
339
      if fname == "id":
340
        row.append(self.id)
341
      elif fname == "status":
342
        row.append(self.CalcStatus())
343
      elif fname == "ops":
344
        row.append([op.input.__getstate__() for op in self.ops])
345
      elif fname == "opresult":
346
        row.append([op.result for op in self.ops])
347
      elif fname == "opstatus":
348
        row.append([op.status for op in self.ops])
349
      elif fname == "oplog":
350
        row.append([op.log for op in self.ops])
351
      elif fname == "opstart":
352
        row.append([op.start_timestamp for op in self.ops])
353
      elif fname == "opexec":
354
        row.append([op.exec_timestamp for op in self.ops])
355
      elif fname == "opend":
356
        row.append([op.end_timestamp for op in self.ops])
357
      elif fname == "received_ts":
358
        row.append(self.received_timestamp)
359
      elif fname == "start_ts":
360
        row.append(self.start_timestamp)
361
      elif fname == "end_ts":
362
        row.append(self.end_timestamp)
363
      elif fname == "summary":
364
        row.append([op.input.Summary() for op in self.ops])
365
      else:
366
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
367
    return row
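  # A hypothetical query against the fields handled above (the values shown are
  # made up; "summary" returns one entry per opcode):
  #
  #   job.GetInfo(["id", "status", "summary"])
  #   # -> ["42", "running", ["INSTANCE_STARTUP(instance1.example.com)"]]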
368

    
369
  def MarkUnfinishedOps(self, status, result):
370
    """Mark unfinished opcodes with a given status and result.
371

372
    This is a utility function for marking all running or waiting to
373
    be run opcodes with a given status. Opcodes which are already
374
    finalised are not changed.
375

376
    @param status: a given opcode status
377
    @param result: the opcode result
378

379
    """
380
    not_marked = True
381
    for op in self.ops:
382
      if op.status in constants.OPS_FINALIZED:
383
        assert not_marked, "Finalized opcodes found after non-finalized ones"
384
        continue
385
      op.status = status
386
      op.result = result
387
      not_marked = False
388

    
389

    
390
class _OpExecCallbacks(mcpu.OpExecCbBase):
391
  def __init__(self, queue, job, op):
392
    """Initializes this class.
393

394
    @type queue: L{JobQueue}
395
    @param queue: Job queue
396
    @type job: L{_QueuedJob}
397
    @param job: Job object
398
    @type op: L{_QueuedOpCode}
399
    @param op: OpCode
400

401
    """
402
    assert queue, "Queue is missing"
403
    assert job, "Job is missing"
404
    assert op, "Opcode is missing"
405

    
406
    self._queue = queue
407
    self._job = job
408
    self._op = op
409

    
410
  def _CheckCancel(self):
411
    """Raises an exception to cancel the job if asked to.
412

413
    """
414
    # Cancel here if we were asked to
415
    if self._op.status == constants.OP_STATUS_CANCELING:
416
      logging.debug("Canceling opcode")
417
      raise CancelJob()
418

    
419
  @locking.ssynchronized(_QUEUE, shared=1)
420
  def NotifyStart(self):
421
    """Mark the opcode as running, not lock-waiting.
422

423
    This is called from the mcpu code as a notifier function, when the LU is
424
    finally about to start the Exec() method. Of course, to have end-user
425
    visible results, the opcode must be initially (before calling into
426
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
427

428
    """
429
    assert self._op in self._job.ops
430
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
431
                               constants.OP_STATUS_CANCELING)
432

    
433
    # Cancel here if we were asked to
434
    self._CheckCancel()
435

    
436
    logging.debug("Opcode is now running")
437

    
438
    self._op.status = constants.OP_STATUS_RUNNING
439
    self._op.exec_timestamp = TimeStampNow()
440

    
441
    # And finally replicate the job status
442
    self._queue.UpdateJobUnlocked(self._job)
443

    
444
  @locking.ssynchronized(_QUEUE, shared=1)
445
  def _AppendFeedback(self, timestamp, log_type, log_msg):
446
    """Internal feedback append function, with locks
447

448
    """
449
    self._job.log_serial += 1
450
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
451
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
452

    
453
  def Feedback(self, *args):
454
    """Append a log entry.
455

456
    """
457
    assert len(args) < 3
458

    
459
    if len(args) == 1:
460
      log_type = constants.ELOG_MESSAGE
461
      log_msg = args[0]
462
    else:
463
      (log_type, log_msg) = args
464

    
465
    # The time is split to make serialization easier and not lose
466
    # precision.
467
    timestamp = utils.SplitTime(time.time())
468
    self._AppendFeedback(timestamp, log_type, log_msg)
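    # Feedback() accepts either a bare message or an explicit (type, message)
    # pair, for example:
    #
    #   cb.Feedback("hello")                                # ELOG_MESSAGE implied
    #   cb.Feedback(constants.ELOG_MESSAGE, "hello again")  # explicit log type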
469

    
470
  def ReportLocks(self, msg):
471
    """Write locking information to the job.
472

473
    Called whenever the LU processor is waiting for a lock or has acquired one.
474

475
    """
476
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
477
                               constants.OP_STATUS_CANCELING)
478

    
479
    # Cancel here if we were asked to
480
    self._CheckCancel()
481

    
482

    
483
class _JobChangesChecker(object):
484
  def __init__(self, fields, prev_job_info, prev_log_serial):
485
    """Initializes this class.
486

487
    @type fields: list of strings
488
    @param fields: Fields requested by LUXI client
489
    @type prev_job_info: list or None
490
    @param prev_job_info: previous job info, as passed by the LUXI client
491
    @type prev_log_serial: int
492
    @param prev_log_serial: previous job serial, as passed by the LUXI client
493

494
    """
495
    self._fields = fields
496
    self._prev_job_info = prev_job_info
497
    self._prev_log_serial = prev_log_serial
498

    
499
  def __call__(self, job):
500
    """Checks whether job has changed.
501

502
    @type job: L{_QueuedJob}
503
    @param job: Job object
504

505
    """
506
    status = job.CalcStatus()
507
    job_info = job.GetInfo(self._fields)
508
    log_entries = job.GetLogEntries(self._prev_log_serial)
509

    
510
    # Serializing and deserializing data can cause type changes (e.g. from
511
    # tuple to list) or precision loss. We're doing it here so that we get
512
    # the same modifications as the data received from the client. Without
513
    # this, the comparison afterwards might fail without the data being
514
    # significantly different.
515
    # TODO: we just deserialized from disk, investigate how to make sure that
516
    # the job info and log entries are compatible to avoid this further step.
517
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
518
    # efficient, though floats will be tricky
519
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
520
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
521

    
522
    # Don't even try to wait if the job is no longer running, there will be
523
    # no changes.
524
    if (status not in (constants.JOB_STATUS_QUEUED,
525
                       constants.JOB_STATUS_RUNNING,
526
                       constants.JOB_STATUS_WAITLOCK) or
527
        job_info != self._prev_job_info or
528
        (log_entries and self._prev_log_serial != log_entries[0][0])):
529
      logging.debug("Job %s changed", job.id)
530
      return (job_info, log_entries)
531

    
532
    return None
533

    
534

    
535
class _JobFileChangesWaiter(object):
536
  def __init__(self, filename):
537
    """Initializes this class.
538

539
    @type filename: string
540
    @param filename: Path to job file
541
    @raises errors.InotifyError: if the notifier cannot be set up
542

543
    """
544
    self._wm = pyinotify.WatchManager()
545
    self._inotify_handler = \
546
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
547
    self._notifier = \
548
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
549
    try:
550
      self._inotify_handler.enable()
551
    except Exception:
552
      # pyinotify doesn't close file descriptors automatically
553
      self._notifier.stop()
554
      raise
555

    
556
  def _OnInotify(self, notifier_enabled):
557
    """Callback for inotify.
558

559
    """
560
    if not notifier_enabled:
561
      self._inotify_handler.enable()
562

    
563
  def Wait(self, timeout):
564
    """Waits for the job file to change.
565

566
    @type timeout: float
567
    @param timeout: Timeout in seconds
568
    @return: Whether there have been events
569

570
    """
571
    assert timeout >= 0
572
    have_events = self._notifier.check_events(timeout * 1000)
573
    if have_events:
574
      self._notifier.read_events()
575
    self._notifier.process_events()
576
    return have_events
577

    
578
  def Close(self):
579
    """Closes underlying notifier and its file descriptor.
580

581
    """
582
    self._notifier.stop()
583

    
584

    
585
class _JobChangesWaiter(object):
586
  def __init__(self, filename):
587
    """Initializes this class.
588

589
    @type filename: string
590
    @param filename: Path to job file
591

592
    """
593
    self._filewaiter = None
594
    self._filename = filename
595

    
596
  def Wait(self, timeout):
597
    """Waits for a job to change.
598

599
    @type timeout: float
600
    @param timeout: Timeout in seconds
601
    @return: Whether there have been events
602

603
    """
604
    if self._filewaiter:
605
      return self._filewaiter.Wait(timeout)
606

    
607
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
608
    # If this point is reached, return immediately and let caller check the job
609
    # file again in case there were changes since the last check. This avoids a
610
    # race condition.
611
    self._filewaiter = _JobFileChangesWaiter(self._filename)
612

    
613
    return True
614

    
615
  def Close(self):
616
    """Closes underlying waiter.
617

618
    """
619
    if self._filewaiter:
620
      self._filewaiter.Close()
621

    
622

    
623
class _WaitForJobChangesHelper(object):
624
  """Helper class using inotify to wait for changes in a job file.
625

626
  This class takes a previous job status and serial, and alerts the client when
627
  the current job status has changed.
628

629
  """
630
  @staticmethod
631
  def _CheckForChanges(job_load_fn, check_fn):
632
    job = job_load_fn()
633
    if not job:
634
      raise errors.JobLost()
635

    
636
    result = check_fn(job)
637
    if result is None:
638
      raise utils.RetryAgain()
639

    
640
    return result
641

    
642
  def __call__(self, filename, job_load_fn,
643
               fields, prev_job_info, prev_log_serial, timeout):
644
    """Waits for changes on a job.
645

646
    @type filename: string
647
    @param filename: File on which to wait for changes
648
    @type job_load_fn: callable
649
    @param job_load_fn: Function to load job
650
    @type fields: list of strings
651
    @param fields: Which fields to check for changes
652
    @type prev_job_info: list or None
653
    @param prev_job_info: Last job information returned
654
    @type prev_log_serial: int
655
    @param prev_log_serial: Last job message serial number
656
    @type timeout: float
657
    @param timeout: maximum time to wait in seconds
658

659
    """
660
    try:
661
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
662
      waiter = _JobChangesWaiter(filename)
663
      try:
664
        return utils.Retry(compat.partial(self._CheckForChanges,
665
                                          job_load_fn, check_fn),
666
                           utils.RETRY_REMAINING_TIME, timeout,
667
                           wait_fn=waiter.Wait)
668
      finally:
669
        waiter.Close()
670
    except (errors.InotifyError, errors.JobLost):
671
      return None
672
    except utils.RetryTimeout:
673
      return constants.JOB_NOTCHANGED
674

    
675

    
676
def _EncodeOpError(err):
677
  """Encodes an error which occurred while processing an opcode.
678

679
  """
680
  if isinstance(err, errors.GenericError):
681
    to_encode = err
682
  else:
683
    to_encode = errors.OpExecError(str(err))
684

    
685
  return errors.EncodeException(to_encode)
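# For illustration: exceptions that are not Ganeti errors get wrapped in
# errors.OpExecError before encoding, so both calls below return an encoded
# exception suitable for storing in an opcode result:
#
#   _EncodeOpError(errors.OpExecError("disk failure"))
#   _EncodeOpError(ValueError("unexpected value"))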
686

    
687

    
688
class _JobQueueWorker(workerpool.BaseWorker):
689
  """The actual job workers.
690

691
  """
692
  def RunTask(self, job): # pylint: disable-msg=W0221
693
    """Job executor.
694

695
    This function processes a job. It is closely tied to the _QueuedJob and
696
    _QueuedOpCode classes.
697

698
    @type job: L{_QueuedJob}
699
    @param job: the job to be processed
700

701
    """
702
    self.SetTaskName("Job%s" % job.id)
703

    
704
    logging.info("Processing job %s", job.id)
705
    proc = mcpu.Processor(self.pool.queue.context, job.id)
706
    queue = job.queue
707
    try:
708
      try:
709
        count = len(job.ops)
710
        for idx, op in enumerate(job.ops):
711
          op_summary = op.input.Summary()
712
          if op.status == constants.OP_STATUS_SUCCESS:
713
            # this is a job that was partially completed before master
714
            # daemon shutdown, so it can be expected that some opcodes
715
            # are already completed successfully (if any did error
716
            # out, then the whole job should have been aborted and not
717
            # resubmitted for processing)
718
            logging.info("Op %s/%s: opcode %s already processed, skipping",
719
                         idx + 1, count, op_summary)
720
            continue
721
          try:
722
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
723
                         op_summary)
724

    
725
            queue.acquire(shared=1)
726
            try:
727
              if op.status == constants.OP_STATUS_CANCELED:
728
                logging.debug("Canceling opcode")
729
                raise CancelJob()
730
              assert op.status == constants.OP_STATUS_QUEUED
731
              logging.debug("Opcode %s/%s waiting for locks",
732
                            idx + 1, count)
733
              op.status = constants.OP_STATUS_WAITLOCK
734
              op.result = None
735
              op.start_timestamp = TimeStampNow()
736
              if idx == 0: # first opcode
737
                job.start_timestamp = op.start_timestamp
738
              queue.UpdateJobUnlocked(job)
739

    
740
              input_opcode = op.input
741
            finally:
742
              queue.release()
743

    
744
            # Make sure not to hold queue lock while calling ExecOpCode
745
            result = proc.ExecOpCode(input_opcode,
746
                                     _OpExecCallbacks(queue, job, op))
747

    
748
            queue.acquire(shared=1)
749
            try:
750
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
751
              op.status = constants.OP_STATUS_SUCCESS
752
              op.result = result
753
              op.end_timestamp = TimeStampNow()
754
              if idx == count - 1:
755
                job.end_timestamp = TimeStampNow()
756

    
757
                # Consistency check
758
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
759
                                  for i in job.ops)
760

    
761
              queue.UpdateJobUnlocked(job)
762
            finally:
763
              queue.release()
764

    
765
            logging.info("Op %s/%s: Successfully finished opcode %s",
766
                         idx + 1, count, op_summary)
767
          except CancelJob:
768
            # Will be handled further up
769
            raise
770
          except Exception, err:
771
            queue.acquire(shared=1)
772
            try:
773
              try:
774
                logging.debug("Opcode %s/%s failed", idx + 1, count)
775
                op.status = constants.OP_STATUS_ERROR
776
                op.result = _EncodeOpError(err)
777
                op.end_timestamp = TimeStampNow()
778
                logging.info("Op %s/%s: Error in opcode %s: %s",
779
                             idx + 1, count, op_summary, err)
780

    
781
                to_encode = errors.OpExecError("Preceding opcode failed")
782
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
783
                                      _EncodeOpError(to_encode))
784

    
785
                # Consistency check
786
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
787
                                  for i in job.ops[:idx])
788
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
789
                                  errors.GetEncodedError(i.result)
790
                                  for i in job.ops[idx:])
791
              finally:
792
                job.end_timestamp = TimeStampNow()
793
                queue.UpdateJobUnlocked(job)
794
            finally:
795
              queue.release()
796
            raise
797

    
798
      except CancelJob:
799
        queue.acquire(shared=1)
800
        try:
801
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
802
                                "Job canceled by request")
803
          job.end_timestamp = TimeStampNow()
804
          queue.UpdateJobUnlocked(job)
805
        finally:
806
          queue.release()
807
      except errors.GenericError, err:
808
        logging.exception("Ganeti exception")
809
      except:
810
        logging.exception("Unhandled exception")
811
    finally:
812
      status = job.CalcStatus()
813
      logging.info("Finished job %s, status = %s", job.id, status)
814

    
815

    
816
class _JobQueueWorkerPool(workerpool.WorkerPool):
817
  """Simple class implementing a job-processing workerpool.
818

819
  """
820
  def __init__(self, queue):
821
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
822
                                              JOBQUEUE_THREADS,
823
                                              _JobQueueWorker)
824
    self.queue = queue
825

    
826

    
827
def _RequireOpenQueue(fn):
828
  """Decorator for "public" functions.
829

830
  This function should be used for all 'public' functions. That is,
831
  functions usually called from other classes. Note that this should
832
  be applied only to methods (not plain functions), since it expects
833
  that the decorated function is called with a first argument that has
834
  a '_queue_filelock' attribute.
835

836
  @warning: Use this decorator only after locking.ssynchronized
837

838
  Example::
839
    @locking.ssynchronized(_LOCK)
840
    @_RequireOpenQueue
841
    def Example(self):
842
      pass
843

844
  """
845
  def wrapper(self, *args, **kwargs):
846
    # pylint: disable-msg=W0212
847
    assert self._queue_filelock is not None, "Queue should be open"
848
    return fn(self, *args, **kwargs)
849
  return wrapper
850

    
851

    
852
class JobQueue(object):
853
  """Queue used to manage the jobs.
854

855
  @cvar _RE_JOB_FILE: regex matching the valid job file names
856

857
  """
858
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
859

    
860
  def __init__(self, context):
861
    """Constructor for JobQueue.
862

863
    The constructor will initialize the job queue object and then
864
    start loading the current jobs from disk, either for starting them
865
    (if they were queued) or for aborting them (if they were already
866
    running).
867

868
    @type context: GanetiContext
869
    @param context: the context object for access to the configuration
870
        data and other ganeti objects
871

872
    """
873
    self.context = context
874
    self._memcache = weakref.WeakValueDictionary()
875
    self._my_hostname = netutils.HostInfo().name
876

    
877
    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared
    # mode and with all code acquiring it exclusively.
882
    self._lock = locking.SharedLock("JobQueue")
883

    
884
    self.acquire = self._lock.acquire
885
    self.release = self._lock.release
886

    
887
    # Initialize the queue, and acquire the filelock.
888
    # This ensures no other process is working on the job queue.
889
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
890

    
891
    # Read serial file
892
    self._last_serial = jstore.ReadSerial()
893
    assert self._last_serial is not None, ("Serial file was modified between"
894
                                           " check in jstore and here")
895

    
896
    # Get initial list of nodes
897
    self._nodes = dict((n.name, n.primary_ip)
898
                       for n in self.context.cfg.GetAllNodesInfo().values()
899
                       if n.master_candidate)
900

    
901
    # Remove master node
902
    self._nodes.pop(self._my_hostname, None)
903

    
904
    # TODO: Check consistency across nodes
905

    
906
    self._queue_size = 0
907
    self._UpdateQueueSizeUnlocked()
908
    self._drained = self._IsQueueMarkedDrain()
909

    
910
    # Setup worker pool
911
    self._wpool = _JobQueueWorkerPool(self)
912
    try:
913
      self._InspectQueue()
914
    except:
915
      self._wpool.TerminateWorkers()
916
      raise
917

    
918
  @locking.ssynchronized(_LOCK)
919
  @_RequireOpenQueue
920
  def _InspectQueue(self):
921
    """Loads the whole job queue and resumes unfinished jobs.
922

923
    This function needs the lock here because WorkerPool.AddTask() may start a
924
    job while we're still doing our work.
925

926
    """
927
    logging.info("Inspecting job queue")
928

    
929
    all_job_ids = self._GetJobIDsUnlocked()
930
    jobs_count = len(all_job_ids)
931
    lastinfo = time.time()
932
    for idx, job_id in enumerate(all_job_ids):
933
      # Give an update every 1000 jobs or 10 seconds
934
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
935
          idx == (jobs_count - 1)):
936
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
937
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
938
        lastinfo = time.time()
939

    
940
      job = self._LoadJobUnlocked(job_id)
941

    
942
      # a failure in loading the job can cause 'None' to be returned
943
      if job is None:
944
        continue
945

    
946
      status = job.CalcStatus()
947

    
948
      if status in (constants.JOB_STATUS_QUEUED,
949
                    constants.JOB_STATUS_WAITLOCK):
950
        self._wpool.AddTask((job, ))
951

    
952
      elif status in (constants.JOB_STATUS_RUNNING,
953
                      constants.JOB_STATUS_CANCELING):
954
        logging.warning("Unfinished job %s found: %s", job.id, job)
955
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
956
                              "Unclean master daemon shutdown")
957
        self.UpdateJobUnlocked(job)
958

    
959
    logging.info("Job queue inspection finished")
960

    
961
  @locking.ssynchronized(_LOCK)
962
  @_RequireOpenQueue
963
  def AddNode(self, node):
964
    """Register a new node with the queue.
965

966
    @type node: L{objects.Node}
967
    @param node: the node object to be added
968

969
    """
970
    node_name = node.name
971
    assert node_name != self._my_hostname
972

    
973
    # Clean queue directory on added node
974
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
975
    msg = result.fail_msg
976
    if msg:
977
      logging.warning("Cannot cleanup queue directory on node %s: %s",
978
                      node_name, msg)
979

    
980
    if not node.master_candidate:
981
      # remove if existing, ignoring errors
982
      self._nodes.pop(node_name, None)
983
      # and skip the replication of the job ids
984
      return
985

    
986
    # Upload the whole queue excluding archived jobs
987
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
988

    
989
    # Upload current serial file
990
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
991

    
992
    for file_name in files:
993
      # Read file content
994
      content = utils.ReadFile(file_name)
995

    
996
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
997
                                                  [node.primary_ip],
998
                                                  file_name, content)
999
      msg = result[node_name].fail_msg
1000
      if msg:
1001
        logging.error("Failed to upload file %s to node %s: %s",
1002
                      file_name, node_name, msg)
1003

    
1004
    self._nodes[node_name] = node.primary_ip
1005

    
1006
  @locking.ssynchronized(_LOCK)
1007
  @_RequireOpenQueue
1008
  def RemoveNode(self, node_name):
1009
    """Callback called when removing nodes from the cluster.
1010

1011
    @type node_name: str
1012
    @param node_name: the name of the node to remove
1013

1014
    """
1015
    self._nodes.pop(node_name, None)
1016

    
1017
  @staticmethod
1018
  def _CheckRpcResult(result, nodes, failmsg):
1019
    """Verifies the status of an RPC call.
1020

1021
    Since we aim to keep consistency should this node (the current
1022
    master) fail, we will log errors if our rpc calls fail, and especially
    log the case when more than half of the nodes fail.
1024

1025
    @param result: the data as returned from the rpc call
1026
    @type nodes: list
1027
    @param nodes: the list of nodes we made the call to
1028
    @type failmsg: str
1029
    @param failmsg: the identifier to be used for logging
1030

1031
    """
1032
    failed = []
1033
    success = []
1034

    
1035
    for node in nodes:
1036
      msg = result[node].fail_msg
1037
      if msg:
1038
        failed.append(node)
1039
        logging.error("RPC call %s (%s) failed on node %s: %s",
1040
                      result[node].call, failmsg, node, msg)
1041
      else:
1042
        success.append(node)
1043

    
1044
    # +1 for the master node
1045
    if (len(success) + 1) < len(failed):
1046
      # TODO: Handle failing nodes
1047
      logging.error("More than half of the nodes failed")
1048

    
1049
  def _GetNodeIp(self):
1050
    """Helper for returning the node name/ip list.
1051

1052
    @rtype: (list, list)
1053
    @return: a tuple of two lists, the first one with the node
1054
        names and the second one with the node addresses
1055

1056
    """
1057
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1058
    name_list = self._nodes.keys()
1059
    addr_list = [self._nodes[name] for name in name_list]
1060
    return name_list, addr_list
1061

    
1062
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1063
    """Writes a file locally and then replicates it to all nodes.
1064

1065
    This function will replace the contents of a file on the local
1066
    node and then replicate it to all the other nodes we have.
1067

1068
    @type file_name: str
1069
    @param file_name: the path of the file to be replicated
1070
    @type data: str
1071
    @param data: the new contents of the file
1072
    @type replicate: boolean
1073
    @param replicate: whether to spread the changes to the remote nodes
1074

1075
    """
1076
    utils.WriteFile(file_name, data=data)
1077

    
1078
    if replicate:
1079
      names, addrs = self._GetNodeIp()
1080
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1081
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1082

    
1083
  def _RenameFilesUnlocked(self, rename):
1084
    """Renames a file locally and then replicate the change.
1085

1086
    This function will rename a file in the local queue directory
1087
    and then replicate this rename to all the other nodes we have.
1088

1089
    @type rename: list of (old, new)
1090
    @param rename: List containing tuples mapping old to new names
1091

1092
    """
1093
    # Rename them locally
1094
    for old, new in rename:
1095
      utils.RenameFile(old, new, mkdir=True)
1096

    
1097
    # ... and on all nodes
1098
    names, addrs = self._GetNodeIp()
1099
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1100
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1101

    
1102
  @staticmethod
1103
  def _FormatJobID(job_id):
1104
    """Convert a job ID to string format.
1105

1106
    Currently this just does C{str(job_id)} after performing some
1107
    checks, but if we want to change the job id format this will
1108
    abstract this change.
1109

1110
    @type job_id: int or long
1111
    @param job_id: the numeric job id
1112
    @rtype: str
1113
    @return: the formatted job id
1114

1115
    """
1116
    if not isinstance(job_id, (int, long)):
1117
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1118
    if job_id < 0:
1119
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1120

    
1121
    return str(job_id)
1122

    
1123
  @classmethod
1124
  def _GetArchiveDirectory(cls, job_id):
1125
    """Returns the archive directory for a job.
1126

1127
    @type job_id: str
1128
    @param job_id: Job identifier
1129
    @rtype: str
1130
    @return: Directory name
1131

1132
    """
1133
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1134

    
1135
  def _NewSerialsUnlocked(self, count):
1136
    """Generates a new job identifier.
1137

1138
    Job identifiers are unique during the lifetime of a cluster.
1139

1140
    @type count: integer
1141
    @param count: how many serials to return
1142
    @rtype: list of str
    @return: a list of strings representing the job identifiers.
1144

1145
    """
1146
    assert count > 0
1147
    # New number
1148
    serial = self._last_serial + count
1149

    
1150
    # Write to file
1151
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1152
                             "%s\n" % serial, True)
1153

    
1154
    result = [self._FormatJobID(v)
1155
              for v in range(self._last_serial + 1, serial + 1)]
1156
    # Keep it only if we were able to write the file
1157
    self._last_serial = serial
1158

    
1159
    return result
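  # A sketch with made-up numbers: if _last_serial is 41, a call such as
  # _NewSerialsUnlocked(2) writes "43\n" to the serial file and returns
  # ["42", "43"], after which _last_serial is 43.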
1160

    
1161
  @staticmethod
1162
  def _GetJobPath(job_id):
1163
    """Returns the job file for a given job id.
1164

1165
    @type job_id: str
1166
    @param job_id: the job identifier
1167
    @rtype: str
1168
    @return: the path to the job file
1169

1170
    """
1171
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1172

    
1173
  @classmethod
1174
  def _GetArchivedJobPath(cls, job_id):
1175
    """Returns the archived job file for a give job id.
1176

1177
    @type job_id: str
1178
    @param job_id: the job identifier
1179
    @rtype: str
1180
    @return: the path to the archived job file
1181

1182
    """
1183
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1184
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
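  # For illustration, with JOBS_PER_ARCHIVE_DIRECTORY = 10000 a job with ID
  # "15012" ends up under subdirectory "1" (15012 / 10000) of
  # constants.JOB_QUEUE_ARCHIVE_DIR, i.e. a path ending in ".../1/job-15012".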
1185

    
1186
  def _GetJobIDsUnlocked(self, sort=True):
1187
    """Return all known job IDs.
1188

1189
    The method only looks at disk because it's a requirement that all
1190
    jobs are present on disk (so in the _memcache we don't have any
1191
    extra IDs).
1192

1193
    @type sort: boolean
1194
    @param sort: perform sorting on the returned job ids
1195
    @rtype: list
1196
    @return: the list of job IDs
1197

1198
    """
1199
    jlist = []
1200
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1201
      m = self._RE_JOB_FILE.match(filename)
1202
      if m:
1203
        jlist.append(m.group(1))
1204
    if sort:
1205
      jlist = utils.NiceSort(jlist)
1206
    return jlist
1207

    
1208
  def _LoadJobUnlocked(self, job_id):
1209
    """Loads a job from the disk or memory.
1210

1211
    Given a job id, this will return the cached job object if
1212
    existing, or try to load the job from the disk. If loading from
1213
    disk, it will also add the job to the cache.
1214

1215
    @param job_id: the job id
1216
    @rtype: L{_QueuedJob} or None
1217
    @return: either None or the job object
1218

1219
    """
1220
    job = self._memcache.get(job_id, None)
1221
    if job:
1222
      logging.debug("Found job %s in memcache", job_id)
1223
      return job
1224

    
1225
    try:
1226
      job = self._LoadJobFromDisk(job_id)
1227
      if job is None:
1228
        return job
1229
    except errors.JobFileCorrupted:
1230
      old_path = self._GetJobPath(job_id)
1231
      new_path = self._GetArchivedJobPath(job_id)
1232
      if old_path == new_path:
1233
        # job already archived (future case)
1234
        logging.exception("Can't parse job %s", job_id)
1235
      else:
1236
        # non-archived case
1237
        logging.exception("Can't parse job %s, will archive.", job_id)
1238
        self._RenameFilesUnlocked([(old_path, new_path)])
1239
      return None
1240

    
1241
    self._memcache[job_id] = job
1242
    logging.debug("Added job %s to the cache", job_id)
1243
    return job
1244

    
1245
  def _LoadJobFromDisk(self, job_id):
1246
    """Load the given job file from disk.
1247

1248
    Given a job file, read, load and restore it in a _QueuedJob format.
1249

1250
    @type job_id: string
1251
    @param job_id: job identifier
1252
    @rtype: L{_QueuedJob} or None
1253
    @return: either None or the job object
1254

1255
    """
1256
    filepath = self._GetJobPath(job_id)
1257
    logging.debug("Loading job from %s", filepath)
1258
    try:
1259
      raw_data = utils.ReadFile(filepath)
1260
    except EnvironmentError, err:
1261
      if err.errno in (errno.ENOENT, ):
1262
        return None
1263
      raise
1264

    
1265
    try:
1266
      data = serializer.LoadJson(raw_data)
1267
      job = _QueuedJob.Restore(self, data)
1268
    except Exception, err: # pylint: disable-msg=W0703
1269
      raise errors.JobFileCorrupted(err)
1270

    
1271
    return job
1272

    
1273
  def SafeLoadJobFromDisk(self, job_id):
1274
    """Load the given job file from disk.
1275

1276
    Given a job file, read, load and restore it in a _QueuedJob format.
1277
    In case of error reading the job, it gets returned as None, and the
1278
    exception is logged.
1279

1280
    @type job_id: string
1281
    @param job_id: job identifier
1282
    @rtype: L{_QueuedJob} or None
1283
    @return: either None or the job object
1284

1285
    """
1286
    try:
1287
      return self._LoadJobFromDisk(job_id)
1288
    except (errors.JobFileCorrupted, EnvironmentError):
1289
      logging.exception("Can't load/parse job %s", job_id)
1290
      return None
1291

    
1292
  @staticmethod
1293
  def _IsQueueMarkedDrain():
1294
    """Check if the queue is marked from drain.
1295

1296
    This currently uses the queue drain file, which makes it a
1297
    per-node flag. In the future this can be moved to the config file.
1298

1299
    @rtype: boolean
1300
    @return: True if the job queue is marked for draining
1301

1302
    """
1303
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1304

    
1305
  def _UpdateQueueSizeUnlocked(self):
1306
    """Update the queue size.
1307

1308
    """
1309
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1310

    
1311
  @locking.ssynchronized(_LOCK)
1312
  @_RequireOpenQueue
1313
  def SetDrainFlag(self, drain_flag):
1314
    """Sets the drain flag for the queue.
1315

1316
    @type drain_flag: boolean
1317
    @param drain_flag: Whether to set or unset the drain flag
1318

1319
    """
1320
    if drain_flag:
1321
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1322
    else:
1323
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1324

    
1325
    self._drained = drain_flag
1326

    
1327
    return True
1328

    
1329
  @_RequireOpenQueue
1330
  def _SubmitJobUnlocked(self, job_id, ops):
1331
    """Create and store a new job.
1332

1333
    This enters the job into our job queue and also puts it on the new
1334
    queue, in order for it to be picked up by the queue processors.
1335

1336
    @type job_id: job ID
1337
    @param job_id: the job ID for the new job
1338
    @type ops: list
1339
    @param ops: The list of OpCodes that will become the new job.
1340
    @rtype: L{_QueuedJob}
1341
    @return: the job object to be queued
1342
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1343
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1344

1345
    """
1346
    # Ok when sharing the big job queue lock, as the drain file is created when
1347
    # the lock is exclusive.
1348
    if self._drained:
1349
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1350

    
1351
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1352
      raise errors.JobQueueFull()
1353

    
1354
    job = _QueuedJob(self, job_id, ops)
1355

    
1356
    # Write to disk
1357
    self.UpdateJobUnlocked(job)
1358

    
1359
    self._queue_size += 1
1360

    
1361
    logging.debug("Adding new job %s to the cache", job_id)
1362
    self._memcache[job_id] = job
1363

    
1364
    return job
1365

    
1366
  @locking.ssynchronized(_LOCK)
1367
  @_RequireOpenQueue
1368
  def SubmitJob(self, ops):
1369
    """Create and store a new job.
1370

1371
    @see: L{_SubmitJobUnlocked}
1372

1373
    """
1374
    job_id = self._NewSerialsUnlocked(1)[0]
1375
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
1376
    return job_id
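  # A minimal usage sketch, assuming "queue" is an open JobQueue instance and
  # "op" any opcodes.OpCode (names are illustrative only):
  #
  #   job_id = queue.SubmitJob([op])
  #   info = queue.QueryJobs([job_id], ["id", "status"])
  #   result = queue.WaitForJobChanges(job_id, ["status"], None, 0, timeout=10)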
1377

    
1378
  @locking.ssynchronized(_LOCK)
1379
  @_RequireOpenQueue
1380
  def SubmitManyJobs(self, jobs):
1381
    """Create and store multiple jobs.
1382

1383
    @see: L{_SubmitJobUnlocked}
1384

1385
    """
1386
    results = []
1387
    tasks = []
1388
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1389
    for job_id, ops in zip(all_job_ids, jobs):
1390
      try:
1391
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1392
        status = True
1393
        data = job_id
1394
      except errors.GenericError, err:
1395
        data = str(err)
1396
        status = False
1397
      results.append((status, data))
1398
    self._wpool.AddManyTasks(tasks)
1399

    
1400
    return results
1401

    
1402
  @_RequireOpenQueue
1403
  def UpdateJobUnlocked(self, job, replicate=True):
1404
    """Update a job's on disk storage.
1405

1406
    After a job has been modified, this function needs to be called in
1407
    order to write the changes to disk and replicate them to the other
1408
    nodes.
1409

1410
    @type job: L{_QueuedJob}
1411
    @param job: the changed job
1412
    @type replicate: boolean
1413
    @param replicate: whether to replicate the change to remote nodes
1414

1415
    """
1416
    filename = self._GetJobPath(job.id)
1417
    data = serializer.DumpJson(job.Serialize(), indent=False)
1418
    logging.debug("Writing job %s to %s", job.id, filename)
1419
    self._UpdateJobQueueFile(filename, data, replicate)
1420

    
1421
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1422
                        timeout):
1423
    """Waits for changes in a job.
1424

1425
    @type job_id: string
1426
    @param job_id: Job identifier
1427
    @type fields: list of strings
1428
    @param fields: Which fields to check for changes
1429
    @type prev_job_info: list or None
1430
    @param prev_job_info: Last job information returned
1431
    @type prev_log_serial: int
1432
    @param prev_log_serial: Last job message serial number
1433
    @type timeout: float
1434
    @param timeout: maximum time to wait in seconds
1435
    @rtype: tuple (job info, log entries)
1436
    @return: a tuple of the job information as required via
1437
        the fields parameter, and the log entries as a list
1438

1439
        if the job has not changed and the timeout has expired,
1440
        we instead return a special value,
1441
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1442
        as such by the clients
1443

1444
    """
1445
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1446

    
1447
    helper = _WaitForJobChangesHelper()
1448

    
1449
    return helper(self._GetJobPath(job_id), load_fn,
1450
                  fields, prev_job_info, prev_log_serial, timeout)
1451

    
1452
  @locking.ssynchronized(_LOCK)
1453
  @_RequireOpenQueue
1454
  def CancelJob(self, job_id):
1455
    """Cancels a job.
1456

1457
    This will only succeed if the job has not started yet.
1458

1459
    @type job_id: string
1460
    @param job_id: job ID of job to be cancelled.
1461

1462
    """
1463
    logging.info("Cancelling job %s", job_id)
1464

    
1465
    job = self._LoadJobUnlocked(job_id)
1466
    if not job:
1467
      logging.debug("Job %s not found", job_id)
1468
      return (False, "Job %s not found" % job_id)
1469

    
1470
    job_status = job.CalcStatus()
1471

    
1472
    if job_status not in (constants.JOB_STATUS_QUEUED,
1473
                          constants.JOB_STATUS_WAITLOCK):
1474
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1475
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1476

    
1477
    if job_status == constants.JOB_STATUS_QUEUED:
1478
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1479
                            "Job canceled by request")
1480
      msg = "Job %s canceled" % job.id
1481

    
1482
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1483
      # The worker will notice the new status and cancel the job
1484
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1485
      msg = "Job %s will be canceled" % job.id
1486

    
1487
    self.UpdateJobUnlocked(job)
1488

    
1489
    return (True, msg)
1490

    
1491
  @_RequireOpenQueue
1492
  def _ArchiveJobsUnlocked(self, jobs):
1493
    """Archives jobs.
1494

1495
    @type jobs: list of L{_QueuedJob}
1496
    @param jobs: Job objects
1497
    @rtype: int
1498
    @return: Number of archived jobs
1499

1500
    """
1501
    archive_jobs = []
1502
    rename_files = []
1503
    for job in jobs:
1504
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
1505
        logging.debug("Job %s is not yet done", job.id)
1506
        continue
1507

    
1508
      archive_jobs.append(job)
1509

    
1510
      old = self._GetJobPath(job.id)
1511
      new = self._GetArchivedJobPath(job.id)
1512
      rename_files.append((old, new))
1513

    
1514
    # TODO: What if 1..n files fail to rename?
1515
    self._RenameFilesUnlocked(rename_files)
1516

    
1517
    logging.debug("Successfully archived job(s) %s",
1518
                  utils.CommaJoin(job.id for job in archive_jobs))
1519

    
1520
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1521
    # the files, we update the cached queue size from the filesystem. When we
1522
    # get around to fix the TODO: above, we can use the number of actually
1523
    # archived jobs to fix this.
1524
    self._UpdateQueueSizeUnlocked()
1525
    return len(archive_jobs)
1526

    
1527
  @locking.ssynchronized(_LOCK)
1528
  @_RequireOpenQueue
1529
  def ArchiveJob(self, job_id):
1530
    """Archives a job.
1531

1532
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1533

1534
    @type job_id: string
1535
    @param job_id: Job ID of job to be archived.
1536
    @rtype: bool
1537
    @return: Whether job was archived
1538

1539
    """
1540
    logging.info("Archiving job %s", job_id)
1541

    
1542
    job = self._LoadJobUnlocked(job_id)
1543
    if not job:
1544
      logging.debug("Job %s not found", job_id)
1545
      return False
1546

    
1547
    return self._ArchiveJobsUnlocked([job]) == 1
1548

    
1549
  @locking.ssynchronized(_LOCK)
1550
  @_RequireOpenQueue
1551
  def AutoArchiveJobs(self, age, timeout):
1552
    """Archives all jobs based on age.
1553

1554
    The method will archive all jobs which are older than the age
1555
    parameter. For jobs that don't have an end timestamp, the start
1556
    timestamp will be considered. The special '-1' age will cause
1557
    archival of all jobs (that are not running or queued).
1558

1559
    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time in seconds this archival run may take
1561

1562
    """
1563
    logging.info("Archiving jobs with age more than %s seconds", age)
1564

    
1565
    now = time.time()
1566
    end_time = now + timeout
1567
    archived_count = 0
1568
    last_touched = 0
1569

    
1570
    all_job_ids = self._GetJobIDsUnlocked()
1571
    pending = []
1572
    for idx, job_id in enumerate(all_job_ids):
1573
      last_touched = idx + 1
1574

    
1575
      # Not optimal because jobs could be pending
1576
      # TODO: Measure average duration for job archival and take number of
1577
      # pending jobs into account.
1578
      if time.time() > end_time:
1579
        break
1580

    
1581
      # Returns None if the job failed to load
1582
      job = self._LoadJobUnlocked(job_id)
1583
      if job:
1584
        if job.end_timestamp is None:
1585
          if job.start_timestamp is None:
1586
            job_age = job.received_timestamp
1587
          else:
1588
            job_age = job.start_timestamp
1589
        else:
1590
          job_age = job.end_timestamp
1591

    
1592
        if age == -1 or now - job_age[0] > age:
1593
          pending.append(job)
1594

    
1595
          # Archive 10 jobs at a time
1596
          if len(pending) >= 10:
1597
            archived_count += self._ArchiveJobsUnlocked(pending)
1598
            pending = []
1599

    
1600
    if pending:
1601
      archived_count += self._ArchiveJobsUnlocked(pending)
1602

    
1603
    return (archived_count, len(all_job_ids) - last_touched)
1604

    
1605
  def QueryJobs(self, job_ids, fields):
1606
    """Returns a list of jobs in queue.
1607

1608
    @type job_ids: list
1609
    @param job_ids: sequence of job identifiers or None for all
1610
    @type fields: list
1611
    @param fields: names of fields to return
1612
    @rtype: list
1613
    @return: a list with one element per job, each element being a list with
1614
        the requested fields
1615

1616
    """
1617
    jobs = []
1618
    list_all = False
1619
    if not job_ids:
1620
      # Since files are added to/removed from the queue atomically, there's no
1621
      # risk of getting the job ids in an inconsistent state.
1622
      job_ids = self._GetJobIDsUnlocked()
1623
      list_all = True
1624

    
1625
    for job_id in job_ids:
1626
      job = self.SafeLoadJobFromDisk(job_id)
1627
      if job is not None:
1628
        jobs.append(job.GetInfo(fields))
1629
      elif not list_all:
1630
        jobs.append(None)
1631

    
1632
    return jobs
1633

    
1634
  @locking.ssynchronized(_LOCK)
1635
  @_RequireOpenQueue
1636
  def Shutdown(self):
1637
    """Stops the job queue.
1638

1639
    This shuts down all the worker threads and closes the queue.
1640

1641
    """
1642
    self._wpool.TerminateWorkers()
1643

    
1644
    self._queue_filelock.Close()
1645
    self._queue_filelock = None