lib / jqueue.py @ 82b22e19
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import errno
35
import re
36
import time
37
import weakref
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import runtime
57
from ganeti import netutils
58
from ganeti import compat
59

    
60

    
61
JOBQUEUE_THREADS = 25
62
JOBS_PER_ARCHIVE_DIRECTORY = 10000
63

    
64
# member lock names to be passed to @ssynchronized decorator
65
_LOCK = "_lock"
66
_QUEUE = "_queue"
67

    
68

    
69
class CancelJob(Exception):
70
  """Special exception to cancel a job.
71

72
  """
73

    
74

    
75
def TimeStampNow():
76
  """Returns the current timestamp.
77

78
  @rtype: tuple
79
  @return: the current time in the (seconds, microseconds) format
80

81
  """
82
  return utils.SplitTime(time.time())
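  # Example (illustrative values only): TimeStampNow() might return
  # (1288797522, 175437), i.e. whole seconds and microseconds as produced by
  # utils.SplitTime().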
83

    
84

    
85
class _QueuedOpCode(object):
86
  """Encapsulates an opcode object.
87

88
  @ivar log: holds the execution log and consists of tuples
89
  of the form C{(log_serial, timestamp, level, message)}
90
  @ivar input: the OpCode we encapsulate
91
  @ivar status: the current status
92
  @ivar result: the result of the LU execution
93
  @ivar start_timestamp: timestamp for the start of the execution
94
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
95
  @ivar end_timestamp: timestamp for the end of the execution
96

97
  """
98
  __slots__ = ["input", "status", "result", "log",
99
               "start_timestamp", "exec_timestamp", "end_timestamp",
100
               "__weakref__"]
101

    
102
  def __init__(self, op):
103
    """Constructor for the _QuededOpCode.
104

105
    @type op: L{opcodes.OpCode}
106
    @param op: the opcode we encapsulate
107

108
    """
109
    self.input = op
110
    self.status = constants.OP_STATUS_QUEUED
111
    self.result = None
112
    self.log = []
113
    self.start_timestamp = None
114
    self.exec_timestamp = None
115
    self.end_timestamp = None
116

    
117
  @classmethod
118
  def Restore(cls, state):
119
    """Restore the _QueuedOpCode from the serialized form.
120

121
    @type state: dict
122
    @param state: the serialized state
123
    @rtype: _QueuedOpCode
124
    @return: a new _QueuedOpCode instance
125

126
    """
127
    obj = _QueuedOpCode.__new__(cls)
128
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
129
    obj.status = state["status"]
130
    obj.result = state["result"]
131
    obj.log = state["log"]
132
    obj.start_timestamp = state.get("start_timestamp", None)
133
    obj.exec_timestamp = state.get("exec_timestamp", None)
134
    obj.end_timestamp = state.get("end_timestamp", None)
135
    return obj
136

    
137
  def Serialize(self):
138
    """Serializes this _QueuedOpCode.
139

140
    @rtype: dict
141
    @return: the dictionary holding the serialized state
142

143
    """
144
    return {
145
      "input": self.input.__getstate__(),
146
      "status": self.status,
147
      "result": self.result,
148
      "log": self.log,
149
      "start_timestamp": self.start_timestamp,
150
      "exec_timestamp": self.exec_timestamp,
151
      "end_timestamp": self.end_timestamp,
152
      }
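  # Illustrative round trip (assuming "op" is an opcodes.OpCode instance):
  #   state = _QueuedOpCode(op).Serialize()
  #   clone = _QueuedOpCode.Restore(state)
  # "clone" then carries the same input, status, result, log and timestamps
  # as the original object.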
153

    
154

    
155
class _QueuedJob(object):
156
  """In-memory job representation.
157

158
  This is what we use to track the user-submitted jobs. Locking must
159
  be taken care of by users of this class.
160

161
  @type queue: L{JobQueue}
162
  @ivar queue: the parent queue
163
  @ivar id: the job ID
164
  @type ops: list
165
  @ivar ops: the list of _QueuedOpCode that constitute the job
166
  @type log_serial: int
167
  @ivar log_serial: holds the index for the next log entry
168
  @ivar received_timestamp: the timestamp for when the job was received
169
  @ivar start_timestamp: the timestamp for start of execution
170
  @ivar end_timestamp: the timestamp for end of execution
171

172
  """
173
  # pylint: disable-msg=W0212
174
  __slots__ = ["queue", "id", "ops", "log_serial",
175
               "received_timestamp", "start_timestamp", "end_timestamp",
176
               "__weakref__"]
177

    
178
  def __init__(self, queue, job_id, ops):
179
    """Constructor for the _QueuedJob.
180

181
    @type queue: L{JobQueue}
182
    @param queue: our parent queue
183
    @type job_id: job_id
184
    @param job_id: our job id
185
    @type ops: list
186
    @param ops: the list of opcodes we hold, which will be encapsulated
187
        in _QueuedOpCodes
188

189
    """
190
    if not ops:
191
      raise errors.GenericError("A job needs at least one opcode")
192

    
193
    self.queue = queue
194
    self.id = job_id
195
    self.ops = [_QueuedOpCode(op) for op in ops]
196
    self.log_serial = 0
197
    self.received_timestamp = TimeStampNow()
198
    self.start_timestamp = None
199
    self.end_timestamp = None
200

    
201
  def __repr__(self):
202
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
203
              "id=%s" % self.id,
204
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
205

    
206
    return "<%s at %#x>" % (" ".join(status), id(self))
207

    
208
  @classmethod
209
  def Restore(cls, queue, state):
210
    """Restore a _QueuedJob from serialized state:
211

212
    @type queue: L{JobQueue}
213
    @param queue: to which queue the restored job belongs
214
    @type state: dict
215
    @param state: the serialized state
216
    @rtype: _QueuedJob
217
    @return: the restored _QueuedJob instance
218

219
    """
220
    obj = _QueuedJob.__new__(cls)
221
    obj.queue = queue
222
    obj.id = state["id"]
223
    obj.received_timestamp = state.get("received_timestamp", None)
224
    obj.start_timestamp = state.get("start_timestamp", None)
225
    obj.end_timestamp = state.get("end_timestamp", None)
226

    
227
    obj.ops = []
228
    obj.log_serial = 0
229
    for op_state in state["ops"]:
230
      op = _QueuedOpCode.Restore(op_state)
231
      for log_entry in op.log:
232
        obj.log_serial = max(obj.log_serial, log_entry[0])
233
      obj.ops.append(op)
234

    
235
    return obj
236

    
237
  def Serialize(self):
238
    """Serialize the _JobQueue instance.
239

240
    @rtype: dict
241
    @return: the serialized state
242

243
    """
244
    return {
245
      "id": self.id,
246
      "ops": [op.Serialize() for op in self.ops],
247
      "start_timestamp": self.start_timestamp,
248
      "end_timestamp": self.end_timestamp,
249
      "received_timestamp": self.received_timestamp,
250
      }
251

    
252
  def CalcStatus(self):
253
    """Compute the status of this job.
254

255
    This function iterates over all the _QueuedOpCodes in the job and
256
    based on their status, computes the job status.
257

258
    The algorithm is:
259
      - if we find a cancelled opcode, or one that finished with an
260
        error, the job status will be the same
261
      - otherwise, the last opcode with the status one of:
262
          - waitlock
263
          - canceling
264
          - running
265

266
        will determine the job status
267

268
      - otherwise, it means either all opcodes are queued, or success,
269
        and the job status will be the same
270

271
    @return: the job status
272

273
    """
274
    status = constants.JOB_STATUS_QUEUED
275

    
276
    all_success = True
277
    for op in self.ops:
278
      if op.status == constants.OP_STATUS_SUCCESS:
279
        continue
280

    
281
      all_success = False
282

    
283
      if op.status == constants.OP_STATUS_QUEUED:
284
        pass
285
      elif op.status == constants.OP_STATUS_WAITLOCK:
286
        status = constants.JOB_STATUS_WAITLOCK
287
      elif op.status == constants.OP_STATUS_RUNNING:
288
        status = constants.JOB_STATUS_RUNNING
289
      elif op.status == constants.OP_STATUS_CANCELING:
290
        status = constants.JOB_STATUS_CANCELING
291
        break
292
      elif op.status == constants.OP_STATUS_ERROR:
293
        status = constants.JOB_STATUS_ERROR
294
        # The whole job fails if one opcode failed
295
        break
296
      elif op.status == constants.OP_STATUS_CANCELED:
297
        status = constants.JOB_STATUS_CANCELED
298
        break
299

    
300
    if all_success:
301
      status = constants.JOB_STATUS_SUCCESS
302

    
303
    return status
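    # Illustrative examples of the rules above (per-opcode statuses on the
    # left, resulting job status on the right):
    #   (success, success)           -> JOB_STATUS_SUCCESS
    #   (success, running, queued)   -> JOB_STATUS_RUNNING
    #   (success, error, queued)     -> JOB_STATUS_ERROR
    #   (queued, queued)             -> JOB_STATUS_QUEUED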
304

    
305
  def GetLogEntries(self, newer_than):
306
    """Selectively returns the log entries.
307

308
    @type newer_than: None or int
309
    @param newer_than: if this is None, return all log entries,
310
        otherwise return only the log entries with serial higher
311
        than this value
312
    @rtype: list
313
    @return: the list of the log entries selected
314

315
    """
316
    if newer_than is None:
317
      serial = -1
318
    else:
319
      serial = newer_than
320

    
321
    entries = []
322
    for op in self.ops:
323
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
324

    
325
    return entries
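    # Example (illustrative): if the opcodes hold log entries with serials
    # 1 to 5, GetLogEntries(3) returns only the entries with serials 4 and 5,
    # while GetLogEntries(None) returns all of them.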
326

    
327
  def GetInfo(self, fields):
328
    """Returns information about a job.
329

330
    @type fields: list
331
    @param fields: names of fields to return
332
    @rtype: list
333
    @return: list with one element for each field
334
    @raise errors.OpExecError: when an invalid field
335
        has been passed
336

337
    """
338
    row = []
339
    for fname in fields:
340
      if fname == "id":
341
        row.append(self.id)
342
      elif fname == "status":
343
        row.append(self.CalcStatus())
344
      elif fname == "ops":
345
        row.append([op.input.__getstate__() for op in self.ops])
346
      elif fname == "opresult":
347
        row.append([op.result for op in self.ops])
348
      elif fname == "opstatus":
349
        row.append([op.status for op in self.ops])
350
      elif fname == "oplog":
351
        row.append([op.log for op in self.ops])
352
      elif fname == "opstart":
353
        row.append([op.start_timestamp for op in self.ops])
354
      elif fname == "opexec":
355
        row.append([op.exec_timestamp for op in self.ops])
356
      elif fname == "opend":
357
        row.append([op.end_timestamp for op in self.ops])
358
      elif fname == "received_ts":
359
        row.append(self.received_timestamp)
360
      elif fname == "start_ts":
361
        row.append(self.start_timestamp)
362
      elif fname == "end_ts":
363
        row.append(self.end_timestamp)
364
      elif fname == "summary":
365
        row.append([op.input.Summary() for op in self.ops])
366
      else:
367
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
368
    return row
369

    
370
  def MarkUnfinishedOps(self, status, result):
371
    """Mark unfinished opcodes with a given status and result.
372

373
    This is a utility function for marking all running or waiting to
374
    be run opcodes with a given status. Opcodes which are already
375
    finalised are not changed.
376

377
    @param status: a given opcode status
378
    @param result: the opcode result
379

380
    """
381
    try:
382
      not_marked = True
383
      for op in self.ops:
384
        if op.status in constants.OPS_FINALIZED:
385
          assert not_marked, "Finalized opcodes found after non-finalized ones"
386
          continue
387
        op.status = status
388
        op.result = result
389
        not_marked = False
390
    finally:
391
      self.queue.UpdateJobUnlocked(self)
392

    
393

    
394
class _OpExecCallbacks(mcpu.OpExecCbBase):
395
  def __init__(self, queue, job, op):
396
    """Initializes this class.
397

398
    @type queue: L{JobQueue}
399
    @param queue: Job queue
400
    @type job: L{_QueuedJob}
401
    @param job: Job object
402
    @type op: L{_QueuedOpCode}
403
    @param op: OpCode
404

405
    """
406
    assert queue, "Queue is missing"
407
    assert job, "Job is missing"
408
    assert op, "Opcode is missing"
409

    
410
    self._queue = queue
411
    self._job = job
412
    self._op = op
413

    
414
  def _CheckCancel(self):
415
    """Raises an exception to cancel the job if asked to.
416

417
    """
418
    # Cancel here if we were asked to
419
    if self._op.status == constants.OP_STATUS_CANCELING:
420
      logging.debug("Canceling opcode")
421
      raise CancelJob()
422

    
423
  @locking.ssynchronized(_QUEUE, shared=1)
424
  def NotifyStart(self):
425
    """Mark the opcode as running, not lock-waiting.
426

427
    This is called from the mcpu code as a notifier function, when the LU is
428
    finally about to start the Exec() method. Of course, to have end-user
429
    visible results, the opcode must be initially (before calling into
430
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
431

432
    """
433
    assert self._op in self._job.ops
434
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
435
                               constants.OP_STATUS_CANCELING)
436

    
437
    # Cancel here if we were asked to
438
    self._CheckCancel()
439

    
440
    logging.debug("Opcode is now running")
441

    
442
    self._op.status = constants.OP_STATUS_RUNNING
443
    self._op.exec_timestamp = TimeStampNow()
444

    
445
    # And finally replicate the job status
446
    self._queue.UpdateJobUnlocked(self._job)
447

    
448
  @locking.ssynchronized(_QUEUE, shared=1)
449
  def _AppendFeedback(self, timestamp, log_type, log_msg):
450
    """Internal feedback append function, with locks
451

452
    """
453
    self._job.log_serial += 1
454
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
455
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
456

    
457
  def Feedback(self, *args):
458
    """Append a log entry.
459

460
    """
461
    assert len(args) < 3
462

    
463
    if len(args) == 1:
464
      log_type = constants.ELOG_MESSAGE
465
      log_msg = args[0]
466
    else:
467
      (log_type, log_msg) = args
468

    
469
    # The time is split to make serialization easier and not lose
470
    # precision.
471
    timestamp = utils.SplitTime(time.time())
472
    self._AppendFeedback(timestamp, log_type, log_msg)
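    # Illustrative calls (assuming "cb" is an _OpExecCallbacks instance):
    #   cb.Feedback("copying disk data")
    #   cb.Feedback(constants.ELOG_MESSAGE, "copying disk data")
    # Both append a (serial, timestamp, type, message) tuple to the opcode's
    # log and bump the job's log serial.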
473

    
474
  def ReportLocks(self, msg):
475
    """Write locking information to the job.
476

477
    Called whenever the LU processor is waiting for a lock or has acquired one.
478

479
    """
480
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
481
                               constants.OP_STATUS_CANCELING)
482

    
483
    # Cancel here if we were asked to
484
    self._CheckCancel()
485

    
486

    
487
class _JobChangesChecker(object):
488
  def __init__(self, fields, prev_job_info, prev_log_serial):
489
    """Initializes this class.
490

491
    @type fields: list of strings
492
    @param fields: Fields requested by LUXI client
493
    @type prev_job_info: list or None
494
    @param prev_job_info: previous job info, as passed by the LUXI client
495
    @type prev_log_serial: int
496
    @param prev_log_serial: previous job serial, as passed by the LUXI client
497

498
    """
499
    self._fields = fields
500
    self._prev_job_info = prev_job_info
501
    self._prev_log_serial = prev_log_serial
502

    
503
  def __call__(self, job):
504
    """Checks whether job has changed.
505

506
    @type job: L{_QueuedJob}
507
    @param job: Job object
508

509
    """
510
    status = job.CalcStatus()
511
    job_info = job.GetInfo(self._fields)
512
    log_entries = job.GetLogEntries(self._prev_log_serial)
513

    
514
    # Serializing and deserializing data can cause type changes (e.g. from
515
    # tuple to list) or precision loss. We're doing it here so that we get
516
    # the same modifications as the data received from the client. Without
517
    # this, the comparison afterwards might fail without the data being
518
    # significantly different.
519
    # TODO: we just deserialized from disk, investigate how to make sure that
520
    # the job info and log entries are compatible to avoid this further step.
521
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
522
    # efficient, though floats will be tricky
523
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
524
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
525

    
526
    # Don't even try to wait if the job is no longer running, there will be
527
    # no changes.
528
    if (status not in (constants.JOB_STATUS_QUEUED,
529
                       constants.JOB_STATUS_RUNNING,
530
                       constants.JOB_STATUS_WAITLOCK) or
531
        job_info != self._prev_job_info or
532
        (log_entries and self._prev_log_serial != log_entries[0][0])):
533
      logging.debug("Job %s changed", job.id)
534
      return (job_info, log_entries)
535

    
536
    return None
537

    
538

    
539
class _JobFileChangesWaiter(object):
540
  def __init__(self, filename):
541
    """Initializes this class.
542

543
    @type filename: string
544
    @param filename: Path to job file
545
    @raises errors.InotifyError: if the notifier cannot be set up
546

547
    """
548
    self._wm = pyinotify.WatchManager()
549
    self._inotify_handler = \
550
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
551
    self._notifier = \
552
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
553
    try:
554
      self._inotify_handler.enable()
555
    except Exception:
556
      # pyinotify doesn't close file descriptors automatically
557
      self._notifier.stop()
558
      raise
559

    
560
  def _OnInotify(self, notifier_enabled):
561
    """Callback for inotify.
562

563
    """
564
    if not notifier_enabled:
565
      self._inotify_handler.enable()
566

    
567
  def Wait(self, timeout):
568
    """Waits for the job file to change.
569

570
    @type timeout: float
571
    @param timeout: Timeout in seconds
572
    @return: Whether there have been events
573

574
    """
575
    assert timeout >= 0
576
    have_events = self._notifier.check_events(timeout * 1000)
577
    if have_events:
578
      self._notifier.read_events()
579
    self._notifier.process_events()
580
    return have_events
581

    
582
  def Close(self):
583
    """Closes underlying notifier and its file descriptor.
584

585
    """
586
    self._notifier.stop()
587

    
588

    
589
class _JobChangesWaiter(object):
590
  def __init__(self, filename):
591
    """Initializes this class.
592

593
    @type filename: string
594
    @param filename: Path to job file
595

596
    """
597
    self._filewaiter = None
598
    self._filename = filename
599

    
600
  def Wait(self, timeout):
601
    """Waits for a job to change.
602

603
    @type timeout: float
604
    @param timeout: Timeout in seconds
605
    @return: Whether there have been events
606

607
    """
608
    if self._filewaiter:
609
      return self._filewaiter.Wait(timeout)
610

    
611
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
612
    # If this point is reached, return immediately and let caller check the job
613
    # file again in case there were changes since the last check. This avoids a
614
    # race condition.
615
    self._filewaiter = _JobFileChangesWaiter(self._filename)
616

    
617
    return True
618

    
619
  def Close(self):
620
    """Closes underlying waiter.
621

622
    """
623
    if self._filewaiter:
624
      self._filewaiter.Close()
625

    
626

    
627
class _WaitForJobChangesHelper(object):
628
  """Helper class using inotify to wait for changes in a job file.
629

630
  This class takes a previous job status and serial, and alerts the client when
631
  the current job status has changed.
632

633
  """
634
  @staticmethod
635
  def _CheckForChanges(job_load_fn, check_fn):
636
    job = job_load_fn()
637
    if not job:
638
      raise errors.JobLost()
639

    
640
    result = check_fn(job)
641
    if result is None:
642
      raise utils.RetryAgain()
643

    
644
    return result
645

    
646
  def __call__(self, filename, job_load_fn,
647
               fields, prev_job_info, prev_log_serial, timeout):
648
    """Waits for changes on a job.
649

650
    @type filename: string
651
    @param filename: File on which to wait for changes
652
    @type job_load_fn: callable
653
    @param job_load_fn: Function to load job
654
    @type fields: list of strings
655
    @param fields: Which fields to check for changes
656
    @type prev_job_info: list or None
657
    @param prev_job_info: Last job information returned
658
    @type prev_log_serial: int
659
    @param prev_log_serial: Last job message serial number
660
    @type timeout: float
661
    @param timeout: maximum time to wait in seconds
662

663
    """
664
    try:
665
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
666
      waiter = _JobChangesWaiter(filename)
667
      try:
668
        return utils.Retry(compat.partial(self._CheckForChanges,
669
                                          job_load_fn, check_fn),
670
                           utils.RETRY_REMAINING_TIME, timeout,
671
                           wait_fn=waiter.Wait)
672
      finally:
673
        waiter.Close()
674
    except (errors.InotifyError, errors.JobLost):
675
      return None
676
    except utils.RetryTimeout:
677
      return constants.JOB_NOTCHANGED
678

    
679

    
680
def _EncodeOpError(err):
681
  """Encodes an error which occurred while processing an opcode.
682

683
  """
684
  if isinstance(err, errors.GenericError):
685
    to_encode = err
686
  else:
687
    to_encode = errors.OpExecError(str(err))
688

    
689
  return errors.EncodeException(to_encode)
690

    
691

    
692
class _JobQueueWorker(workerpool.BaseWorker):
693
  """The actual job workers.
694

695
  """
696
  def RunTask(self, job): # pylint: disable-msg=W0221
697
    """Job executor.
698

699
    This function processes a job. It is closely tied to the _QueuedJob and
700
    _QueuedOpCode classes.
701

702
    @type job: L{_QueuedJob}
703
    @param job: the job to be processed
704

705
    """
706
    self.SetTaskName("Job%s" % job.id)
707

    
708
    logging.info("Processing job %s", job.id)
709
    proc = mcpu.Processor(self.pool.queue.context, job.id)
710
    queue = job.queue
711
    try:
712
      try:
713
        count = len(job.ops)
714
        for idx, op in enumerate(job.ops):
715
          op_summary = op.input.Summary()
716
          if op.status == constants.OP_STATUS_SUCCESS:
717
            # this is a job that was partially completed before master
718
            # daemon shutdown, so it can be expected that some opcodes
719
            # are already completed successfully (if any did error
720
            # out, then the whole job should have been aborted and not
721
            # resubmitted for processing)
722
            logging.info("Op %s/%s: opcode %s already processed, skipping",
723
                         idx + 1, count, op_summary)
724
            continue
725
          try:
726
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
727
                         op_summary)
728

    
729
            queue.acquire(shared=1)
730
            try:
731
              if op.status == constants.OP_STATUS_CANCELED:
732
                logging.debug("Canceling opcode")
733
                raise CancelJob()
734
              assert op.status == constants.OP_STATUS_QUEUED
735
              logging.debug("Opcode %s/%s waiting for locks",
736
                            idx + 1, count)
737
              op.status = constants.OP_STATUS_WAITLOCK
738
              op.result = None
739
              op.start_timestamp = TimeStampNow()
740
              if idx == 0: # first opcode
741
                job.start_timestamp = op.start_timestamp
742
              queue.UpdateJobUnlocked(job)
743

    
744
              input_opcode = op.input
745
            finally:
746
              queue.release()
747

    
748
            # Make sure not to hold queue lock while calling ExecOpCode
749
            result = proc.ExecOpCode(input_opcode,
750
                                     _OpExecCallbacks(queue, job, op))
751

    
752
            queue.acquire(shared=1)
753
            try:
754
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
755
              op.status = constants.OP_STATUS_SUCCESS
756
              op.result = result
757
              op.end_timestamp = TimeStampNow()
758
              if idx == count - 1:
759
                job.end_timestamp = TimeStampNow()
760

    
761
                # Consistency check
762
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
763
                                  for i in job.ops)
764

    
765
              queue.UpdateJobUnlocked(job)
766
            finally:
767
              queue.release()
768

    
769
            logging.info("Op %s/%s: Successfully finished opcode %s",
770
                         idx + 1, count, op_summary)
771
          except CancelJob:
772
            # Will be handled further up
773
            raise
774
          except Exception, err:
775
            queue.acquire(shared=1)
776
            try:
777
              try:
778
                logging.debug("Opcode %s/%s failed", idx + 1, count)
779
                op.status = constants.OP_STATUS_ERROR
780
                op.result = _EncodeOpError(err)
781
                op.end_timestamp = TimeStampNow()
782
                logging.info("Op %s/%s: Error in opcode %s: %s",
783
                             idx + 1, count, op_summary, err)
784

    
785
                to_encode = errors.OpExecError("Preceding opcode failed")
786
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
787
                                      _EncodeOpError(to_encode))
788

    
789
                # Consistency check
790
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
791
                                  for i in job.ops[:idx])
792
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
793
                                  errors.GetEncodedError(i.result)
794
                                  for i in job.ops[idx:])
795
              finally:
796
                job.end_timestamp = TimeStampNow()
797
                queue.UpdateJobUnlocked(job)
798
            finally:
799
              queue.release()
800
            raise
801

    
802
      except CancelJob:
803
        queue.acquire(shared=1)
804
        try:
805
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
806
                                "Job canceled by request")
807
          job.end_timestamp = TimeStampNow()
808
          queue.UpdateJobUnlocked(job)
809
        finally:
810
          queue.release()
811
      except errors.GenericError, err:
812
        logging.exception("Ganeti exception")
813
      except:
814
        logging.exception("Unhandled exception")
815
    finally:
816
      status = job.CalcStatus()
817
      logging.info("Finished job %s, status = %s", job.id, status)
818

    
819

    
820
class _JobQueueWorkerPool(workerpool.WorkerPool):
821
  """Simple class implementing a job-processing workerpool.
822

823
  """
824
  def __init__(self, queue):
825
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
826
                                              JOBQUEUE_THREADS,
827
                                              _JobQueueWorker)
828
    self.queue = queue
829

    
830

    
831
def _RequireOpenQueue(fn):
832
  """Decorator for "public" functions.
833

834
  This function should be used for all 'public' functions. That is,
835
  functions usually called from other classes. Note that this should
836
  be applied only to methods (not plain functions), since it expects
837
  that the decorated function is called with a first argument that has
838
  a '_queue_filelock' attribute.
839

840
  @warning: Use this decorator only after locking.ssynchronized
841

842
  Example::
843
    @locking.ssynchronized(_LOCK)
844
    @_RequireOpenQueue
845
    def Example(self):
846
      pass
847

848
  """
849
  def wrapper(self, *args, **kwargs):
850
    # pylint: disable-msg=W0212
851
    assert self._queue_filelock is not None, "Queue should be open"
852
    return fn(self, *args, **kwargs)
853
  return wrapper
854

    
855

    
856
class JobQueue(object):
857
  """Queue used to manage the jobs.
858

859
  @cvar _RE_JOB_FILE: regex matching the valid job file names
860

861
  """
862
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
863

    
864
  def __init__(self, context):
865
    """Constructor for JobQueue.
866

867
    The constructor will initialize the job queue object and then
868
    start loading the current jobs from disk, either for starting them
869
    (if they were queued) or for aborting them (if they were already
870
    running).
871

872
    @type context: GanetiContext
873
    @param context: the context object for access to the configuration
874
        data and other ganeti objects
875

876
    """
877
    self.context = context
878
    self._memcache = weakref.WeakValueDictionary()
879
    self._my_hostname = netutils.Hostname.GetSysName()
880

    
881
    # The Big JobQueue lock. If a code block or method acquires it in shared
882
    # mode, it must guarantee concurrency with all other code acquiring it in
883
    # shared mode, including itself. In order not to acquire it at all
884
    # concurrency must be guaranteed with all code acquiring it in shared mode
885
    # and all code acquiring it exclusively.
886
    self._lock = locking.SharedLock("JobQueue")
887

    
888
    self.acquire = self._lock.acquire
889
    self.release = self._lock.release
890

    
891
    # Initialize the queue, and acquire the filelock.
892
    # This ensures no other process is working on the job queue.
893
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
894

    
895
    # Read serial file
896
    self._last_serial = jstore.ReadSerial()
897
    assert self._last_serial is not None, ("Serial file was modified between"
898
                                           " check in jstore and here")
899

    
900
    # Get initial list of nodes
901
    self._nodes = dict((n.name, n.primary_ip)
902
                       for n in self.context.cfg.GetAllNodesInfo().values()
903
                       if n.master_candidate)
904

    
905
    # Remove master node
906
    self._nodes.pop(self._my_hostname, None)
907

    
908
    # TODO: Check consistency across nodes
909

    
910
    self._queue_size = 0
911
    self._UpdateQueueSizeUnlocked()
912
    self._drained = self._IsQueueMarkedDrain()
913

    
914
    # Setup worker pool
915
    self._wpool = _JobQueueWorkerPool(self)
916
    try:
917
      # We need to lock here because WorkerPool.AddTask() may start a job while
918
      # we're still doing our work.
919
      self.acquire()
920
      try:
921
        logging.info("Inspecting job queue")
922

    
923
        all_job_ids = self._GetJobIDsUnlocked()
924
        jobs_count = len(all_job_ids)
925
        lastinfo = time.time()
926
        for idx, job_id in enumerate(all_job_ids):
927
          # Give an update every 1000 jobs or 10 seconds
928
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
929
              idx == (jobs_count - 1)):
930
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
931
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
932
            lastinfo = time.time()
933

    
934
          job = self._LoadJobUnlocked(job_id)
935

    
936
          # a failure in loading the job can cause 'None' to be returned
937
          if job is None:
938
            continue
939

    
940
          status = job.CalcStatus()
941

    
942
          if status in (constants.JOB_STATUS_QUEUED, ):
943
            self._wpool.AddTask((job, ))
944

    
945
          elif status in (constants.JOB_STATUS_RUNNING,
946
                          constants.JOB_STATUS_WAITLOCK,
947
                          constants.JOB_STATUS_CANCELING):
948
            logging.warning("Unfinished job %s found: %s", job.id, job)
949
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
950
                                  "Unclean master daemon shutdown")
951

    
952
        logging.info("Job queue inspection finished")
953
      finally:
954
        self.release()
955
    except:
956
      self._wpool.TerminateWorkers()
957
      raise
958

    
959
  @locking.ssynchronized(_LOCK)
960
  @_RequireOpenQueue
961
  def AddNode(self, node):
962
    """Register a new node with the queue.
963

964
    @type node: L{objects.Node}
965
    @param node: the node object to be added
966

967
    """
968
    node_name = node.name
969
    assert node_name != self._my_hostname
970

    
971
    # Clean queue directory on added node
972
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
973
    msg = result.fail_msg
974
    if msg:
975
      logging.warning("Cannot cleanup queue directory on node %s: %s",
976
                      node_name, msg)
977

    
978
    if not node.master_candidate:
979
      # remove if existing, ignoring errors
980
      self._nodes.pop(node_name, None)
981
      # and skip the replication of the job ids
982
      return
983

    
984
    # Upload the whole queue excluding archived jobs
985
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
986

    
987
    # Upload current serial file
988
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
989

    
990
    for file_name in files:
991
      # Read file content
992
      content = utils.ReadFile(file_name)
993

    
994
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
995
                                                  [node.primary_ip],
996
                                                  file_name, content)
997
      msg = result[node_name].fail_msg
998
      if msg:
999
        logging.error("Failed to upload file %s to node %s: %s",
1000
                      file_name, node_name, msg)
1001

    
1002
    self._nodes[node_name] = node.primary_ip
1003

    
1004
  @locking.ssynchronized(_LOCK)
1005
  @_RequireOpenQueue
1006
  def RemoveNode(self, node_name):
1007
    """Callback called when removing nodes from the cluster.
1008

1009
    @type node_name: str
1010
    @param node_name: the name of the node to remove
1011

1012
    """
1013
    self._nodes.pop(node_name, None)
1014

    
1015
  @staticmethod
1016
  def _CheckRpcResult(result, nodes, failmsg):
1017
    """Verifies the status of an RPC call.
1018

1019
    Since we aim to keep consistency should this node (the current
1020
    master) fail, we will log errors if our rpc calls fail, and especially
1021
    log the case when more than half of the nodes fail.
1022

1023
    @param result: the data as returned from the rpc call
1024
    @type nodes: list
1025
    @param nodes: the list of nodes we made the call to
1026
    @type failmsg: str
1027
    @param failmsg: the identifier to be used for logging
1028

1029
    """
1030
    failed = []
1031
    success = []
1032

    
1033
    for node in nodes:
1034
      msg = result[node].fail_msg
1035
      if msg:
1036
        failed.append(node)
1037
        logging.error("RPC call %s (%s) failed on node %s: %s",
1038
                      result[node].call, failmsg, node, msg)
1039
      else:
1040
        success.append(node)
1041

    
1042
    # +1 for the master node
1043
    if (len(success) + 1) < len(failed):
1044
      # TODO: Handle failing nodes
1045
      logging.error("More than half of the nodes failed")
1046

    
1047
  def _GetNodeIp(self):
1048
    """Helper for returning the node name/ip list.
1049

1050
    @rtype: (list, list)
1051
    @return: a tuple of two lists, the first one with the node
1052
        names and the second one with the node addresses
1053

1054
    """
1055
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1056
    name_list = self._nodes.keys()
1057
    addr_list = [self._nodes[name] for name in name_list]
1058
    return name_list, addr_list
1059

    
1060
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1061
    """Writes a file locally and then replicates it to all nodes.
1062

1063
    This function will replace the contents of a file on the local
1064
    node and then replicate it to all the other nodes we have.
1065

1066
    @type file_name: str
1067
    @param file_name: the path of the file to be replicated
1068
    @type data: str
1069
    @param data: the new contents of the file
1070
    @type replicate: boolean
1071
    @param replicate: whether to spread the changes to the remote nodes
1072

1073
    """
1074
    getents = runtime.GetEnts()
1075
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1076
                    gid=getents.masterd_gid)
1077

    
1078
    if replicate:
1079
      names, addrs = self._GetNodeIp()
1080
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1081
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1082

    
1083
  def _RenameFilesUnlocked(self, rename):
1084
    """Renames a file locally and then replicate the change.
1085

1086
    This function will rename a file in the local queue directory
1087
    and then replicate this rename to all the other nodes we have.
1088

1089
    @type rename: list of (old, new)
1090
    @param rename: List containing tuples mapping old to new names
1091

1092
    """
1093
    # Rename them locally
1094
    for old, new in rename:
1095
      utils.RenameFile(old, new, mkdir=True)
1096

    
1097
    # ... and on all nodes
1098
    names, addrs = self._GetNodeIp()
1099
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1100
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1101

    
1102
  @staticmethod
1103
  def _FormatJobID(job_id):
1104
    """Convert a job ID to string format.
1105

1106
    Currently this just does C{str(job_id)} after performing some
1107
    checks, but if we want to change the job id format this will
1108
    abstract this change.
1109

1110
    @type job_id: int or long
1111
    @param job_id: the numeric job id
1112
    @rtype: str
1113
    @return: the formatted job id
1114

1115
    """
1116
    if not isinstance(job_id, (int, long)):
1117
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1118
    if job_id < 0:
1119
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1120

    
1121
    return str(job_id)
1122

    
1123
  @classmethod
1124
  def _GetArchiveDirectory(cls, job_id):
1125
    """Returns the archive directory for a job.
1126

1127
    @type job_id: str
1128
    @param job_id: Job identifier
1129
    @rtype: str
1130
    @return: Directory name
1131

1132
    """
1133
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
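    # Example (illustrative): with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job id
    # "12345" maps to archive directory "1" and job id "345" maps to "0".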
1134

    
1135
  def _NewSerialsUnlocked(self, count):
1136
    """Generates a new job identifier.
1137

1138
    Job identifiers are unique during the lifetime of a cluster.
1139

1140
    @type count: integer
1141
    @param count: how many serials to return
1142
    @rtype: list of str
1143
    @return: a list of strings representing the job identifiers.
1144

1145
    """
1146
    assert count > 0
1147
    # New number
1148
    serial = self._last_serial + count
1149

    
1150
    # Write to file
1151
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1152
                             "%s\n" % serial, True)
1153

    
1154
    result = [self._FormatJobID(v)
1155
              for v in range(self._last_serial + 1, serial + 1)]
1156
    # Keep it only if we were able to write the file
1157
    self._last_serial = serial
1158

    
1159
    return result
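    # Example (illustrative): with _last_serial == 42, _NewSerialsUnlocked(3)
    # writes "45" to the serial file and returns ["43", "44", "45"].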
1160

    
1161
  @staticmethod
1162
  def _GetJobPath(job_id):
1163
    """Returns the job file for a given job id.
1164

1165
    @type job_id: str
1166
    @param job_id: the job identifier
1167
    @rtype: str
1168
    @return: the path to the job file
1169

1170
    """
1171
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1172

    
1173
  @classmethod
1174
  def _GetArchivedJobPath(cls, job_id):
1175
    """Returns the archived job file for a give job id.
1176

1177
    @type job_id: str
1178
    @param job_id: the job identifier
1179
    @rtype: str
1180
    @return: the path to the archived job file
1181

1182
    """
1183
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1184
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1185

    
1186
  def _GetJobIDsUnlocked(self, sort=True):
1187
    """Return all known job IDs.
1188

1189
    The method only looks at disk because it's a requirement that all
1190
    jobs are present on disk (so in the _memcache we don't have any
1191
    extra IDs).
1192

1193
    @type sort: boolean
1194
    @param sort: perform sorting on the returned job ids
1195
    @rtype: list
1196
    @return: the list of job IDs
1197

1198
    """
1199
    jlist = []
1200
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1201
      m = self._RE_JOB_FILE.match(filename)
1202
      if m:
1203
        jlist.append(m.group(1))
1204
    if sort:
1205
      jlist = utils.NiceSort(jlist)
1206
    return jlist
1207

    
1208
  def _LoadJobUnlocked(self, job_id):
1209
    """Loads a job from the disk or memory.
1210

1211
    Given a job id, this will return the cached job object if
1212
    existing, or try to load the job from the disk. If loading from
1213
    disk, it will also add the job to the cache.
1214

1215
    @param job_id: the job id
1216
    @rtype: L{_QueuedJob} or None
1217
    @return: either None or the job object
1218

1219
    """
1220
    job = self._memcache.get(job_id, None)
1221
    if job:
1222
      logging.debug("Found job %s in memcache", job_id)
1223
      return job
1224

    
1225
    try:
1226
      job = self._LoadJobFromDisk(job_id)
1227
      if job is None:
1228
        return job
1229
    except errors.JobFileCorrupted:
1230
      old_path = self._GetJobPath(job_id)
1231
      new_path = self._GetArchivedJobPath(job_id)
1232
      if old_path == new_path:
1233
        # job already archived (future case)
1234
        logging.exception("Can't parse job %s", job_id)
1235
      else:
1236
        # non-archived case
1237
        logging.exception("Can't parse job %s, will archive.", job_id)
1238
        self._RenameFilesUnlocked([(old_path, new_path)])
1239
      return None
1240

    
1241
    self._memcache[job_id] = job
1242
    logging.debug("Added job %s to the cache", job_id)
1243
    return job
1244

    
1245
  def _LoadJobFromDisk(self, job_id):
1246
    """Load the given job file from disk.
1247

1248
    Given a job file, read, load and restore it in a _QueuedJob format.
1249

1250
    @type job_id: string
1251
    @param job_id: job identifier
1252
    @rtype: L{_QueuedJob} or None
1253
    @return: either None or the job object
1254

1255
    """
1256
    filepath = self._GetJobPath(job_id)
1257
    logging.debug("Loading job from %s", filepath)
1258
    try:
1259
      raw_data = utils.ReadFile(filepath)
1260
    except EnvironmentError, err:
1261
      if err.errno in (errno.ENOENT, ):
1262
        return None
1263
      raise
1264

    
1265
    try:
1266
      data = serializer.LoadJson(raw_data)
1267
      job = _QueuedJob.Restore(self, data)
1268
    except Exception, err: # pylint: disable-msg=W0703
1269
      raise errors.JobFileCorrupted(err)
1270

    
1271
    return job
1272

    
1273
  def SafeLoadJobFromDisk(self, job_id):
1274
    """Load the given job file from disk.
1275

1276
    Given a job file, read, load and restore it in a _QueuedJob format.
1277
    In case of error reading the job, it gets returned as None, and the
1278
    exception is logged.
1279

1280
    @type job_id: string
1281
    @param job_id: job identifier
1282
    @rtype: L{_QueuedJob} or None
1283
    @return: either None or the job object
1284

1285
    """
1286
    try:
1287
      return self._LoadJobFromDisk(job_id)
1288
    except (errors.JobFileCorrupted, EnvironmentError):
1289
      logging.exception("Can't load/parse job %s", job_id)
1290
      return None
1291

    
1292
  @staticmethod
1293
  def _IsQueueMarkedDrain():
1294
    """Check if the queue is marked from drain.
1295

1296
    This currently uses the queue drain file, which makes it a
1297
    per-node flag. In the future this can be moved to the config file.
1298

1299
    @rtype: boolean
1300
    @return: True if the job queue is marked for draining
1301

1302
    """
1303
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1304

    
1305
  def _UpdateQueueSizeUnlocked(self):
1306
    """Update the queue size.
1307

1308
    """
1309
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1310

    
1311
  @locking.ssynchronized(_LOCK)
1312
  @_RequireOpenQueue
1313
  def SetDrainFlag(self, drain_flag):
1314
    """Sets the drain flag for the queue.
1315

1316
    @type drain_flag: boolean
1317
    @param drain_flag: Whether to set or unset the drain flag
1318

1319
    """
1320
    getents = runtime.GetEnts()
1321

    
1322
    if drain_flag:
1323
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
1324
                      uid=getents.masterd_uid, gid=getents.masterd_gid)
1325
    else:
1326
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1327

    
1328
    self._drained = drain_flag
1329

    
1330
    return True
1331

    
1332
  @_RequireOpenQueue
1333
  def _SubmitJobUnlocked(self, job_id, ops):
1334
    """Create and store a new job.
1335

1336
    This enters the job into our job queue and also puts it on the new
1337
    queue, in order for it to be picked up by the queue processors.
1338

1339
    @type job_id: job ID
1340
    @param job_id: the job ID for the new job
1341
    @type ops: list
1342
    @param ops: The list of OpCodes that will become the new job.
1343
    @rtype: L{_QueuedJob}
1344
    @return: the job object to be queued
1345
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1346
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1347

1348
    """
1349
    # Ok when sharing the big job queue lock, as the drain file is created when
1350
    # the lock is exclusive.
1351
    if self._drained:
1352
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1353

    
1354
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1355
      raise errors.JobQueueFull()
1356

    
1357
    job = _QueuedJob(self, job_id, ops)
1358

    
1359
    # Write to disk
1360
    self.UpdateJobUnlocked(job)
1361

    
1362
    self._queue_size += 1
1363

    
1364
    logging.debug("Adding new job %s to the cache", job_id)
1365
    self._memcache[job_id] = job
1366

    
1367
    return job
1368

    
1369
  @locking.ssynchronized(_LOCK)
1370
  @_RequireOpenQueue
1371
  def SubmitJob(self, ops):
1372
    """Create and store a new job.
1373

1374
    @see: L{_SubmitJobUnlocked}
1375

1376
    """
1377
    job_id = self._NewSerialsUnlocked(1)[0]
1378
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
1379
    return job_id
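    # Illustrative usage (assuming "queue" is a JobQueue instance and "op" an
    # opcodes.OpCode):
    #   job_id = queue.SubmitJob([op])
    # The returned job id is a string such as "1234" and can be passed to
    # WaitForJobChanges(), QueryJobs(), CancelJob() or ArchiveJob().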
1380

    
1381
  @locking.ssynchronized(_LOCK)
1382
  @_RequireOpenQueue
1383
  def SubmitManyJobs(self, jobs):
1384
    """Create and store multiple jobs.
1385

1386
    @see: L{_SubmitJobUnlocked}
1387

1388
    """
1389
    results = []
1390
    tasks = []
1391
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1392
    for job_id, ops in zip(all_job_ids, jobs):
1393
      try:
1394
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1395
        status = True
1396
        data = job_id
1397
      except errors.GenericError, err:
1398
        data = str(err)
1399
        status = False
1400
      results.append((status, data))
1401
    self._wpool.AddManyTasks(tasks)
1402

    
1403
    return results
1404

    
1405
  @_RequireOpenQueue
1406
  def UpdateJobUnlocked(self, job, replicate=True):
1407
    """Update a job's on disk storage.
1408

1409
    After a job has been modified, this function needs to be called in
1410
    order to write the changes to disk and replicate them to the other
1411
    nodes.
1412

1413
    @type job: L{_QueuedJob}
1414
    @param job: the changed job
1415
    @type replicate: boolean
1416
    @param replicate: whether to replicate the change to remote nodes
1417

1418
    """
1419
    filename = self._GetJobPath(job.id)
1420
    data = serializer.DumpJson(job.Serialize(), indent=False)
1421
    logging.debug("Writing job %s to %s", job.id, filename)
1422
    self._UpdateJobQueueFile(filename, data, replicate)
1423

    
1424
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1425
                        timeout):
1426
    """Waits for changes in a job.
1427

1428
    @type job_id: string
1429
    @param job_id: Job identifier
1430
    @type fields: list of strings
1431
    @param fields: Which fields to check for changes
1432
    @type prev_job_info: list or None
1433
    @param prev_job_info: Last job information returned
1434
    @type prev_log_serial: int
1435
    @param prev_log_serial: Last job message serial number
1436
    @type timeout: float
1437
    @param timeout: maximum time to wait in seconds
1438
    @rtype: tuple (job info, log entries)
1439
    @return: a tuple of the job information as required via
1440
        the fields parameter, and the log entries as a list
1441

1442
        if the job has not changed and the timeout has expired,
1443
        we instead return a special value,
1444
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1445
        as such by the clients
1446

1447
    """
1448
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1449

    
1450
    helper = _WaitForJobChangesHelper()
1451

    
1452
    return helper(self._GetJobPath(job_id), load_fn,
1453
                  fields, prev_job_info, prev_log_serial, timeout)
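    # Illustrative polling loop built on top of this method (variable names
    # are ours, not part of the module):
    #   prev_info, prev_serial = None, -1
    #   while True:
    #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
    #                                      prev_serial, timeout=60)
    #     if result == constants.JOB_NOTCHANGED:
    #       continue  # timeout expired, poll again
    #     if result is None:
    #       break     # job can no longer be loaded
    #     (prev_info, log_entries) = result
    #     if log_entries:
    #       prev_serial = log_entries[-1][0]
    #     if prev_info[0] in constants.JOBS_FINALIZED:
    #       break     # job reached a final status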
1454

    
1455
  @locking.ssynchronized(_LOCK)
1456
  @_RequireOpenQueue
1457
  def CancelJob(self, job_id):
1458
    """Cancels a job.
1459

1460
    This will only succeed if the job has not started yet.
1461

1462
    @type job_id: string
1463
    @param job_id: job ID of job to be cancelled.
1464

1465
    """
1466
    logging.info("Cancelling job %s", job_id)
1467

    
1468
    job = self._LoadJobUnlocked(job_id)
1469
    if not job:
1470
      logging.debug("Job %s not found", job_id)
1471
      return (False, "Job %s not found" % job_id)
1472

    
1473
    job_status = job.CalcStatus()
1474

    
1475
    if job_status not in (constants.JOB_STATUS_QUEUED,
1476
                          constants.JOB_STATUS_WAITLOCK):
1477
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1478
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1479

    
1480
    if job_status == constants.JOB_STATUS_QUEUED:
1481
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1482
                            "Job canceled by request")
1483
      return (True, "Job %s canceled" % job.id)
1484

    
1485
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1486
      # The worker will notice the new status and cancel the job
1487
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1488
      return (True, "Job %s will be canceled" % job.id)
1489

    
1490
  @_RequireOpenQueue
1491
  def _ArchiveJobsUnlocked(self, jobs):
1492
    """Archives jobs.
1493

1494
    @type jobs: list of L{_QueuedJob}
1495
    @param jobs: Job objects
1496
    @rtype: int
1497
    @return: Number of archived jobs
1498

1499
    """
1500
    archive_jobs = []
1501
    rename_files = []
1502
    for job in jobs:
1503
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
1504
        logging.debug("Job %s is not yet done", job.id)
1505
        continue
1506

    
1507
      archive_jobs.append(job)
1508

    
1509
      old = self._GetJobPath(job.id)
1510
      new = self._GetArchivedJobPath(job.id)
1511
      rename_files.append((old, new))
1512

    
1513
    # TODO: What if 1..n files fail to rename?
1514
    self._RenameFilesUnlocked(rename_files)
1515

    
1516
    logging.debug("Successfully archived job(s) %s",
1517
                  utils.CommaJoin(job.id for job in archive_jobs))
1518

    
1519
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1520
    # the files, we update the cached queue size from the filesystem. When we
1521
    # get around to fixing the TODO above, we can use the number of actually
1522
    # archived jobs to fix this.
1523
    self._UpdateQueueSizeUnlocked()
1524
    return len(archive_jobs)
1525

    
1526
  @locking.ssynchronized(_LOCK)
1527
  @_RequireOpenQueue
1528
  def ArchiveJob(self, job_id):
1529
    """Archives a job.
1530

1531
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1532

1533
    @type job_id: string
1534
    @param job_id: Job ID of job to be archived.
1535
    @rtype: bool
1536
    @return: Whether job was archived
1537

1538
    """
1539
    logging.info("Archiving job %s", job_id)
1540

    
1541
    job = self._LoadJobUnlocked(job_id)
1542
    if not job:
1543
      logging.debug("Job %s not found", job_id)
1544
      return False
1545

    
1546
    return self._ArchiveJobsUnlocked([job]) == 1
1547

    
1548
  @locking.ssynchronized(_LOCK)
1549
  @_RequireOpenQueue
1550
  def AutoArchiveJobs(self, age, timeout):
1551
    """Archives all jobs based on age.
1552

1553
    The method will archive all jobs which are older than the age
1554
    parameter. For jobs that don't have an end timestamp, the start
1555
    timestamp will be considered. The special '-1' age will cause
1556
    archival of all jobs (that are not running or queued).
1557

1558
    @type age: int
1559
    @param age: the minimum age in seconds
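    @type timeout: float
    @param timeout: the maximum amount of time (in seconds) this call is
        allowed to spend archiving jobs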
1560

1561
    """
1562
    logging.info("Archiving jobs with age more than %s seconds", age)
1563

    
1564
    now = time.time()
1565
    end_time = now + timeout
1566
    archived_count = 0
1567
    last_touched = 0
1568

    
1569
    all_job_ids = self._GetJobIDsUnlocked()
1570
    pending = []
1571
    for idx, job_id in enumerate(all_job_ids):
1572
      last_touched = idx + 1
1573

    
1574
      # Not optimal because jobs could be pending
1575
      # TODO: Measure average duration for job archival and take number of
1576
      # pending jobs into account.
1577
      if time.time() > end_time:
1578
        break
1579

    
1580
      # Returns None if the job failed to load
1581
      job = self._LoadJobUnlocked(job_id)
1582
      if job:
1583
        if job.end_timestamp is None:
1584
          if job.start_timestamp is None:
1585
            job_age = job.received_timestamp
1586
          else:
1587
            job_age = job.start_timestamp
1588
        else:
1589
          job_age = job.end_timestamp
1590

    
1591
        if age == -1 or now - job_age[0] > age:
1592
          pending.append(job)
1593

    
1594
          # Archive 10 jobs at a time
1595
          if len(pending) >= 10:
1596
            archived_count += self._ArchiveJobsUnlocked(pending)
1597
            pending = []
1598

    
1599
    if pending:
1600
      archived_count += self._ArchiveJobsUnlocked(pending)
1601

    
1602
    return (archived_count, len(all_job_ids) - last_touched)
1603

    
1604
  def QueryJobs(self, job_ids, fields):
1605
    """Returns a list of jobs in queue.
1606

1607
    @type job_ids: list
1608
    @param job_ids: sequence of job identifiers or None for all
1609
    @type fields: list
1610
    @param fields: names of fields to return
1611
    @rtype: list
1612
    @return: a list with one element per job, each element being a list with
1613
        the requested fields
1614

1615
    """
1616
    jobs = []
1617
    list_all = False
1618
    if not job_ids:
1619
      # Since files are added to/removed from the queue atomically, there's no
1620
      # risk of getting the job ids in an inconsistent state.
1621
      job_ids = self._GetJobIDsUnlocked()
1622
      list_all = True
1623

    
1624
    for job_id in job_ids:
1625
      job = self.SafeLoadJobFromDisk(job_id)
1626
      if job is not None:
1627
        jobs.append(job.GetInfo(fields))
1628
      elif not list_all:
1629
        jobs.append(None)
1630

    
1631
    return jobs
1632

    
1633
  @locking.ssynchronized(_LOCK)
1634
  @_RequireOpenQueue
1635
  def Shutdown(self):
1636
    """Stops the job queue.
1637

1638
    This shuts down all the worker threads and closes the queue.
1639

1640
    """
1641
    self._wpool.TerminateWorkers()
1642

    
1643
    self._queue_filelock.Close()
1644
    self._queue_filelock = None