
root / lib / jqueue.py @ c0f6d0d8


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import re
35
import time
36
import weakref
37
import threading
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import runtime
57
from ganeti import netutils
58
from ganeti import compat
59
from ganeti import ht
60

    
61

    
62
JOBQUEUE_THREADS = 25
63
JOBS_PER_ARCHIVE_DIRECTORY = 10000
64

    
65
# member lock names to be passed to @ssynchronized decorator
66
_LOCK = "_lock"
67
_QUEUE = "_queue"
68

    
69

    
70
class CancelJob(Exception):
71
  """Special exception to cancel a job.
72

73
  """
74

    
75

    
76
def TimeStampNow():
77
  """Returns the current timestamp.
78

79
  @rtype: tuple
80
  @return: the current time in the (seconds, microseconds) format
81

82
  """
83
  return utils.SplitTime(time.time())
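# For illustration: utils.SplitTime() splits a float timestamp into a
# (seconds, microseconds) tuple, e.g. 1300000000.25 becomes roughly
# (1300000000, 250000), so timestamps can be stored and serialized without
# losing precision (see the comment in _OpExecCallbacks.Feedback below).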
84

    
85

    
86
class _QueuedOpCode(object):
87
  """Encapsulates an opcode object.
88

89
  @ivar log: holds the execution log and consists of tuples
90
  of the form C{(log_serial, timestamp, level, message)}
91
  @ivar input: the OpCode we encapsulate
92
  @ivar status: the current status
93
  @ivar result: the result of the LU execution
94
  @ivar start_timestamp: timestamp for the start of the execution
95
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
96
  @ivar stop_timestamp: timestamp for the end of the execution
97

98
  """
99
  __slots__ = ["input", "status", "result", "log", "priority",
100
               "start_timestamp", "exec_timestamp", "end_timestamp",
101
               "__weakref__"]
102

    
103
  def __init__(self, op):
104
    """Constructor for the _QuededOpCode.
105

106
    @type op: L{opcodes.OpCode}
107
    @param op: the opcode we encapsulate
108

109
    """
110
    self.input = op
111
    self.status = constants.OP_STATUS_QUEUED
112
    self.result = None
113
    self.log = []
114
    self.start_timestamp = None
115
    self.exec_timestamp = None
116
    self.end_timestamp = None
117

    
118
    # Get initial priority (it might change during the lifetime of this opcode)
119
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
120

    
121
  @classmethod
122
  def Restore(cls, state):
123
    """Restore the _QueuedOpCode from the serialized form.
124

125
    @type state: dict
126
    @param state: the serialized state
127
    @rtype: _QueuedOpCode
128
    @return: a new _QueuedOpCode instance
129

130
    """
131
    obj = _QueuedOpCode.__new__(cls)
132
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
133
    obj.status = state["status"]
134
    obj.result = state["result"]
135
    obj.log = state["log"]
136
    obj.start_timestamp = state.get("start_timestamp", None)
137
    obj.exec_timestamp = state.get("exec_timestamp", None)
138
    obj.end_timestamp = state.get("end_timestamp", None)
139
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
140
    return obj
141

    
142
  def Serialize(self):
143
    """Serializes this _QueuedOpCode.
144

145
    @rtype: dict
146
    @return: the dictionary holding the serialized state
147

148
    """
149
    return {
150
      "input": self.input.__getstate__(),
151
      "status": self.status,
152
      "result": self.result,
153
      "log": self.log,
154
      "start_timestamp": self.start_timestamp,
155
      "exec_timestamp": self.exec_timestamp,
156
      "end_timestamp": self.end_timestamp,
157
      "priority": self.priority,
158
      }
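  # Round-trip sketch: for a _QueuedOpCode instance qop,
  # _QueuedOpCode.Restore(qop.Serialize()) is expected to rebuild an
  # equivalent object, since Restore() reads exactly the keys written above
  # and falls back to defaults for missing optional fields.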
159

    
160

    
161
class _QueuedJob(object):
162
  """In-memory job representation.
163

164
  This is what we use to track the user-submitted jobs. Locking must
165
  be taken care of by users of this class.
166

167
  @type queue: L{JobQueue}
168
  @ivar queue: the parent queue
169
  @ivar id: the job ID
170
  @type ops: list
171
  @ivar ops: the list of _QueuedOpCode that constitute the job
172
  @type log_serial: int
173
  @ivar log_serial: holds the index for the next log entry
174
  @ivar received_timestamp: the timestamp for when the job was received
175
  @ivar start_timestamp: the timestamp for start of execution
176
  @ivar end_timestamp: the timestamp for end of execution
177
  @ivar writable: Whether the job is allowed to be modified
178

179
  """
180
  # pylint: disable-msg=W0212
181
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
182
               "received_timestamp", "start_timestamp", "end_timestamp",
183
               "__weakref__", "processor_lock", "writable"]
184

    
185
  def __init__(self, queue, job_id, ops, writable):
186
    """Constructor for the _QueuedJob.
187

188
    @type queue: L{JobQueue}
189
    @param queue: our parent queue
190
    @type job_id: string
191
    @param job_id: our job id
192
    @type ops: list
193
    @param ops: the list of opcodes we hold, which will be encapsulated
194
        in _QueuedOpCodes
195
    @type writable: bool
196
    @param writable: Whether job can be modified
197

198
    """
199
    if not ops:
200
      raise errors.GenericError("A job needs at least one opcode")
201

    
202
    self.queue = queue
203
    self.id = job_id
204
    self.ops = [_QueuedOpCode(op) for op in ops]
205
    self.log_serial = 0
206
    self.received_timestamp = TimeStampNow()
207
    self.start_timestamp = None
208
    self.end_timestamp = None
209

    
210
    self._InitInMemory(self, writable)
211

    
212
  @staticmethod
213
  def _InitInMemory(obj, writable):
214
    """Initializes in-memory variables.
215

216
    """
217
    obj.writable = writable
218
    obj.ops_iter = None
219
    obj.cur_opctx = None
220
    obj.processor_lock = threading.Lock()
221

    
222
  def __repr__(self):
223
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
224
              "id=%s" % self.id,
225
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
226

    
227
    return "<%s at %#x>" % (" ".join(status), id(self))
228

    
229
  @classmethod
230
  def Restore(cls, queue, state, writable):
231
    """Restore a _QueuedJob from serialized state:
232

233
    @type queue: L{JobQueue}
234
    @param queue: to which queue the restored job belongs
235
    @type state: dict
236
    @param state: the serialized state
237
    @type writable: bool
238
    @param writable: Whether job can be modified
239
    @rtype: _QueuedJob
240
    @return: the restored _QueuedJob instance
241

242
    """
243
    obj = _QueuedJob.__new__(cls)
244
    obj.queue = queue
245
    obj.id = state["id"]
246
    obj.received_timestamp = state.get("received_timestamp", None)
247
    obj.start_timestamp = state.get("start_timestamp", None)
248
    obj.end_timestamp = state.get("end_timestamp", None)
249

    
250
    obj.ops = []
251
    obj.log_serial = 0
252
    for op_state in state["ops"]:
253
      op = _QueuedOpCode.Restore(op_state)
254
      for log_entry in op.log:
255
        obj.log_serial = max(obj.log_serial, log_entry[0])
256
      obj.ops.append(op)
257

    
258
    cls._InitInMemory(obj, writable)
259

    
260
    return obj
261

    
262
  def Serialize(self):
263
    """Serialize the _JobQueue instance.
264

265
    @rtype: dict
266
    @return: the serialized state
267

268
    """
269
    return {
270
      "id": self.id,
271
      "ops": [op.Serialize() for op in self.ops],
272
      "start_timestamp": self.start_timestamp,
273
      "end_timestamp": self.end_timestamp,
274
      "received_timestamp": self.received_timestamp,
275
      }
276

    
277
  def CalcStatus(self):
278
    """Compute the status of this job.
279

280
    This function iterates over all the _QueuedOpCodes in the job and
281
    based on their status, computes the job status.
282

283
    The algorithm is:
284
      - if we find a cancelled opcode, or one that finished with an error, the job
285
        status will be the same
286
      - otherwise, the last opcode with the status one of:
287
          - waitlock
288
          - canceling
289
          - running
290

291
        will determine the job status
292

293
      - otherwise, it means either all opcodes are queued or all succeeded,
294
        and the job status will be the same
295

296
    @return: the job status
297

298
    """
299
    status = constants.JOB_STATUS_QUEUED
300

    
301
    all_success = True
302
    for op in self.ops:
303
      if op.status == constants.OP_STATUS_SUCCESS:
304
        continue
305

    
306
      all_success = False
307

    
308
      if op.status == constants.OP_STATUS_QUEUED:
309
        pass
310
      elif op.status == constants.OP_STATUS_WAITLOCK:
311
        status = constants.JOB_STATUS_WAITLOCK
312
      elif op.status == constants.OP_STATUS_RUNNING:
313
        status = constants.JOB_STATUS_RUNNING
314
      elif op.status == constants.OP_STATUS_CANCELING:
315
        status = constants.JOB_STATUS_CANCELING
316
        break
317
      elif op.status == constants.OP_STATUS_ERROR:
318
        status = constants.JOB_STATUS_ERROR
319
        # The whole job fails if one opcode failed
320
        break
321
      elif op.status == constants.OP_STATUS_CANCELED:
322
        status = constants.JOB_STATUS_CANCELED
323
        break
324

    
325
    if all_success:
326
      status = constants.JOB_STATUS_SUCCESS
327

    
328
    return status
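  # Example of the mapping above: opcode statuses [success, running, queued]
  # yield JOB_STATUS_RUNNING, while [success, error, queued] yield
  # JOB_STATUS_ERROR, since a single failed opcode fails the whole job.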
329

    
330
  def CalcPriority(self):
331
    """Gets the current priority for this job.
332

333
    Only unfinished opcodes are considered. When all are done, the default
334
    priority is used.
335

336
    @rtype: int
337

338
    """
339
    priorities = [op.priority for op in self.ops
340
                  if op.status not in constants.OPS_FINALIZED]
341

    
342
    if not priorities:
343
      # All opcodes are done, assume default priority
344
      return constants.OP_PRIO_DEFAULT
345

    
346
    return min(priorities)
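  # Note: numerically smaller values mean higher priority (OP_PRIO_HIGHEST is
  # the smallest allowed value), so min() picks the most urgent unfinished
  # opcode.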
347

    
348
  def GetLogEntries(self, newer_than):
349
    """Selectively returns the log entries.
350

351
    @type newer_than: None or int
352
    @param newer_than: if this is None, return all log entries,
353
        otherwise return only the log entries with serial higher
354
        than this value
355
    @rtype: list
356
    @return: the list of the log entries selected
357

358
    """
359
    if newer_than is None:
360
      serial = -1
361
    else:
362
      serial = newer_than
363

    
364
    entries = []
365
    for op in self.ops:
366
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
367

    
368
    return entries
369

    
370
  def GetInfo(self, fields):
371
    """Returns information about a job.
372

373
    @type fields: list
374
    @param fields: names of fields to return
375
    @rtype: list
376
    @return: list with one element for each field
377
    @raise errors.OpExecError: when an invalid field
378
        has been passed
379

380
    """
381
    row = []
382
    for fname in fields:
383
      if fname == "id":
384
        row.append(self.id)
385
      elif fname == "status":
386
        row.append(self.CalcStatus())
387
      elif fname == "priority":
388
        row.append(self.CalcPriority())
389
      elif fname == "ops":
390
        row.append([op.input.__getstate__() for op in self.ops])
391
      elif fname == "opresult":
392
        row.append([op.result for op in self.ops])
393
      elif fname == "opstatus":
394
        row.append([op.status for op in self.ops])
395
      elif fname == "oplog":
396
        row.append([op.log for op in self.ops])
397
      elif fname == "opstart":
398
        row.append([op.start_timestamp for op in self.ops])
399
      elif fname == "opexec":
400
        row.append([op.exec_timestamp for op in self.ops])
401
      elif fname == "opend":
402
        row.append([op.end_timestamp for op in self.ops])
403
      elif fname == "oppriority":
404
        row.append([op.priority for op in self.ops])
405
      elif fname == "received_ts":
406
        row.append(self.received_timestamp)
407
      elif fname == "start_ts":
408
        row.append(self.start_timestamp)
409
      elif fname == "end_ts":
410
        row.append(self.end_timestamp)
411
      elif fname == "summary":
412
        row.append([op.input.Summary() for op in self.ops])
413
      else:
414
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
415
    return row
416

    
417
  def MarkUnfinishedOps(self, status, result):
418
    """Mark unfinished opcodes with a given status and result.
419

420
    This is a utility function for marking all running or waiting to
421
    be run opcodes with a given status. Opcodes which are already
422
    finalised are not changed.
423

424
    @param status: a given opcode status
425
    @param result: the opcode result
426

427
    """
428
    not_marked = True
429
    for op in self.ops:
430
      if op.status in constants.OPS_FINALIZED:
431
        assert not_marked, "Finalized opcodes found after non-finalized ones"
432
        continue
433
      op.status = status
434
      op.result = result
435
      not_marked = False
436

    
437
  def Finalize(self):
438
    """Marks the job as finalized.
439

440
    """
441
    self.end_timestamp = TimeStampNow()
442

    
443
  def Cancel(self):
444
    """Marks job as canceled/-ing if possible.
445

446
    @rtype: tuple; (bool, string)
447
    @return: Boolean describing whether job was successfully canceled or marked
448
      as canceling, and a text message
449

450
    """
451
    status = self.CalcStatus()
452

    
453
    if status == constants.JOB_STATUS_QUEUED:
454
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
455
                             "Job canceled by request")
456
      self.Finalize()
457
      return (True, "Job %s canceled" % self.id)
458

    
459
    elif status == constants.JOB_STATUS_WAITLOCK:
460
      # The worker will notice the new status and cancel the job
461
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
462
      return (True, "Job %s will be canceled" % self.id)
463

    
464
    else:
465
      logging.debug("Job %s is no longer waiting in the queue", self.id)
466
      return (False, "Job %s is no longer waiting in the queue" % self.id)
467

    
468

    
469
class _OpExecCallbacks(mcpu.OpExecCbBase):
470
  def __init__(self, queue, job, op):
471
    """Initializes this class.
472

473
    @type queue: L{JobQueue}
474
    @param queue: Job queue
475
    @type job: L{_QueuedJob}
476
    @param job: Job object
477
    @type op: L{_QueuedOpCode}
478
    @param op: OpCode
479

480
    """
481
    assert queue, "Queue is missing"
482
    assert job, "Job is missing"
483
    assert op, "Opcode is missing"
484

    
485
    self._queue = queue
486
    self._job = job
487
    self._op = op
488

    
489
  def _CheckCancel(self):
490
    """Raises an exception to cancel the job if asked to.
491

492
    """
493
    # Cancel here if we were asked to
494
    if self._op.status == constants.OP_STATUS_CANCELING:
495
      logging.debug("Canceling opcode")
496
      raise CancelJob()
497

    
498
  @locking.ssynchronized(_QUEUE, shared=1)
499
  def NotifyStart(self):
500
    """Mark the opcode as running, not lock-waiting.
501

502
    This is called from the mcpu code as a notifier function, when the LU is
503
    finally about to start the Exec() method. Of course, to have end-user
504
    visible results, the opcode must be initially (before calling into
505
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
506

507
    """
508
    assert self._op in self._job.ops
509
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
510
                               constants.OP_STATUS_CANCELING)
511

    
512
    # Cancel here if we were asked to
513
    self._CheckCancel()
514

    
515
    logging.debug("Opcode is now running")
516

    
517
    self._op.status = constants.OP_STATUS_RUNNING
518
    self._op.exec_timestamp = TimeStampNow()
519

    
520
    # And finally replicate the job status
521
    self._queue.UpdateJobUnlocked(self._job)
522

    
523
  @locking.ssynchronized(_QUEUE, shared=1)
524
  def _AppendFeedback(self, timestamp, log_type, log_msg):
525
    """Internal feedback append function, with locks
526

527
    """
528
    self._job.log_serial += 1
529
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
530
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
531

    
532
  def Feedback(self, *args):
533
    """Append a log entry.
534

535
    """
536
    assert len(args) < 3
537

    
538
    if len(args) == 1:
539
      log_type = constants.ELOG_MESSAGE
540
      log_msg = args[0]
541
    else:
542
      (log_type, log_msg) = args
543

    
544
    # The time is split to make serialization easier and not lose
545
    # precision.
546
    timestamp = utils.SplitTime(time.time())
547
    self._AppendFeedback(timestamp, log_type, log_msg)
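  # Usage sketch: callers may pass a bare message, e.g. Feedback("copying
  # disk"), which is recorded as ELOG_MESSAGE, or an explicit (type, message)
  # pair such as Feedback(constants.ELOG_MESSAGE, "copying disk").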
548

    
549
  def CheckCancel(self):
550
    """Check whether job has been cancelled.
551

552
    """
553
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
554
                               constants.OP_STATUS_CANCELING)
555

    
556
    # Cancel here if we were asked to
557
    self._CheckCancel()
558

    
559
  def SubmitManyJobs(self, jobs):
560
    """Submits jobs for processing.
561

562
    See L{JobQueue.SubmitManyJobs}.
563

564
    """
565
    # Locking is done in job queue
566
    return self._queue.SubmitManyJobs(jobs)
567

    
568

    
569
class _JobChangesChecker(object):
570
  def __init__(self, fields, prev_job_info, prev_log_serial):
571
    """Initializes this class.
572

573
    @type fields: list of strings
574
    @param fields: Fields requested by LUXI client
575
    @type prev_job_info: string
576
    @param prev_job_info: previous job info, as passed by the LUXI client
577
    @type prev_log_serial: string
578
    @param prev_log_serial: previous job serial, as passed by the LUXI client
579

580
    """
581
    self._fields = fields
582
    self._prev_job_info = prev_job_info
583
    self._prev_log_serial = prev_log_serial
584

    
585
  def __call__(self, job):
586
    """Checks whether job has changed.
587

588
    @type job: L{_QueuedJob}
589
    @param job: Job object
590

591
    """
592
    assert not job.writable, "Expected read-only job"
593

    
594
    status = job.CalcStatus()
595
    job_info = job.GetInfo(self._fields)
596
    log_entries = job.GetLogEntries(self._prev_log_serial)
597

    
598
    # Serializing and deserializing data can cause type changes (e.g. from
599
    # tuple to list) or precision loss. We're doing it here so that we get
600
    # the same modifications as the data received from the client. Without
601
    # this, the comparison afterwards might fail without the data being
602
    # significantly different.
603
    # TODO: we just deserialized from disk, investigate how to make sure that
604
    # the job info and log entries are compatible to avoid this further step.
605
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
606
    # efficient, though floats will be tricky
607
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
608
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
609

    
610
    # Don't even try to wait if the job is no longer running, there will be
611
    # no changes.
612
    if (status not in (constants.JOB_STATUS_QUEUED,
613
                       constants.JOB_STATUS_RUNNING,
614
                       constants.JOB_STATUS_WAITLOCK) or
615
        job_info != self._prev_job_info or
616
        (log_entries and self._prev_log_serial != log_entries[0][0])):
617
      logging.debug("Job %s changed", job.id)
618
      return (job_info, log_entries)
619

    
620
    return None
621

    
622

    
623
class _JobFileChangesWaiter(object):
624
  def __init__(self, filename):
625
    """Initializes this class.
626

627
    @type filename: string
628
    @param filename: Path to job file
629
    @raises errors.InotifyError: if the notifier cannot be set up
630

631
    """
632
    self._wm = pyinotify.WatchManager()
633
    self._inotify_handler = \
634
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
635
    self._notifier = \
636
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
637
    try:
638
      self._inotify_handler.enable()
639
    except Exception:
640
      # pyinotify doesn't close file descriptors automatically
641
      self._notifier.stop()
642
      raise
643

    
644
  def _OnInotify(self, notifier_enabled):
645
    """Callback for inotify.
646

647
    """
648
    if not notifier_enabled:
649
      self._inotify_handler.enable()
650

    
651
  def Wait(self, timeout):
652
    """Waits for the job file to change.
653

654
    @type timeout: float
655
    @param timeout: Timeout in seconds
656
    @return: Whether there have been events
657

658
    """
659
    assert timeout >= 0
660
    have_events = self._notifier.check_events(timeout * 1000)
661
    if have_events:
662
      self._notifier.read_events()
663
    self._notifier.process_events()
664
    return have_events
665

    
666
  def Close(self):
667
    """Closes underlying notifier and its file descriptor.
668

669
    """
670
    self._notifier.stop()
671

    
672

    
673
class _JobChangesWaiter(object):
674
  def __init__(self, filename):
675
    """Initializes this class.
676

677
    @type filename: string
678
    @param filename: Path to job file
679

680
    """
681
    self._filewaiter = None
682
    self._filename = filename
683

    
684
  def Wait(self, timeout):
685
    """Waits for a job to change.
686

687
    @type timeout: float
688
    @param timeout: Timeout in seconds
689
    @return: Whether there have been events
690

691
    """
692
    if self._filewaiter:
693
      return self._filewaiter.Wait(timeout)
694

    
695
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
696
    # If this point is reached, return immediately and let caller check the job
697
    # file again in case there were changes since the last check. This avoids a
698
    # race condition.
699
    self._filewaiter = _JobFileChangesWaiter(self._filename)
700

    
701
    return True
702

    
703
  def Close(self):
704
    """Closes underlying waiter.
705

706
    """
707
    if self._filewaiter:
708
      self._filewaiter.Close()
709

    
710

    
711
class _WaitForJobChangesHelper(object):
712
  """Helper class using inotify to wait for changes in a job file.
713

714
  This class takes a previous job status and serial, and alerts the client when
715
  the current job status has changed.
716

717
  """
718
  @staticmethod
719
  def _CheckForChanges(job_load_fn, check_fn):
720
    job = job_load_fn()
721
    if not job:
722
      raise errors.JobLost()
723

    
724
    result = check_fn(job)
725
    if result is None:
726
      raise utils.RetryAgain()
727

    
728
    return result
729

    
730
  def __call__(self, filename, job_load_fn,
731
               fields, prev_job_info, prev_log_serial, timeout):
732
    """Waits for changes on a job.
733

734
    @type filename: string
735
    @param filename: File on which to wait for changes
736
    @type job_load_fn: callable
737
    @param job_load_fn: Function to load job
738
    @type fields: list of strings
739
    @param fields: Which fields to check for changes
740
    @type prev_job_info: list or None
741
    @param prev_job_info: Last job information returned
742
    @type prev_log_serial: int
743
    @param prev_log_serial: Last job message serial number
744
    @type timeout: float
745
    @param timeout: maximum time to wait in seconds
746

747
    """
748
    try:
749
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
750
      waiter = _JobChangesWaiter(filename)
751
      try:
752
        return utils.Retry(compat.partial(self._CheckForChanges,
753
                                          job_load_fn, check_fn),
754
                           utils.RETRY_REMAINING_TIME, timeout,
755
                           wait_fn=waiter.Wait)
756
      finally:
757
        waiter.Close()
758
    except (errors.InotifyError, errors.JobLost):
759
      return None
760
    except utils.RetryTimeout:
761
      return constants.JOB_NOTCHANGED
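  # Return value sketch: None when the job vanished or inotify failed,
  # constants.JOB_NOTCHANGED when the timeout expired, otherwise the
  # (job_info, log_entries) tuple produced by _JobChangesChecker.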
762

    
763

    
764
def _EncodeOpError(err):
765
  """Encodes an error which occurred while processing an opcode.
766

767
  """
768
  if isinstance(err, errors.GenericError):
769
    to_encode = err
770
  else:
771
    to_encode = errors.OpExecError(str(err))
772

    
773
  return errors.EncodeException(to_encode)
774

    
775

    
776
class _TimeoutStrategyWrapper:
777
  def __init__(self, fn):
778
    """Initializes this class.
779

780
    """
781
    self._fn = fn
782
    self._next = None
783

    
784
  def _Advance(self):
785
    """Gets the next timeout if necessary.
786

787
    """
788
    if self._next is None:
789
      self._next = self._fn()
790

    
791
  def Peek(self):
792
    """Returns the next timeout.
793

794
    """
795
    self._Advance()
796
    return self._next
797

    
798
  def Next(self):
799
    """Returns the current timeout and advances the internal state.
800

801
    """
802
    self._Advance()
803
    result = self._next
804
    self._next = None
805
    return result
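  # Peek() reports the upcoming timeout without consuming it; Next() consumes
  # it, so the following call fetches a fresh value from the wrapped function.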
806

    
807

    
808
class _OpExecContext:
809
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
810
    """Initializes this class.
811

812
    """
813
    self.op = op
814
    self.index = index
815
    self.log_prefix = log_prefix
816
    self.summary = op.input.Summary()
817

    
818
    # Create local copy to modify
819
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
820
      self.jobdeps = op.input.depends[:]
821
    else:
822
      self.jobdeps = None
823

    
824
    self._timeout_strategy_factory = timeout_strategy_factory
825
    self._ResetTimeoutStrategy()
826

    
827
  def _ResetTimeoutStrategy(self):
828
    """Creates a new timeout strategy.
829

830
    """
831
    self._timeout_strategy = \
832
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
833

    
834
  def CheckPriorityIncrease(self):
835
    """Checks whether priority can and should be increased.
836

837
    Called when locks couldn't be acquired.
838

839
    """
840
    op = self.op
841

    
842
    # Exhausted all retries and next round should use blocking acquire
843
    # for locks?
844
    if (self._timeout_strategy.Peek() is None and
845
        op.priority > constants.OP_PRIO_HIGHEST):
846
      logging.debug("Increasing priority")
847
      op.priority -= 1
848
      self._ResetTimeoutStrategy()
849
      return True
850

    
851
    return False
852

    
853
  def GetNextLockTimeout(self):
854
    """Returns the next lock acquire timeout.
855

856
    """
857
    return self._timeout_strategy.Next()
858

    
859

    
860
class _JobProcessor(object):
861
  def __init__(self, queue, opexec_fn, job,
862
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
863
    """Initializes this class.
864

865
    """
866
    self.queue = queue
867
    self.opexec_fn = opexec_fn
868
    self.job = job
869
    self._timeout_strategy_factory = _timeout_strategy_factory
870

    
871
  @staticmethod
872
  def _FindNextOpcode(job, timeout_strategy_factory):
873
    """Locates the next opcode to run.
874

875
    @type job: L{_QueuedJob}
876
    @param job: Job object
877
    @param timeout_strategy_factory: Callable to create new timeout strategy
878

879
    """
880
    # Create some sort of a cache to speed up locating next opcode for future
881
    # lookups
882
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
883
    # pending and one for processed ops.
884
    if job.ops_iter is None:
885
      job.ops_iter = enumerate(job.ops)
886

    
887
    # Find next opcode to run
888
    while True:
889
      try:
890
        (idx, op) = job.ops_iter.next()
891
      except StopIteration:
892
        raise errors.ProgrammerError("Called for a finished job")
893

    
894
      if op.status == constants.OP_STATUS_RUNNING:
895
        # Found an opcode already marked as running
896
        raise errors.ProgrammerError("Called for job marked as running")
897

    
898
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
899
                             timeout_strategy_factory)
900

    
901
      if op.status not in constants.OPS_FINALIZED:
902
        return opctx
903

    
904
      # This is a job that was partially completed before master daemon
905
      # shutdown, so it can be expected that some opcodes are already
906
      # completed successfully (if any did error out, then the whole job
907
      # should have been aborted and not resubmitted for processing).
908
      logging.info("%s: opcode %s already processed, skipping",
909
                   opctx.log_prefix, opctx.summary)
910

    
911
  @staticmethod
912
  def _MarkWaitlock(job, op):
913
    """Marks an opcode as waiting for locks.
914

915
    The job's start timestamp is also set if necessary.
916

917
    @type job: L{_QueuedJob}
918
    @param job: Job object
919
    @type op: L{_QueuedOpCode}
920
    @param op: Opcode object
921

922
    """
923
    assert op in job.ops
924
    assert op.status in (constants.OP_STATUS_QUEUED,
925
                         constants.OP_STATUS_WAITLOCK)
926

    
927
    update = False
928

    
929
    op.result = None
930

    
931
    if op.status == constants.OP_STATUS_QUEUED:
932
      op.status = constants.OP_STATUS_WAITLOCK
933
      update = True
934

    
935
    if op.start_timestamp is None:
936
      op.start_timestamp = TimeStampNow()
937
      update = True
938

    
939
    if job.start_timestamp is None:
940
      job.start_timestamp = op.start_timestamp
941
      update = True
942

    
943
    assert op.status == constants.OP_STATUS_WAITLOCK
944

    
945
    return update
946

    
947
  @staticmethod
948
  def _CheckDependencies(queue, job, opctx):
949
    """Checks if an opcode has dependencies and if so, processes them.
950

951
    @type queue: L{JobQueue}
952
    @param queue: Queue object
953
    @type job: L{_QueuedJob}
954
    @param job: Job object
955
    @type opctx: L{_OpExecContext}
956
    @param opctx: Opcode execution context
957
    @rtype: bool
958
    @return: Whether opcode will be re-scheduled by dependency tracker
959

960
    """
961
    op = opctx.op
962

    
963
    result = False
964

    
965
    while opctx.jobdeps:
966
      (dep_job_id, dep_status) = opctx.jobdeps[0]
967

    
968
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
969
                                                          dep_status)
970
      assert ht.TNonEmptyString(depmsg), "No dependency message"
971

    
972
      logging.info("%s: %s", opctx.log_prefix, depmsg)
973

    
974
      if depresult == _JobDependencyManager.CONTINUE:
975
        # Remove dependency and continue
976
        opctx.jobdeps.pop(0)
977

    
978
      elif depresult == _JobDependencyManager.WAIT:
979
        # Need to wait for notification, dependency tracker will re-add job
980
        # to workerpool
981
        result = True
982
        break
983

    
984
      elif depresult == _JobDependencyManager.CANCEL:
985
        # Job was cancelled, cancel this job as well
986
        job.Cancel()
987
        assert op.status == constants.OP_STATUS_CANCELING
988
        break
989

    
990
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
991
                         _JobDependencyManager.ERROR):
992
        # Job failed or there was an error, this job must fail
993
        op.status = constants.OP_STATUS_ERROR
994
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
995
        break
996

    
997
      else:
998
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
999
                                     depresult)
1000

    
1001
    return result
1002

    
1003
  def _ExecOpCodeUnlocked(self, opctx):
1004
    """Processes one opcode and returns the result.
1005

1006
    """
1007
    op = opctx.op
1008

    
1009
    assert op.status == constants.OP_STATUS_WAITLOCK
1010

    
1011
    timeout = opctx.GetNextLockTimeout()
1012

    
1013
    try:
1014
      # Make sure not to hold queue lock while calling ExecOpCode
1015
      result = self.opexec_fn(op.input,
1016
                              _OpExecCallbacks(self.queue, self.job, op),
1017
                              timeout=timeout, priority=op.priority)
1018
    except mcpu.LockAcquireTimeout:
1019
      assert timeout is not None, "Received timeout for blocking acquire"
1020
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1021

    
1022
      assert op.status in (constants.OP_STATUS_WAITLOCK,
1023
                           constants.OP_STATUS_CANCELING)
1024

    
1025
      # Was job cancelled while we were waiting for the lock?
1026
      if op.status == constants.OP_STATUS_CANCELING:
1027
        return (constants.OP_STATUS_CANCELING, None)
1028

    
1029
      # Stay in waitlock while trying to re-acquire lock
1030
      return (constants.OP_STATUS_WAITLOCK, None)
1031
    except CancelJob:
1032
      logging.exception("%s: Canceling job", opctx.log_prefix)
1033
      assert op.status == constants.OP_STATUS_CANCELING
1034
      return (constants.OP_STATUS_CANCELING, None)
1035
    except Exception, err: # pylint: disable-msg=W0703
1036
      logging.exception("%s: Caught exception in %s",
1037
                        opctx.log_prefix, opctx.summary)
1038
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1039
    else:
1040
      logging.debug("%s: %s successful",
1041
                    opctx.log_prefix, opctx.summary)
1042
      return (constants.OP_STATUS_SUCCESS, result)
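  # The (status, result) pairs returned above feed op.status and op.result in
  # __call__ below: WAITLOCK means the lock acquisition should be retried,
  # CANCELING propagates a cancellation, ERROR carries an encoded exception
  # and SUCCESS carries the LU's result.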
1043

    
1044
  def __call__(self, _nextop_fn=None):
1045
    """Continues execution of a job.
1046

1047
    @param _nextop_fn: Callback function for tests
1048
    @rtype: bool
1049
    @return: True if job is finished, False if processor needs to be called
1050
             again
1051

1052
    """
1053
    queue = self.queue
1054
    job = self.job
1055

    
1056
    logging.debug("Processing job %s", job.id)
1057

    
1058
    queue.acquire(shared=1)
1059
    try:
1060
      opcount = len(job.ops)
1061

    
1062
      assert job.writable, "Expected writable job"
1063

    
1064
      # Don't do anything for finalized jobs
1065
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1066
        return True
1067

    
1068
      # Is a previous opcode still pending?
1069
      if job.cur_opctx:
1070
        opctx = job.cur_opctx
1071
        job.cur_opctx = None
1072
      else:
1073
        if __debug__ and _nextop_fn:
1074
          _nextop_fn()
1075
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1076

    
1077
      op = opctx.op
1078

    
1079
      # Consistency check
1080
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1081
                                     constants.OP_STATUS_CANCELING)
1082
                        for i in job.ops[opctx.index + 1:])
1083

    
1084
      assert op.status in (constants.OP_STATUS_QUEUED,
1085
                           constants.OP_STATUS_WAITLOCK,
1086
                           constants.OP_STATUS_CANCELING)
1087

    
1088
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1089
              op.priority >= constants.OP_PRIO_HIGHEST)
1090

    
1091
      waitjob = None
1092

    
1093
      if op.status != constants.OP_STATUS_CANCELING:
1094
        assert op.status in (constants.OP_STATUS_QUEUED,
1095
                             constants.OP_STATUS_WAITLOCK)
1096

    
1097
        # Prepare to start opcode
1098
        if self._MarkWaitlock(job, op):
1099
          # Write to disk
1100
          queue.UpdateJobUnlocked(job)
1101

    
1102
        assert op.status == constants.OP_STATUS_WAITLOCK
1103
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1104
        assert job.start_timestamp and op.start_timestamp
1105
        assert waitjob is None
1106

    
1107
        # Check if waiting for a job is necessary
1108
        waitjob = self._CheckDependencies(queue, job, opctx)
1109

    
1110
        assert op.status in (constants.OP_STATUS_WAITLOCK,
1111
                             constants.OP_STATUS_CANCELING,
1112
                             constants.OP_STATUS_ERROR)
1113

    
1114
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1115
                                         constants.OP_STATUS_ERROR)):
1116
          logging.info("%s: opcode %s waiting for locks",
1117
                       opctx.log_prefix, opctx.summary)
1118

    
1119
          assert not opctx.jobdeps, "Not all dependencies were removed"
1120

    
1121
          queue.release()
1122
          try:
1123
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1124
          finally:
1125
            queue.acquire(shared=1)
1126

    
1127
          op.status = op_status
1128
          op.result = op_result
1129

    
1130
          assert not waitjob
1131

    
1132
        if op.status == constants.OP_STATUS_WAITLOCK:
1133
          # Couldn't get locks in time
1134
          assert not op.end_timestamp
1135
        else:
1136
          # Finalize opcode
1137
          op.end_timestamp = TimeStampNow()
1138

    
1139
          if op.status == constants.OP_STATUS_CANCELING:
1140
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1141
                                  for i in job.ops[opctx.index:])
1142
          else:
1143
            assert op.status in constants.OPS_FINALIZED
1144

    
1145
      if op.status == constants.OP_STATUS_WAITLOCK or waitjob:
1146
        finalize = False
1147

    
1148
        if not waitjob and opctx.CheckPriorityIncrease():
1149
          # Priority was changed, need to update on-disk file
1150
          queue.UpdateJobUnlocked(job)
1151

    
1152
        # Keep around for another round
1153
        job.cur_opctx = opctx
1154

    
1155
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1156
                op.priority >= constants.OP_PRIO_HIGHEST)
1157

    
1158
        # In no case must the status be finalized here
1159
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1160

    
1161
      else:
1162
        # Ensure all opcodes so far have been successful
1163
        assert (opctx.index == 0 or
1164
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1165
                           for i in job.ops[:opctx.index]))
1166

    
1167
        # Reset context
1168
        job.cur_opctx = None
1169

    
1170
        if op.status == constants.OP_STATUS_SUCCESS:
1171
          finalize = False
1172

    
1173
        elif op.status == constants.OP_STATUS_ERROR:
1174
          # Ensure failed opcode has an exception as its result
1175
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1176

    
1177
          to_encode = errors.OpExecError("Preceding opcode failed")
1178
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1179
                                _EncodeOpError(to_encode))
1180
          finalize = True
1181

    
1182
          # Consistency check
1183
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1184
                            errors.GetEncodedError(i.result)
1185
                            for i in job.ops[opctx.index:])
1186

    
1187
        elif op.status == constants.OP_STATUS_CANCELING:
1188
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1189
                                "Job canceled by request")
1190
          finalize = True
1191

    
1192
        else:
1193
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1194

    
1195
        if opctx.index == (opcount - 1):
1196
          # Finalize on last opcode
1197
          finalize = True
1198

    
1199
        if finalize:
1200
          # All opcodes have been run, finalize job
1201
          job.Finalize()
1202

    
1203
        # Write to disk. If the job status is final, this is the final write
1204
        # allowed. Once the file has been written, it can be archived anytime.
1205
        queue.UpdateJobUnlocked(job)
1206

    
1207
        assert not waitjob
1208

    
1209
        if finalize:
1210
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1211
          # TODO: Check locking
1212
          queue.depmgr.NotifyWaiters(job.id)
1213
          return True
1214

    
1215
      assert not waitjob or queue.depmgr.JobWaiting(job)
1216

    
1217
      return bool(waitjob)
1218
    finally:
1219
      assert job.writable, "Job became read-only while being processed"
1220
      queue.release()
1221

    
1222

    
1223
class _JobQueueWorker(workerpool.BaseWorker):
1224
  """The actual job workers.
1225

1226
  """
1227
  def RunTask(self, job): # pylint: disable-msg=W0221
1228
    """Job executor.
1229

1230
    @type job: L{_QueuedJob}
1231
    @param job: the job to be processed
1232

1233
    """
1234
    # Ensure only one worker is active on a single job. If a job registers for
1235
    # a dependency job, and the other job notifies before the first worker is
1236
    # done, the job can end up in the tasklist more than once.
1237
    job.processor_lock.acquire()
1238
    try:
1239
      return self._RunTaskInner(job)
1240
    finally:
1241
      job.processor_lock.release()
1242

    
1243
  def _RunTaskInner(self, job):
1244
    """Executes a job.
1245

1246
    Must be called with per-job lock acquired.
1247

1248
    """
1249
    queue = job.queue
1250
    assert queue == self.pool.queue
1251

    
1252
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1253
    setname_fn(None)
1254

    
1255
    proc = mcpu.Processor(queue.context, job.id)
1256

    
1257
    # Create wrapper for setting thread name
1258
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1259
                                    proc.ExecOpCode)
1260

    
1261
    if not _JobProcessor(queue, wrap_execop_fn, job)():
1262
      # Schedule again
1263
      raise workerpool.DeferTask(priority=job.CalcPriority())
1264

    
1265
  @staticmethod
1266
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1267
    """Updates the worker thread name to include a short summary of the opcode.
1268

1269
    @param setname_fn: Callable setting worker thread name
1270
    @param execop_fn: Callable for executing opcode (usually
1271
                      L{mcpu.Processor.ExecOpCode})
1272

1273
    """
1274
    setname_fn(op)
1275
    try:
1276
      return execop_fn(op, *args, **kwargs)
1277
    finally:
1278
      setname_fn(None)
1279

    
1280
  @staticmethod
1281
  def _GetWorkerName(job, op):
1282
    """Sets the worker thread name.
1283

1284
    @type job: L{_QueuedJob}
1285
    @type op: L{opcodes.OpCode}
1286

1287
    """
1288
    parts = ["Job%s" % job.id]
1289

    
1290
    if op:
1291
      parts.append(op.TinySummary())
1292

    
1293
    return "/".join(parts)
1294

    
1295

    
1296
class _JobQueueWorkerPool(workerpool.WorkerPool):
1297
  """Simple class implementing a job-processing workerpool.
1298

1299
  """
1300
  def __init__(self, queue):
1301
    super(_JobQueueWorkerPool, self).__init__("Jq",
1302
                                              JOBQUEUE_THREADS,
1303
                                              _JobQueueWorker)
1304
    self.queue = queue
1305

    
1306

    
1307
class _JobDependencyManager:
1308
  """Keeps track of job dependencies.
1309

1310
  """
1311
  (WAIT,
1312
   ERROR,
1313
   CANCEL,
1314
   CONTINUE,
1315
   WRONGSTATUS) = range(1, 6)
1316

    
1317
  # TODO: Export waiter information to lock monitor
1318

    
1319
  def __init__(self, getstatus_fn, enqueue_fn):
1320
    """Initializes this class.
1321

1322
    """
1323
    self._getstatus_fn = getstatus_fn
1324
    self._enqueue_fn = enqueue_fn
1325

    
1326
    self._waiters = {}
1327
    self._lock = locking.SharedLock("JobDepMgr")
1328

    
1329
  @locking.ssynchronized(_LOCK, shared=1)
1330
  def JobWaiting(self, job):
1331
    """Checks if a job is waiting.
1332

1333
    """
1334
    return compat.any(job in jobs
1335
                      for jobs in self._waiters.values())
1336

    
1337
  @locking.ssynchronized(_LOCK)
1338
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1339
    """Checks if a dependency job has the requested status.
1340

1341
    If the other job is not yet in a finalized status, the calling job will be
1342
    notified (re-added to the workerpool) at a later point.
1343

1344
    @type job: L{_QueuedJob}
1345
    @param job: Job object
1346
    @type dep_job_id: string
1347
    @param dep_job_id: ID of dependency job
1348
    @type dep_status: list
1349
    @param dep_status: Required status
1350

1351
    """
1352
    assert ht.TString(job.id)
1353
    assert ht.TString(dep_job_id)
1354
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1355

    
1356
    if job.id == dep_job_id:
1357
      return (self.ERROR, "Job can't depend on itself")
1358

    
1359
    # Get status of dependency job
1360
    try:
1361
      status = self._getstatus_fn(dep_job_id)
1362
    except errors.JobLost, err:
1363
      return (self.ERROR, "Dependency error: %s" % err)
1364

    
1365
    assert status in constants.JOB_STATUS_ALL
1366

    
1367
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1368

    
1369
    if status not in constants.JOBS_FINALIZED:
1370
      # Register for notification and wait for job to finish
1371
      job_id_waiters.add(job)
1372
      return (self.WAIT,
1373
              "Need to wait for job %s, wanted status '%s'" %
1374
              (dep_job_id, dep_status))
1375

    
1376
    # Remove from waiters list
1377
    if job in job_id_waiters:
1378
      job_id_waiters.remove(job)
1379

    
1380
    if (status == constants.JOB_STATUS_CANCELED and
1381
        constants.JOB_STATUS_CANCELED not in dep_status):
1382
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1383

    
1384
    elif not dep_status or status in dep_status:
1385
      return (self.CONTINUE,
1386
              "Dependency job %s finished with status '%s'" %
1387
              (dep_job_id, status))
1388

    
1389
    else:
1390
      return (self.WRONGSTATUS,
1391
              "Dependency job %s finished with status '%s',"
1392
              " not one of '%s' as required" %
1393
              (dep_job_id, status, utils.CommaJoin(dep_status)))
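  # Outcome sketch (as consumed by _JobProcessor._CheckDependencies):
  # CONTINUE drops the dependency and goes on, WAIT leaves the job to be
  # re-enqueued later by NotifyWaiters(), CANCEL cancels the waiting job, and
  # WRONGSTATUS or ERROR make the waiting opcode fail.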
1394

    
1395
  @locking.ssynchronized(_LOCK)
1396
  def NotifyWaiters(self, job_id):
1397
    """Notifies all jobs waiting for a certain job ID.
1398

1399
    @type job_id: string
1400
    @param job_id: Job ID
1401

1402
    """
1403
    assert ht.TString(job_id)
1404

    
1405
    jobs = self._waiters.pop(job_id, None)
1406
    if jobs:
1407
      # Re-add jobs to workerpool
1408
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1409
                    len(jobs), job_id)
1410
      self._enqueue_fn(jobs)
1411

    
1412
    # Remove all jobs without actual waiters
1413
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1414
                   if not waiters]:
1415
      del self._waiters[job_id]
1416

    
1417

    
1418
def _RequireOpenQueue(fn):
1419
  """Decorator for "public" functions.
1420

1421
  This function should be used for all 'public' functions. That is,
1422
  functions usually called from other classes. Note that this should
1423
  be applied only to methods (not plain functions), since it expects
1424
  that the decorated function is called with a first argument that has
1425
  a '_queue_filelock' attribute.
1426

1427
  @warning: Use this decorator only after locking.ssynchronized
1428

1429
  Example::
1430
    @locking.ssynchronized(_LOCK)
1431
    @_RequireOpenQueue
1432
    def Example(self):
1433
      pass
1434

1435
  """
1436
  def wrapper(self, *args, **kwargs):
1437
    # pylint: disable-msg=W0212
1438
    assert self._queue_filelock is not None, "Queue should be open"
1439
    return fn(self, *args, **kwargs)
1440
  return wrapper
1441

    
1442

    
1443
class JobQueue(object):
1444
  """Queue used to manage the jobs.
1445

1446
  @cvar _RE_JOB_FILE: regex matching the valid job file names
1447

1448
  """
1449
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1450

    
1451
  def __init__(self, context):
1452
    """Constructor for JobQueue.
1453

1454
    The constructor will initialize the job queue object and then
1455
    start loading the current jobs from disk, either for starting them
1456
    (if they were queue) or for aborting them (if they were already
1457
    running).
1458

1459
    @type context: GanetiContext
1460
    @param context: the context object for access to the configuration
1461
        data and other ganeti objects
1462

1463
    """
1464
    self.context = context
1465
    self._memcache = weakref.WeakValueDictionary()
1466
    self._my_hostname = netutils.Hostname.GetSysName()
1467

    
1468
    # The Big JobQueue lock. If a code block or method acquires it in shared
1469
    # mode, it must guarantee concurrency with all the code acquiring it in
1470
    # shared mode, including itself. In order not to acquire it at all
1471
    # concurrency must be guaranteed with all code acquiring it in shared mode
1472
    # and all code acquiring it exclusively.
1473
    self._lock = locking.SharedLock("JobQueue")
1474

    
1475
    self.acquire = self._lock.acquire
1476
    self.release = self._lock.release
1477

    
1478
    # Initialize the queue, and acquire the filelock.
1479
    # This ensures no other process is working on the job queue.
1480
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1481

    
1482
    # Read serial file
1483
    self._last_serial = jstore.ReadSerial()
1484
    assert self._last_serial is not None, ("Serial file was modified between"
1485
                                           " check in jstore and here")
1486

    
1487
    # Get initial list of nodes
1488
    self._nodes = dict((n.name, n.primary_ip)
1489
                       for n in self.context.cfg.GetAllNodesInfo().values()
1490
                       if n.master_candidate)
1491

    
1492
    # Remove master node
1493
    self._nodes.pop(self._my_hostname, None)
1494

    
1495
    # TODO: Check consistency across nodes
1496

    
1497
    self._queue_size = 0
1498
    self._UpdateQueueSizeUnlocked()
1499
    self._drained = jstore.CheckDrainFlag()
1500

    
1501
    # Job dependencies
1502
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1503
                                        self._EnqueueJobs)
1504

    
1505
    # Setup worker pool
1506
    self._wpool = _JobQueueWorkerPool(self)
1507
    try:
1508
      self._InspectQueue()
1509
    except:
1510
      self._wpool.TerminateWorkers()
1511
      raise
1512

    
1513
  @locking.ssynchronized(_LOCK)
1514
  @_RequireOpenQueue
1515
  def _InspectQueue(self):
1516
    """Loads the whole job queue and resumes unfinished jobs.
1517

1518
    This function needs the lock here because WorkerPool.AddTask() may start a
1519
    job while we're still doing our work.
1520

1521
    """
1522
    logging.info("Inspecting job queue")
1523

    
1524
    restartjobs = []
1525

    
1526
    all_job_ids = self._GetJobIDsUnlocked()
1527
    jobs_count = len(all_job_ids)
1528
    lastinfo = time.time()
1529
    for idx, job_id in enumerate(all_job_ids):
1530
      # Give an update every 1000 jobs or 10 seconds
1531
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1532
          idx == (jobs_count - 1)):
1533
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1534
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1535
        lastinfo = time.time()
1536

    
1537
      job = self._LoadJobUnlocked(job_id)
1538

    
1539
      # a failure in loading the job can cause 'None' to be returned
1540
      if job is None:
1541
        continue
1542

    
1543
      status = job.CalcStatus()
1544

    
1545
      if status == constants.JOB_STATUS_QUEUED:
1546
        restartjobs.append(job)
1547

    
1548
      elif status in (constants.JOB_STATUS_RUNNING,
1549
                      constants.JOB_STATUS_WAITLOCK,
1550
                      constants.JOB_STATUS_CANCELING):
1551
        logging.warning("Unfinished job %s found: %s", job.id, job)
1552

    
1553
        if status == constants.JOB_STATUS_WAITLOCK:
1554
          # Restart job
1555
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1556
          restartjobs.append(job)
1557
        else:
1558
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1559
                                "Unclean master daemon shutdown")
1560
          job.Finalize()
1561

    
1562
        self.UpdateJobUnlocked(job)
1563

    
1564
    if restartjobs:
1565
      logging.info("Restarting %s jobs", len(restartjobs))
1566
      self._EnqueueJobs(restartjobs)
1567

    
1568
    logging.info("Job queue inspection finished")
1569

    
1570
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

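  # Illustrative example (not part of the original module): for a call made
  # to five non-master nodes where only one succeeds, the check above counts
  # 1 success + 1 (the master itself) = 2, which is smaller than the 4
  # failures, so "More than half of the nodes failed" is logged:
  #
  #   success = ["node1"]                       # hypothetical node names
  #   failed = ["node2", "node3", "node4", "node5"]
  #   (len(success) + 1) < len(failed)          # True -> error is logged
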
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate the renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this method
    will abstract the change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

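  # Illustrative example (not part of the original module): a numeric job id
  # is simply stringified, while invalid input raises ProgrammerError:
  #
  #   JobQueue._FormatJobID(42)     # -> "42"
  #   JobQueue._FormatJobID("42")   # raises errors.ProgrammerError
  #   JobQueue._FormatJobID(-1)     # raises errors.ProgrammerError
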
  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

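  # Illustrative example (not part of the original module): with
  # JOBS_PER_ARCHIVE_DIRECTORY = 10000, jobs are bucketed as follows:
  #
  #   JobQueue._GetArchiveDirectory("9999")   # -> "0"
  #   JobQueue._GetArchiveDirectory("12345")  # -> "1"
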
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of formatted job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

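  # Illustrative example (not part of the original module): assuming the last
  # serial in memory is 1520, a request for three new serials writes "1523\n"
  # to the serial file and hands out the intermediate ids:
  #
  #   self._NewSerialsUnlocked(3)   # -> ["1521", "1522", "1523"]
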
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

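  # Illustrative example (not part of the original module): live jobs are
  # stored as "job-<id>" directly under QUEUE_DIR, archived ones under the
  # per-10000 bucket returned by _GetArchiveDirectory. For job "12345" and
  # hypothetical directory constants this gives roughly:
  #
  #   _GetJobPath("12345")          # -> ".../queue/job-12345"
  #   _GetArchivedJobPath("12345")  # -> ".../queue/archive/1/job-12345"
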
  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @type writable: bool or None
    @param writable: Whether the returned job should be writable; if None,
        the default for the location the job was loaded from is used
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, True)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, False))

    raw_data = None
    writable_default = None

    for (fn, writable_default) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = writable_default

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

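  # Illustrative note (not part of the original module): the lookup order is
  # the live queue directory first (jobs loaded from there default to
  # writable), then the archive directory when try_archived is set (archived
  # jobs default to read-only). For example:
  #
  #   self._LoadJobFromDisk("42", True)   # live job-42, else archived copy
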
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

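  # Illustrative usage sketch (not part of the original module): a caller
  # holding a queue instance submits a single-opcode job and gets back the
  # new job id as a string; the opcode shown is only a hypothetical example:
  #
  #   job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=5)])
  #   # job_id is e.g. "1521"; the job is now queued for the worker pool
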
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = ("%s; opcodes %s" %
                (err, utils.CommaJoin(op.Summary() for op in ops)))
        status = False
      results.append((status, data))

    self._EnqueueJobs(added_jobs)

    return results

  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: string
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    if not isinstance(job_id, basestring):
      job_id = self._FormatJobID(job_id)

    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    # The job may not exist at all; only check writability if it was found
    assert job is None or not job.writable, "Got writable job"

    if job:
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

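  # Illustrative usage sketch (not part of the original module): a caller
  # polling for changes of job "1521" with no previous job information and a
  # 10 second timeout; the field name and serial value are only hypothetical:
  #
  #   result = queue.WaitForJobChanges("1521", ["status"], None, 0, 10.0)
  #   # result is either (job_info, log_entries) or
  #   # constants.JOB_NOTCHANGED if nothing changed before the timeout
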
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum amount of time (in seconds) to spend archiving
    @rtype: tuple
    @return: a tuple of the number of jobs archived and the number of jobs
        that were not considered due to the timeout

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

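  # Illustrative example (not part of the original module): archiving all
  # finalized jobs older than one hour, spending at most 30 seconds on it;
  # the return value is (jobs archived, jobs not yet considered):
  #
  #   queue.AutoArchiveJobs(3600, 30)   # e.g. -> (120, 0)
  #   queue.AutoArchiveJobs(-1, 30)     # age -1 archives all finalized jobs
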
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

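  # Illustrative example (not part of the original module): querying two
  # explicitly named jobs, one of which does not exist; the field names are
  # only hypothetical examples of what a caller might request:
  #
  #   queue.QueryJobs(["1521", "9999"], ["id", "status"])
  #   # -> [["1521", "success"], None]
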
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None