lib/jqueue.py @ 580b1fdd

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import time
35
import weakref
36
import threading
37
import itertools
38
import operator
39

    
40
try:
41
  # pylint: disable=E0611
42
  from pyinotify import pyinotify
43
except ImportError:
44
  import pyinotify
45

    
46
from ganeti import asyncnotifier
47
from ganeti import constants
48
from ganeti import serializer
49
from ganeti import workerpool
50
from ganeti import locking
51
from ganeti import opcodes
52
from ganeti import opcodes_base
53
from ganeti import errors
54
from ganeti import mcpu
55
from ganeti import utils
56
from ganeti import jstore
57
from ganeti import rpc
58
from ganeti import runtime
59
from ganeti import netutils
60
from ganeti import compat
61
from ganeti import ht
62
from ganeti import query
63
from ganeti import qlang
64
from ganeti import pathutils
65
from ganeti import vcluster
66

    
67

    
68
JOBQUEUE_THREADS = 25
69

    
70
# member lock names to be passed to @ssynchronized decorator
71
_LOCK = "_lock"
72
_QUEUE = "_queue"
73

    
74
#: Retrieves "id" attribute
75
_GetIdAttr = operator.attrgetter("id")
76

    
77

    
78
class CancelJob(Exception):
79
  """Special exception to cancel a job.
80

81
  """
82

    
83

    
84
class QueueShutdown(Exception):
85
  """Special exception to abort a job when the job queue is shutting down.
86

87
  """
88

    
89

    
90
def TimeStampNow():
91
  """Returns the current timestamp.
92

93
  @rtype: tuple
94
  @return: the current time in the (seconds, microseconds) format
95

96
  """
97
  return utils.SplitTime(time.time())
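# Illustrative sketch, not part of the original module: timestamps are kept
# as (seconds, microseconds) tuples so they can be serialized to JSON without
# losing precision. The concrete values below are hypothetical.
#
#   >>> TimeStampNow()
#   (1325026951, 290409)    # seconds since the epoch, microseconds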
98

    
99

    
100
def _CallJqUpdate(runner, names, file_name, content):
101
  """Updates job queue file after virtualizing filename.
102

103
  """
104
  virt_file_name = vcluster.MakeVirtualPath(file_name)
105
  return runner.call_jobqueue_update(names, virt_file_name, content)
106

    
107

    
108
class _SimpleJobQuery:
109
  """Wrapper for job queries.
110

111
  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
112

113
  """
114
  def __init__(self, fields):
115
    """Initializes this class.
116

117
    """
118
    self._query = query.Query(query.JOB_FIELDS, fields)
119

    
120
  def __call__(self, job):
121
    """Executes a job query using cached field list.
122

123
    """
124
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
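  # Illustrative usage sketch (the field names are examples and assumed to be
  # valid job query fields): the cached query turns a job object into an
  # old-style result row.
  #
  #   squery = _SimpleJobQuery(["id", "status"])
  #   squery(job)    # e.g. [1234, "running"]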
125

    
126

    
127
class _QueuedOpCode(object):
128
  """Encapsulates an opcode object.
129

130
  @ivar log: holds the execution log and consists of tuples
131
  of the form C{(log_serial, timestamp, level, message)}
132
  @ivar input: the OpCode we encapsulate
133
  @ivar status: the current status
134
  @ivar result: the result of the LU execution
135
  @ivar start_timestamp: timestamp for the start of the execution
136
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
137
  @ivar end_timestamp: timestamp for the end of the execution
138

139
  """
140
  __slots__ = ["input", "status", "result", "log", "priority",
141
               "start_timestamp", "exec_timestamp", "end_timestamp",
142
               "__weakref__"]
143

    
144
  def __init__(self, op):
145
    """Initializes instances of this class.
146

147
    @type op: L{opcodes.OpCode}
148
    @param op: the opcode we encapsulate
149

150
    """
151
    self.input = op
152
    self.status = constants.OP_STATUS_QUEUED
153
    self.result = None
154
    self.log = []
155
    self.start_timestamp = None
156
    self.exec_timestamp = None
157
    self.end_timestamp = None
158

    
159
    # Get initial priority (it might change during the lifetime of this opcode)
160
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
161

    
162
  @classmethod
163
  def Restore(cls, state):
164
    """Restore the _QueuedOpCode from the serialized form.
165

166
    @type state: dict
167
    @param state: the serialized state
168
    @rtype: _QueuedOpCode
169
    @return: a new _QueuedOpCode instance
170

171
    """
172
    obj = _QueuedOpCode.__new__(cls)
173
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
174
    obj.status = state["status"]
175
    obj.result = state["result"]
176
    obj.log = state["log"]
177
    obj.start_timestamp = state.get("start_timestamp", None)
178
    obj.exec_timestamp = state.get("exec_timestamp", None)
179
    obj.end_timestamp = state.get("end_timestamp", None)
180
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
181
    return obj
182

    
183
  def Serialize(self):
184
    """Serializes this _QueuedOpCode.
185

186
    @rtype: dict
187
    @return: the dictionary holding the serialized state
188

189
    """
190
    return {
191
      "input": self.input.__getstate__(),
192
      "status": self.status,
193
      "result": self.result,
194
      "log": self.log,
195
      "start_timestamp": self.start_timestamp,
196
      "exec_timestamp": self.exec_timestamp,
197
      "end_timestamp": self.end_timestamp,
198
      "priority": self.priority,
199
      }
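  # Illustrative sketch: Serialize() produces the plain dict that ends up
  # JSON-encoded in the job file, and Restore() rebuilds an equivalent opcode
  # from it. The field values shown are hypothetical.
  #
  #   op.Serialize() == {"input": {...}, "status": "queued", "result": None,
  #                      "log": [], "start_timestamp": None,
  #                      "exec_timestamp": None, "end_timestamp": None,
  #                      "priority": constants.OP_PRIO_DEFAULT}
  #   op2 = _QueuedOpCode.Restore(op.Serialize())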
200

    
201

    
202
class _QueuedJob(object):
203
  """In-memory job representation.
204

205
  This is what we use to track the user-submitted jobs. Locking must
206
  be taken care of by users of this class.
207

208
  @type queue: L{JobQueue}
209
  @ivar queue: the parent queue
210
  @ivar id: the job ID
211
  @type ops: list
212
  @ivar ops: the list of _QueuedOpCode that constitute the job
213
  @type log_serial: int
214
  @ivar log_serial: holds the index for the next log entry
215
  @ivar received_timestamp: the timestamp for when the job was received
216
  @ivar start_timestamp: the timestamp for start of execution
217
  @ivar end_timestamp: the timestamp for end of execution
218
  @ivar writable: Whether the job is allowed to be modified
219

220
  """
221
  # pylint: disable=W0212
222
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
223
               "received_timestamp", "start_timestamp", "end_timestamp",
224
               "__weakref__", "processor_lock", "writable", "archived"]
225

    
226
  def _AddReasons(self):
227
    """Extend the reason trail
228

229
    Add the reason for all the opcodes of this job to be executed.
230

231
    """
232
    count = 0
233
    for queued_op in self.ops:
234
      op = queued_op.input
235
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__)
236
      reason_text = "job=%d;index=%d" % (self.id, count)
237
      reason = getattr(op, "reason", [])
238
      reason.append((reason_src, reason_text, utils.EpochNano()))
239
      op.reason = reason
240
      count = count + 1
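  # Illustrative sketch of the trail extension above: each opcode gains one
  # reason entry of the form (source, "job=<id>;index=<position>", nanosecond
  # timestamp). The source string and timestamp below are hypothetical; the
  # job id and index come from the job itself.
  #
  #   ("gnt:opcode:...", "job=1234;index=0", 1325026951000000000)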
241

    
242
  def __init__(self, queue, job_id, ops, writable):
243
    """Constructor for the _QueuedJob.
244

245
    @type queue: L{JobQueue}
246
    @param queue: our parent queue
247
    @type job_id: int
248
    @param job_id: our job id
249
    @type ops: list
250
    @param ops: the list of opcodes we hold, which will be encapsulated
251
        in _QueuedOpCodes
252
    @type writable: bool
253
    @param writable: Whether job can be modified
254

255
    """
256
    if not ops:
257
      raise errors.GenericError("A job needs at least one opcode")
258

    
259
    self.queue = queue
260
    self.id = int(job_id)
261
    self.ops = [_QueuedOpCode(op) for op in ops]
262
    self._AddReasons()
263
    self.log_serial = 0
264
    self.received_timestamp = TimeStampNow()
265
    self.start_timestamp = None
266
    self.end_timestamp = None
267
    self.archived = False
268

    
269
    self._InitInMemory(self, writable)
270

    
271
    assert not self.archived, "New jobs can not be marked as archived"
272

    
273
  @staticmethod
274
  def _InitInMemory(obj, writable):
275
    """Initializes in-memory variables.
276

277
    """
278
    obj.writable = writable
279
    obj.ops_iter = None
280
    obj.cur_opctx = None
281

    
282
    # Read-only jobs are not processed and therefore don't need a lock
283
    if writable:
284
      obj.processor_lock = threading.Lock()
285
    else:
286
      obj.processor_lock = None
287

    
288
  def __repr__(self):
289
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
290
              "id=%s" % self.id,
291
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
292

    
293
    return "<%s at %#x>" % (" ".join(status), id(self))
294

    
295
  @classmethod
296
  def Restore(cls, queue, state, writable, archived):
297
    """Restore a _QueuedJob from serialized state:
298

299
    @type queue: L{JobQueue}
300
    @param queue: to which queue the restored job belongs
301
    @type state: dict
302
    @param state: the serialized state
303
    @type writable: bool
304
    @param writable: Whether job can be modified
305
    @type archived: bool
306
    @param archived: Whether job was already archived
307
    @rtype: _QueuedJob
308
    @return: the restored _QueuedJob instance
309

310
    """
311
    obj = _QueuedJob.__new__(cls)
312
    obj.queue = queue
313
    obj.id = int(state["id"])
314
    obj.received_timestamp = state.get("received_timestamp", None)
315
    obj.start_timestamp = state.get("start_timestamp", None)
316
    obj.end_timestamp = state.get("end_timestamp", None)
317
    obj.archived = archived
318

    
319
    obj.ops = []
320
    obj.log_serial = 0
321
    for op_state in state["ops"]:
322
      op = _QueuedOpCode.Restore(op_state)
323
      for log_entry in op.log:
324
        obj.log_serial = max(obj.log_serial, log_entry[0])
325
      obj.ops.append(op)
326

    
327
    cls._InitInMemory(obj, writable)
328

    
329
    return obj
330

    
331
  def Serialize(self):
332
    """Serialize the _QueuedJob instance.
333

334
    @rtype: dict
335
    @return: the serialized state
336

337
    """
338
    return {
339
      "id": self.id,
340
      "ops": [op.Serialize() for op in self.ops],
341
      "start_timestamp": self.start_timestamp,
342
      "end_timestamp": self.end_timestamp,
343
      "received_timestamp": self.received_timestamp,
344
      }
345

    
346
  def CalcStatus(self):
347
    """Compute the status of this job.
348

349
    This function iterates over all the _QueuedOpCodes in the job and
350
    based on their status, computes the job status.
351

352
    The algorithm is:
353
      - if we find a canceled opcode, or one finished with error, the job
354
        status will be the same
355
      - otherwise, the last opcode with the status one of:
356
          - waiting
357
          - canceling
358
          - running
359

360
        will determine the job status
361

362
      - otherwise, it means either all opcodes are queued, or success,
363
        and the job status will be the same
364

365
    @return: the job status
366

367
    """
368
    status = constants.JOB_STATUS_QUEUED
369

    
370
    all_success = True
371
    for op in self.ops:
372
      if op.status == constants.OP_STATUS_SUCCESS:
373
        continue
374

    
375
      all_success = False
376

    
377
      if op.status == constants.OP_STATUS_QUEUED:
378
        pass
379
      elif op.status == constants.OP_STATUS_WAITING:
380
        status = constants.JOB_STATUS_WAITING
381
      elif op.status == constants.OP_STATUS_RUNNING:
382
        status = constants.JOB_STATUS_RUNNING
383
      elif op.status == constants.OP_STATUS_CANCELING:
384
        status = constants.JOB_STATUS_CANCELING
385
        break
386
      elif op.status == constants.OP_STATUS_ERROR:
387
        status = constants.JOB_STATUS_ERROR
388
        # The whole job fails if one opcode failed
389
        break
390
      elif op.status == constants.OP_STATUS_CANCELED:
391
        status = constants.JOB_STATUS_CANCELED
392
        break
393

    
394
    if all_success:
395
      status = constants.JOB_STATUS_SUCCESS
396

    
397
    return status
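  # Illustrative examples of the derivation above, written as lists of opcode
  # statuses and the resulting job status:
  #
  #   [success, success, success]   -> JOB_STATUS_SUCCESS
  #   [success, running, queued]    -> JOB_STATUS_RUNNING
  #   [success, error, queued]      -> JOB_STATUS_ERROR     (stops at the error)
  #   [success, canceling, queued]  -> JOB_STATUS_CANCELING
  #   [queued, queued]              -> JOB_STATUS_QUEUED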
398

    
399
  def CalcPriority(self):
400
    """Gets the current priority for this job.
401

402
    Only unfinished opcodes are considered. When all are done, the default
403
    priority is used.
404

405
    @rtype: int
406

407
    """
408
    priorities = [op.priority for op in self.ops
409
                  if op.status not in constants.OPS_FINALIZED]
410

    
411
    if not priorities:
412
      # All opcodes are done, assume default priority
413
      return constants.OP_PRIO_DEFAULT
414

    
415
    return min(priorities)
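  # Illustrative sketch: lower numbers are more urgent (OP_PRIO_HIGHEST is
  # the numerically smallest allowed value), so the job runs at the priority
  # of its most urgent unfinished opcode. Hypothetical values:
  #
  #   unfinished opcode priorities [0, -5, 10]  ->  CalcPriority() == -5
  #   all opcodes finalized                     ->  constants.OP_PRIO_DEFAULT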
416

    
417
  def GetLogEntries(self, newer_than):
418
    """Selectively returns the log entries.
419

420
    @type newer_than: None or int
421
    @param newer_than: if this is None, return all log entries,
422
        otherwise return only the log entries with serial higher
423
        than this value
424
    @rtype: list
425
    @return: the list of the log entries selected
426

427
    """
428
    if newer_than is None:
429
      serial = -1
430
    else:
431
      serial = newer_than
432

    
433
    entries = []
434
    for op in self.ops:
435
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
436

    
437
    return entries
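  # Illustrative sketch with hypothetical entries of the form
  # (serial, timestamp, type, message):
  #
  #   op.log == [(1, ts, "message", "step 1"), (2, ts, "message", "step 2")]
  #   job.GetLogEntries(None)  -> both entries
  #   job.GetLogEntries(1)     -> only the entry with serial 2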
438

    
439
  def GetInfo(self, fields):
440
    """Returns information about a job.
441

442
    @type fields: list
443
    @param fields: names of fields to return
444
    @rtype: list
445
    @return: list with one element for each field
446
    @raise errors.OpExecError: when an invalid field
447
        has been passed
448

449
    """
450
    return _SimpleJobQuery(fields)(self)
451

    
452
  def MarkUnfinishedOps(self, status, result):
453
    """Mark unfinished opcodes with a given status and result.
454

455
    This is a utility function for marking all running or waiting to
456
    be run opcodes with a given status. Opcodes which are already
457
    finalized are not changed.
458

459
    @param status: a given opcode status
460
    @param result: the opcode result
461

462
    """
463
    not_marked = True
464
    for op in self.ops:
465
      if op.status in constants.OPS_FINALIZED:
466
        assert not_marked, "Finalized opcodes found after non-finalized ones"
467
        continue
468
      op.status = status
469
      op.result = result
470
      not_marked = False
471

    
472
  def Finalize(self):
473
    """Marks the job as finalized.
474

475
    """
476
    self.end_timestamp = TimeStampNow()
477

    
478
  def Cancel(self):
479
    """Marks job as canceled/-ing if possible.
480

481
    @rtype: tuple; (bool, string)
482
    @return: Boolean describing whether job was successfully canceled or marked
483
      as canceling and a text message
484

485
    """
486
    status = self.CalcStatus()
487

    
488
    if status == constants.JOB_STATUS_QUEUED:
489
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
490
                             "Job canceled by request")
491
      self.Finalize()
492
      return (True, "Job %s canceled" % self.id)
493

    
494
    elif status == constants.JOB_STATUS_WAITING:
495
      # The worker will notice the new status and cancel the job
496
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
497
      return (True, "Job %s will be canceled" % self.id)
498

    
499
    else:
500
      logging.debug("Job %s is no longer waiting in the queue", self.id)
501
      return (False, "Job %s is no longer waiting in the queue" % self.id)
502

    
503
  def ChangePriority(self, priority):
504
    """Changes the job priority.
505

506
    @type priority: int
507
    @param priority: New priority
508
    @rtype: tuple; (bool, string)
509
    @return: Boolean describing whether job's priority was successfully changed
510
      and a text message
511

512
    """
513
    status = self.CalcStatus()
514

    
515
    if status in constants.JOBS_FINALIZED:
516
      return (False, "Job %s is finished" % self.id)
517
    elif status == constants.JOB_STATUS_CANCELING:
518
      return (False, "Job %s is canceling" % self.id)
519
    else:
520
      assert status in (constants.JOB_STATUS_QUEUED,
521
                        constants.JOB_STATUS_WAITING,
522
                        constants.JOB_STATUS_RUNNING)
523

    
524
      changed = False
525
      for op in self.ops:
526
        if (op.status == constants.OP_STATUS_RUNNING or
527
            op.status in constants.OPS_FINALIZED):
528
          assert not changed, \
529
            ("Found opcode for which priority should not be changed after"
530
             " priority has been changed for previous opcodes")
531
          continue
532

    
533
        assert op.status in (constants.OP_STATUS_QUEUED,
534
                             constants.OP_STATUS_WAITING)
535

    
536
        changed = True
537

    
538
        # Set new priority (doesn't modify opcode input)
539
        op.priority = priority
540

    
541
      if changed:
542
        return (True, ("Priorities of pending opcodes for job %s have been"
543
                       " changed to %s" % (self.id, priority)))
544
      else:
545
        return (False, "Job %s had no pending opcodes" % self.id)
546

    
547

    
548
class _OpExecCallbacks(mcpu.OpExecCbBase):
549
  def __init__(self, queue, job, op):
550
    """Initializes this class.
551

552
    @type queue: L{JobQueue}
553
    @param queue: Job queue
554
    @type job: L{_QueuedJob}
555
    @param job: Job object
556
    @type op: L{_QueuedOpCode}
557
    @param op: OpCode
558

559
    """
560
    assert queue, "Queue is missing"
561
    assert job, "Job is missing"
562
    assert op, "Opcode is missing"
563

    
564
    self._queue = queue
565
    self._job = job
566
    self._op = op
567

    
568
  def _CheckCancel(self):
569
    """Raises an exception to cancel the job if asked to.
570

571
    """
572
    # Cancel here if we were asked to
573
    if self._op.status == constants.OP_STATUS_CANCELING:
574
      logging.debug("Canceling opcode")
575
      raise CancelJob()
576

    
577
    # See if queue is shutting down
578
    if not self._queue.AcceptingJobsUnlocked():
579
      logging.debug("Queue is shutting down")
580
      raise QueueShutdown()
581

    
582
  @locking.ssynchronized(_QUEUE, shared=1)
583
  def NotifyStart(self):
584
    """Mark the opcode as running, not lock-waiting.
585

586
    This is called from the mcpu code as a notifier function, when the LU is
587
    finally about to start the Exec() method. Of course, to have end-user
588
    visible results, the opcode must be initially (before calling into
589
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
590

591
    """
592
    assert self._op in self._job.ops
593
    assert self._op.status in (constants.OP_STATUS_WAITING,
594
                               constants.OP_STATUS_CANCELING)
595

    
596
    # Cancel here if we were asked to
597
    self._CheckCancel()
598

    
599
    logging.debug("Opcode is now running")
600

    
601
    self._op.status = constants.OP_STATUS_RUNNING
602
    self._op.exec_timestamp = TimeStampNow()
603

    
604
    # And finally replicate the job status
605
    self._queue.UpdateJobUnlocked(self._job)
606

    
607
  @locking.ssynchronized(_QUEUE, shared=1)
608
  def _AppendFeedback(self, timestamp, log_type, log_msg):
609
    """Internal feedback append function, with locks
610

611
    """
612
    self._job.log_serial += 1
613
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
614
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
615

    
616
  def Feedback(self, *args):
617
    """Append a log entry.
618

619
    """
620
    assert len(args) < 3
621

    
622
    if len(args) == 1:
623
      log_type = constants.ELOG_MESSAGE
624
      log_msg = args[0]
625
    else:
626
      (log_type, log_msg) = args
627

    
628
    # The time is split to make serialization easier and not lose
629
    # precision.
630
    timestamp = utils.SplitTime(time.time())
631
    self._AppendFeedback(timestamp, log_type, log_msg)
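  # Illustrative usage sketch from the LU side (the variable name is
  # hypothetical): Feedback accepts either just a message or an explicit log
  # type plus a message.
  #
  #   cbs.Feedback("syncing disks")                          # ELOG_MESSAGE
  #   cbs.Feedback(constants.ELOG_MESSAGE, "sync at 50%")    # explicit type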
632

    
633
  def CurrentPriority(self):
634
    """Returns current priority for opcode.
635

636
    """
637
    assert self._op.status in (constants.OP_STATUS_WAITING,
638
                               constants.OP_STATUS_CANCELING)
639

    
640
    # Cancel here if we were asked to
641
    self._CheckCancel()
642

    
643
    return self._op.priority
644

    
645
  def SubmitManyJobs(self, jobs):
646
    """Submits jobs for processing.
647

648
    See L{JobQueue.SubmitManyJobs}.
649

650
    """
651
    # Locking is done in job queue
652
    return self._queue.SubmitManyJobs(jobs)
653

    
654

    
655
class _JobChangesChecker(object):
656
  def __init__(self, fields, prev_job_info, prev_log_serial):
657
    """Initializes this class.
658

659
    @type fields: list of strings
660
    @param fields: Fields requested by LUXI client
661
    @type prev_job_info: string
662
    @param prev_job_info: previous job info, as passed by the LUXI client
663
    @type prev_log_serial: string
664
    @param prev_log_serial: previous job serial, as passed by the LUXI client
665

666
    """
667
    self._squery = _SimpleJobQuery(fields)
668
    self._prev_job_info = prev_job_info
669
    self._prev_log_serial = prev_log_serial
670

    
671
  def __call__(self, job):
672
    """Checks whether job has changed.
673

674
    @type job: L{_QueuedJob}
675
    @param job: Job object
676

677
    """
678
    assert not job.writable, "Expected read-only job"
679

    
680
    status = job.CalcStatus()
681
    job_info = self._squery(job)
682
    log_entries = job.GetLogEntries(self._prev_log_serial)
683

    
684
    # Serializing and deserializing data can cause type changes (e.g. from
685
    # tuple to list) or precision loss. We're doing it here so that we get
686
    # the same modifications as the data received from the client. Without
687
    # this, the comparison afterwards might fail without the data being
688
    # significantly different.
689
    # TODO: we just deserialized from disk, investigate how to make sure that
690
    # the job info and log entries are compatible to avoid this further step.
691
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
692
    # efficient, though floats will be tricky
693
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
694
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
695

    
696
    # Don't even try to wait if the job is no longer running, there will be
697
    # no changes.
698
    if (status not in (constants.JOB_STATUS_QUEUED,
699
                       constants.JOB_STATUS_RUNNING,
700
                       constants.JOB_STATUS_WAITING) or
701
        job_info != self._prev_job_info or
702
        (log_entries and self._prev_log_serial != log_entries[0][0])):
703
      logging.debug("Job %s changed", job.id)
704
      return (job_info, log_entries)
705

    
706
    return None
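  # Illustrative sketch of the polling contract (field names are examples):
  # the LUXI client passes in what it saw last, and None means "no change
  # yet, keep waiting".
  #
  #   checker = _JobChangesChecker(["id", "status"], prev_job_info, prev_serial)
  #   changes = checker(job)
  #   if changes is not None:
  #     (job_info, log_entries) = changes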
707

    
708

    
709
class _JobFileChangesWaiter(object):
710
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
711
    """Initializes this class.
712

713
    @type filename: string
714
    @param filename: Path to job file
715
    @raises errors.InotifyError: if the notifier cannot be set up
716

717
    """
718
    self._wm = _inotify_wm_cls()
719
    self._inotify_handler = \
720
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
721
    self._notifier = \
722
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
723
    try:
724
      self._inotify_handler.enable()
725
    except Exception:
726
      # pyinotify doesn't close file descriptors automatically
727
      self._notifier.stop()
728
      raise
729

    
730
  def _OnInotify(self, notifier_enabled):
731
    """Callback for inotify.
732

733
    """
734
    if not notifier_enabled:
735
      self._inotify_handler.enable()
736

    
737
  def Wait(self, timeout):
738
    """Waits for the job file to change.
739

740
    @type timeout: float
741
    @param timeout: Timeout in seconds
742
    @return: Whether there have been events
743

744
    """
745
    assert timeout >= 0
746
    have_events = self._notifier.check_events(timeout * 1000)
747
    if have_events:
748
      self._notifier.read_events()
749
    self._notifier.process_events()
750
    return have_events
751

    
752
  def Close(self):
753
    """Closes underlying notifier and its file descriptor.
754

755
    """
756
    self._notifier.stop()
757

    
758

    
759
class _JobChangesWaiter(object):
760
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
761
    """Initializes this class.
762

763
    @type filename: string
764
    @param filename: Path to job file
765

766
    """
767
    self._filewaiter = None
768
    self._filename = filename
769
    self._waiter_cls = _waiter_cls
770

    
771
  def Wait(self, timeout):
772
    """Waits for a job to change.
773

774
    @type timeout: float
775
    @param timeout: Timeout in seconds
776
    @return: Whether there have been events
777

778
    """
779
    if self._filewaiter:
780
      return self._filewaiter.Wait(timeout)
781

    
782
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
783
    # If this point is reached, return immediately and let caller check the job
784
    # file again in case there were changes since the last check. This avoids a
785
    # race condition.
786
    self._filewaiter = self._waiter_cls(self._filename)
787

    
788
    return True
789

    
790
  def Close(self):
791
    """Closes underlying waiter.
792

793
    """
794
    if self._filewaiter:
795
      self._filewaiter.Close()
796

    
797

    
798
class _WaitForJobChangesHelper(object):
799
  """Helper class using inotify to wait for changes in a job file.
800

801
  This class takes a previous job status and serial, and alerts the client when
802
  the current job status has changed.
803

804
  """
805
  @staticmethod
806
  def _CheckForChanges(counter, job_load_fn, check_fn):
807
    if counter.next() > 0:
808
      # If this isn't the first check the job is given some more time to change
809
      # again. This gives better performance for jobs generating many
810
      # changes/messages.
811
      time.sleep(0.1)
812

    
813
    job = job_load_fn()
814
    if not job:
815
      raise errors.JobLost()
816

    
817
    result = check_fn(job)
818
    if result is None:
819
      raise utils.RetryAgain()
820

    
821
    return result
822

    
823
  def __call__(self, filename, job_load_fn,
824
               fields, prev_job_info, prev_log_serial, timeout,
825
               _waiter_cls=_JobChangesWaiter):
826
    """Waits for changes on a job.
827

828
    @type filename: string
829
    @param filename: File on which to wait for changes
830
    @type job_load_fn: callable
831
    @param job_load_fn: Function to load job
832
    @type fields: list of strings
833
    @param fields: Which fields to check for changes
834
    @type prev_job_info: list or None
835
    @param prev_job_info: Last job information returned
836
    @type prev_log_serial: int
837
    @param prev_log_serial: Last job message serial number
838
    @type timeout: float
839
    @param timeout: maximum time to wait in seconds
840

841
    """
842
    counter = itertools.count()
843
    try:
844
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
845
      waiter = _waiter_cls(filename)
846
      try:
847
        return utils.Retry(compat.partial(self._CheckForChanges,
848
                                          counter, job_load_fn, check_fn),
849
                           utils.RETRY_REMAINING_TIME, timeout,
850
                           wait_fn=waiter.Wait)
851
      finally:
852
        waiter.Close()
853
    except errors.JobLost:
854
      return None
855
    except utils.RetryTimeout:
856
      return constants.JOB_NOTCHANGED
857

    
858

    
859
def _EncodeOpError(err):
860
  """Encodes an error which occurred while processing an opcode.
861

862
  """
863
  if isinstance(err, errors.GenericError):
864
    to_encode = err
865
  else:
866
    to_encode = errors.OpExecError(str(err))
867

    
868
  return errors.EncodeException(to_encode)
869

    
870

    
871
class _TimeoutStrategyWrapper:
872
  def __init__(self, fn):
873
    """Initializes this class.
874

875
    """
876
    self._fn = fn
877
    self._next = None
878

    
879
  def _Advance(self):
880
    """Gets the next timeout if necessary.
881

882
    """
883
    if self._next is None:
884
      self._next = self._fn()
885

    
886
  def Peek(self):
887
    """Returns the next timeout.
888

889
    """
890
    self._Advance()
891
    return self._next
892

    
893
  def Next(self):
894
    """Returns the current timeout and advances the internal state.
895

896
    """
897
    self._Advance()
898
    result = self._next
899
    self._next = None
900
    return result
901

    
902

    
903
class _OpExecContext:
904
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
905
    """Initializes this class.
906

907
    """
908
    self.op = op
909
    self.index = index
910
    self.log_prefix = log_prefix
911
    self.summary = op.input.Summary()
912

    
913
    # Create local copy to modify
914
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
915
      self.jobdeps = op.input.depends[:]
916
    else:
917
      self.jobdeps = None
918

    
919
    self._timeout_strategy_factory = timeout_strategy_factory
920
    self._ResetTimeoutStrategy()
921

    
922
  def _ResetTimeoutStrategy(self):
923
    """Creates a new timeout strategy.
924

925
    """
926
    self._timeout_strategy = \
927
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
928

    
929
  def CheckPriorityIncrease(self):
930
    """Checks whether priority can and should be increased.
931

932
    Called when locks couldn't be acquired.
933

934
    """
935
    op = self.op
936

    
937
    # Exhausted all retries and next round should not use blocking acquire
938
    # for locks?
939
    if (self._timeout_strategy.Peek() is None and
940
        op.priority > constants.OP_PRIO_HIGHEST):
941
      logging.debug("Increasing priority")
942
      op.priority -= 1
943
      self._ResetTimeoutStrategy()
944
      return True
945

    
946
    return False
947

    
948
  def GetNextLockTimeout(self):
949
    """Returns the next lock acquire timeout.
950

951
    """
952
    return self._timeout_strategy.Next()
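  # Illustrative sketch of how the two methods above drive lock acquisition
  # (timeout values are hypothetical): every failed attempt consumes one
  # timeout from the strategy; once the strategy is exhausted, Peek() returns
  # None, the priority number is decreased by one (making the opcode more
  # urgent) and a fresh strategy is created, until OP_PRIO_HIGHEST is reached
  # and the final attempt blocks without a timeout.
  #
  #   timeouts 0.1s, 0.5s, 2.0s all fail  -> priority 0 becomes -1, retry
  #   ...
  #   priority == OP_PRIO_HIGHEST and Peek() is None  -> blocking acquire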
953

    
954

    
955
class _JobProcessor(object):
956
  (DEFER,
957
   WAITDEP,
958
   FINISHED) = range(1, 4)
959

    
960
  def __init__(self, queue, opexec_fn, job,
961
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
962
    """Initializes this class.
963

964
    """
965
    self.queue = queue
966
    self.opexec_fn = opexec_fn
967
    self.job = job
968
    self._timeout_strategy_factory = _timeout_strategy_factory
969

    
970
  @staticmethod
971
  def _FindNextOpcode(job, timeout_strategy_factory):
972
    """Locates the next opcode to run.
973

974
    @type job: L{_QueuedJob}
975
    @param job: Job object
976
    @param timeout_strategy_factory: Callable to create new timeout strategy
977

978
    """
979
    # Create some sort of a cache to speed up locating next opcode for future
980
    # lookups
981
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
982
    # pending and one for processed ops.
983
    if job.ops_iter is None:
984
      job.ops_iter = enumerate(job.ops)
985

    
986
    # Find next opcode to run
987
    while True:
988
      try:
989
        (idx, op) = job.ops_iter.next()
990
      except StopIteration:
991
        raise errors.ProgrammerError("Called for a finished job")
992

    
993
      if op.status == constants.OP_STATUS_RUNNING:
994
        # Found an opcode already marked as running
995
        raise errors.ProgrammerError("Called for job marked as running")
996

    
997
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
998
                             timeout_strategy_factory)
999

    
1000
      if op.status not in constants.OPS_FINALIZED:
1001
        return opctx
1002

    
1003
      # This is a job that was partially completed before master daemon
1004
      # shutdown, so it can be expected that some opcodes are already
1005
      # completed successfully (if any did error out, then the whole job
1006
      # should have been aborted and not resubmitted for processing).
1007
      logging.info("%s: opcode %s already processed, skipping",
1008
                   opctx.log_prefix, opctx.summary)
1009

    
1010
  @staticmethod
1011
  def _MarkWaitlock(job, op):
1012
    """Marks an opcode as waiting for locks.
1013

1014
    The job's start timestamp is also set if necessary.
1015

1016
    @type job: L{_QueuedJob}
1017
    @param job: Job object
1018
    @type op: L{_QueuedOpCode}
1019
    @param op: Opcode object
1020

1021
    """
1022
    assert op in job.ops
1023
    assert op.status in (constants.OP_STATUS_QUEUED,
1024
                         constants.OP_STATUS_WAITING)
1025

    
1026
    update = False
1027

    
1028
    op.result = None
1029

    
1030
    if op.status == constants.OP_STATUS_QUEUED:
1031
      op.status = constants.OP_STATUS_WAITING
1032
      update = True
1033

    
1034
    if op.start_timestamp is None:
1035
      op.start_timestamp = TimeStampNow()
1036
      update = True
1037

    
1038
    if job.start_timestamp is None:
1039
      job.start_timestamp = op.start_timestamp
1040
      update = True
1041

    
1042
    assert op.status == constants.OP_STATUS_WAITING
1043

    
1044
    return update
1045

    
1046
  @staticmethod
1047
  def _CheckDependencies(queue, job, opctx):
1048
    """Checks if an opcode has dependencies and if so, processes them.
1049

1050
    @type queue: L{JobQueue}
1051
    @param queue: Queue object
1052
    @type job: L{_QueuedJob}
1053
    @param job: Job object
1054
    @type opctx: L{_OpExecContext}
1055
    @param opctx: Opcode execution context
1056
    @rtype: bool
1057
    @return: Whether opcode will be re-scheduled by dependency tracker
1058

1059
    """
1060
    op = opctx.op
1061

    
1062
    result = False
1063

    
1064
    while opctx.jobdeps:
1065
      (dep_job_id, dep_status) = opctx.jobdeps[0]
1066

    
1067
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1068
                                                          dep_status)
1069
      assert ht.TNonEmptyString(depmsg), "No dependency message"
1070

    
1071
      logging.info("%s: %s", opctx.log_prefix, depmsg)
1072

    
1073
      if depresult == _JobDependencyManager.CONTINUE:
1074
        # Remove dependency and continue
1075
        opctx.jobdeps.pop(0)
1076

    
1077
      elif depresult == _JobDependencyManager.WAIT:
1078
        # Need to wait for notification, dependency tracker will re-add job
1079
        # to workerpool
1080
        result = True
1081
        break
1082

    
1083
      elif depresult == _JobDependencyManager.CANCEL:
1084
        # Job was cancelled, cancel this job as well
1085
        job.Cancel()
1086
        assert op.status == constants.OP_STATUS_CANCELING
1087
        break
1088

    
1089
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1090
                         _JobDependencyManager.ERROR):
1091
        # Job failed or there was an error, this job must fail
1092
        op.status = constants.OP_STATUS_ERROR
1093
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1094
        break
1095

    
1096
      else:
1097
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1098
                                     depresult)
1099

    
1100
    return result
1101

    
1102
  def _ExecOpCodeUnlocked(self, opctx):
1103
    """Processes one opcode and returns the result.
1104

1105
    """
1106
    op = opctx.op
1107

    
1108
    assert op.status == constants.OP_STATUS_WAITING
1109

    
1110
    timeout = opctx.GetNextLockTimeout()
1111

    
1112
    try:
1113
      # Make sure not to hold queue lock while calling ExecOpCode
1114
      result = self.opexec_fn(op.input,
1115
                              _OpExecCallbacks(self.queue, self.job, op),
1116
                              timeout=timeout)
1117
    except mcpu.LockAcquireTimeout:
1118
      assert timeout is not None, "Received timeout for blocking acquire"
1119
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1120

    
1121
      assert op.status in (constants.OP_STATUS_WAITING,
1122
                           constants.OP_STATUS_CANCELING)
1123

    
1124
      # Was job cancelled while we were waiting for the lock?
1125
      if op.status == constants.OP_STATUS_CANCELING:
1126
        return (constants.OP_STATUS_CANCELING, None)
1127

    
1128
      # Queue is shutting down, return to queued
1129
      if not self.queue.AcceptingJobsUnlocked():
1130
        return (constants.OP_STATUS_QUEUED, None)
1131

    
1132
      # Stay in waiting status while trying to re-acquire the lock
1133
      return (constants.OP_STATUS_WAITING, None)
1134
    except CancelJob:
1135
      logging.exception("%s: Canceling job", opctx.log_prefix)
1136
      assert op.status == constants.OP_STATUS_CANCELING
1137
      return (constants.OP_STATUS_CANCELING, None)
1138

    
1139
    except QueueShutdown:
1140
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1141

    
1142
      assert op.status == constants.OP_STATUS_WAITING
1143

    
1144
      # Job hadn't been started yet, so it should return to the queue
1145
      return (constants.OP_STATUS_QUEUED, None)
1146

    
1147
    except Exception, err: # pylint: disable=W0703
1148
      logging.exception("%s: Caught exception in %s",
1149
                        opctx.log_prefix, opctx.summary)
1150
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1151
    else:
1152
      logging.debug("%s: %s successful",
1153
                    opctx.log_prefix, opctx.summary)
1154
      return (constants.OP_STATUS_SUCCESS, result)
1155

    
1156
  def __call__(self, _nextop_fn=None):
1157
    """Continues execution of a job.
1158

1159
    @param _nextop_fn: Callback function for tests
1160
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1161
      be deferred and C{WAITDEP} if the dependency manager
1162
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1163

1164
    """
1165
    queue = self.queue
1166
    job = self.job
1167

    
1168
    logging.debug("Processing job %s", job.id)
1169

    
1170
    queue.acquire(shared=1)
1171
    try:
1172
      opcount = len(job.ops)
1173

    
1174
      assert job.writable, "Expected writable job"
1175

    
1176
      # Don't do anything for finalized jobs
1177
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1178
        return self.FINISHED
1179

    
1180
      # Is a previous opcode still pending?
1181
      if job.cur_opctx:
1182
        opctx = job.cur_opctx
1183
        job.cur_opctx = None
1184
      else:
1185
        if __debug__ and _nextop_fn:
1186
          _nextop_fn()
1187
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1188

    
1189
      op = opctx.op
1190

    
1191
      # Consistency check
1192
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1193
                                     constants.OP_STATUS_CANCELING)
1194
                        for i in job.ops[opctx.index + 1:])
1195

    
1196
      assert op.status in (constants.OP_STATUS_QUEUED,
1197
                           constants.OP_STATUS_WAITING,
1198
                           constants.OP_STATUS_CANCELING)
1199

    
1200
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1201
              op.priority >= constants.OP_PRIO_HIGHEST)
1202

    
1203
      waitjob = None
1204

    
1205
      if op.status != constants.OP_STATUS_CANCELING:
1206
        assert op.status in (constants.OP_STATUS_QUEUED,
1207
                             constants.OP_STATUS_WAITING)
1208

    
1209
        # Prepare to start opcode
1210
        if self._MarkWaitlock(job, op):
1211
          # Write to disk
1212
          queue.UpdateJobUnlocked(job)
1213

    
1214
        assert op.status == constants.OP_STATUS_WAITING
1215
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1216
        assert job.start_timestamp and op.start_timestamp
1217
        assert waitjob is None
1218

    
1219
        # Check if waiting for a job is necessary
1220
        waitjob = self._CheckDependencies(queue, job, opctx)
1221

    
1222
        assert op.status in (constants.OP_STATUS_WAITING,
1223
                             constants.OP_STATUS_CANCELING,
1224
                             constants.OP_STATUS_ERROR)
1225

    
1226
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1227
                                         constants.OP_STATUS_ERROR)):
1228
          logging.info("%s: opcode %s waiting for locks",
1229
                       opctx.log_prefix, opctx.summary)
1230

    
1231
          assert not opctx.jobdeps, "Not all dependencies were removed"
1232

    
1233
          queue.release()
1234
          try:
1235
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1236
          finally:
1237
            queue.acquire(shared=1)
1238

    
1239
          op.status = op_status
1240
          op.result = op_result
1241

    
1242
          assert not waitjob
1243

    
1244
        if op.status in (constants.OP_STATUS_WAITING,
1245
                         constants.OP_STATUS_QUEUED):
1246
          # waiting: Couldn't get locks in time
1247
          # queued: Queue is shutting down
1248
          assert not op.end_timestamp
1249
        else:
1250
          # Finalize opcode
1251
          op.end_timestamp = TimeStampNow()
1252

    
1253
          if op.status == constants.OP_STATUS_CANCELING:
1254
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1255
                                  for i in job.ops[opctx.index:])
1256
          else:
1257
            assert op.status in constants.OPS_FINALIZED
1258

    
1259
      if op.status == constants.OP_STATUS_QUEUED:
1260
        # Queue is shutting down
1261
        assert not waitjob
1262

    
1263
        finalize = False
1264

    
1265
        # Reset context
1266
        job.cur_opctx = None
1267

    
1268
        # In no case must the status be finalized here
1269
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1270

    
1271
      elif op.status == constants.OP_STATUS_WAITING or waitjob:
1272
        finalize = False
1273

    
1274
        if not waitjob and opctx.CheckPriorityIncrease():
1275
          # Priority was changed, need to update on-disk file
1276
          queue.UpdateJobUnlocked(job)
1277

    
1278
        # Keep around for another round
1279
        job.cur_opctx = opctx
1280

    
1281
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1282
                op.priority >= constants.OP_PRIO_HIGHEST)
1283

    
1284
        # In no case must the status be finalized here
1285
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1286

    
1287
      else:
1288
        # Ensure all opcodes so far have been successful
1289
        assert (opctx.index == 0 or
1290
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1291
                           for i in job.ops[:opctx.index]))
1292

    
1293
        # Reset context
1294
        job.cur_opctx = None
1295

    
1296
        if op.status == constants.OP_STATUS_SUCCESS:
1297
          finalize = False
1298

    
1299
        elif op.status == constants.OP_STATUS_ERROR:
1300
          # Ensure failed opcode has an exception as its result
1301
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1302

    
1303
          to_encode = errors.OpExecError("Preceding opcode failed")
1304
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1305
                                _EncodeOpError(to_encode))
1306
          finalize = True
1307

    
1308
          # Consistency check
1309
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1310
                            errors.GetEncodedError(i.result)
1311
                            for i in job.ops[opctx.index:])
1312

    
1313
        elif op.status == constants.OP_STATUS_CANCELING:
1314
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1315
                                "Job canceled by request")
1316
          finalize = True
1317

    
1318
        else:
1319
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1320

    
1321
        if opctx.index == (opcount - 1):
1322
          # Finalize on last opcode
1323
          finalize = True
1324

    
1325
        if finalize:
1326
          # All opcodes have been run, finalize job
1327
          job.Finalize()
1328

    
1329
        # Write to disk. If the job status is final, this is the final write
1330
        # allowed. Once the file has been written, it can be archived anytime.
1331
        queue.UpdateJobUnlocked(job)
1332

    
1333
        assert not waitjob
1334

    
1335
        if finalize:
1336
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1337
          return self.FINISHED
1338

    
1339
      assert not waitjob or queue.depmgr.JobWaiting(job)
1340

    
1341
      if waitjob:
1342
        return self.WAITDEP
1343
      else:
1344
        return self.DEFER
1345
    finally:
1346
      assert job.writable, "Job became read-only while being processed"
1347
      queue.release()
1348

    
1349

    
1350
def _EvaluateJobProcessorResult(depmgr, job, result):
1351
  """Looks at a result from L{_JobProcessor} for a job.
1352

1353
  To be used in a L{_JobQueueWorker}.
1354

1355
  """
1356
  if result == _JobProcessor.FINISHED:
1357
    # Notify waiting jobs
1358
    depmgr.NotifyWaiters(job.id)
1359

    
1360
  elif result == _JobProcessor.DEFER:
1361
    # Schedule again
1362
    raise workerpool.DeferTask(priority=job.CalcPriority())
1363

    
1364
  elif result == _JobProcessor.WAITDEP:
1365
    # No-op, dependency manager will re-schedule
1366
    pass
1367

    
1368
  else:
1369
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1370
                                 (result, ))
1371

    
1372

    
1373
class _JobQueueWorker(workerpool.BaseWorker):
1374
  """The actual job workers.
1375

1376
  """
1377
  def RunTask(self, job): # pylint: disable=W0221
1378
    """Job executor.
1379

1380
    @type job: L{_QueuedJob}
1381
    @param job: the job to be processed
1382

1383
    """
1384
    assert job.writable, "Expected writable job"
1385

    
1386
    # Ensure only one worker is active on a single job. If a job registers for
1387
    # a dependency job, and the other job notifies before the first worker is
1388
    # done, the job can end up in the tasklist more than once.
1389
    job.processor_lock.acquire()
1390
    try:
1391
      return self._RunTaskInner(job)
1392
    finally:
1393
      job.processor_lock.release()
1394

    
1395
  def _RunTaskInner(self, job):
1396
    """Executes a job.
1397

1398
    Must be called with per-job lock acquired.
1399

1400
    """
1401
    queue = job.queue
1402
    assert queue == self.pool.queue
1403

    
1404
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1405
    setname_fn(None)
1406

    
1407
    proc = mcpu.Processor(queue.context, job.id)
1408

    
1409
    # Create wrapper for setting thread name
1410
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1411
                                    proc.ExecOpCode)
1412

    
1413
    _EvaluateJobProcessorResult(queue.depmgr, job,
1414
                                _JobProcessor(queue, wrap_execop_fn, job)())
1415

    
1416
  @staticmethod
1417
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1418
    """Updates the worker thread name to include a short summary of the opcode.
1419

1420
    @param setname_fn: Callable setting worker thread name
1421
    @param execop_fn: Callable for executing opcode (usually
1422
                      L{mcpu.Processor.ExecOpCode})
1423

1424
    """
1425
    setname_fn(op)
1426
    try:
1427
      return execop_fn(op, *args, **kwargs)
1428
    finally:
1429
      setname_fn(None)
1430

    
1431
  @staticmethod
1432
  def _GetWorkerName(job, op):
1433
    """Returns the worker thread name.
1434

1435
    @type job: L{_QueuedJob}
1436
    @type op: L{opcodes.OpCode}
1437

1438
    """
1439
    parts = ["Job%s" % job.id]
1440

    
1441
    if op:
1442
      parts.append(op.TinySummary())
1443

    
1444
    return "/".join(parts)
1445

    
1446

    
1447
class _JobQueueWorkerPool(workerpool.WorkerPool):
1448
  """Simple class implementing a job-processing workerpool.
1449

1450
  """
1451
  def __init__(self, queue):
1452
    super(_JobQueueWorkerPool, self).__init__("Jq",
1453
                                              JOBQUEUE_THREADS,
1454
                                              _JobQueueWorker)
1455
    self.queue = queue
1456

    
1457

    
1458
class _JobDependencyManager:
1459
  """Keeps track of job dependencies.
1460

1461
  """
1462
  (WAIT,
1463
   ERROR,
1464
   CANCEL,
1465
   CONTINUE,
1466
   WRONGSTATUS) = range(1, 6)
1467

    
1468
  def __init__(self, getstatus_fn, enqueue_fn):
1469
    """Initializes this class.
1470

1471
    """
1472
    self._getstatus_fn = getstatus_fn
1473
    self._enqueue_fn = enqueue_fn
1474

    
1475
    self._waiters = {}
1476
    self._lock = locking.SharedLock("JobDepMgr")
1477

    
1478
  @locking.ssynchronized(_LOCK, shared=1)
1479
  def GetLockInfo(self, requested): # pylint: disable=W0613
1480
    """Retrieves information about waiting jobs.
1481

1482
    @type requested: set
1483
    @param requested: Requested information, see C{query.LQ_*}
1484

1485
    """
1486
    # No need to sort here, that's being done by the lock manager and query
1487
    # library. There are no priorities for notifying jobs, hence all show up as
1488
    # one item under "pending".
1489
    return [("job/%s" % job_id, None, None,
1490
             [("job", [job.id for job in waiters])])
1491
            for job_id, waiters in self._waiters.items()
1492
            if waiters]
1493

    
1494
  @locking.ssynchronized(_LOCK, shared=1)
1495
  def JobWaiting(self, job):
1496
    """Checks if a job is waiting.
1497

1498
    """
1499
    return compat.any(job in jobs
1500
                      for jobs in self._waiters.values())
1501

    
1502
  @locking.ssynchronized(_LOCK)
1503
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1504
    """Checks if a dependency job has the requested status.
1505

1506
    If the other job is not yet in a finalized status, the calling job will be
1507
    notified (re-added to the workerpool) at a later point.
1508

1509
    @type job: L{_QueuedJob}
1510
    @param job: Job object
1511
    @type dep_job_id: int
1512
    @param dep_job_id: ID of dependency job
1513
    @type dep_status: list
1514
    @param dep_status: Required status
1515

1516
    """
1517
    assert ht.TJobId(job.id)
1518
    assert ht.TJobId(dep_job_id)
1519
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1520

    
1521
    if job.id == dep_job_id:
1522
      return (self.ERROR, "Job can't depend on itself")
1523

    
1524
    # Get status of dependency job
1525
    try:
1526
      status = self._getstatus_fn(dep_job_id)
1527
    except errors.JobLost, err:
1528
      return (self.ERROR, "Dependency error: %s" % err)
1529

    
1530
    assert status in constants.JOB_STATUS_ALL
1531

    
1532
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1533

    
1534
    if status not in constants.JOBS_FINALIZED:
1535
      # Register for notification and wait for job to finish
1536
      job_id_waiters.add(job)
1537
      return (self.WAIT,
1538
              "Need to wait for job %s, wanted status '%s'" %
1539
              (dep_job_id, dep_status))
1540

    
1541
    # Remove from waiters list
1542
    if job in job_id_waiters:
1543
      job_id_waiters.remove(job)
1544

    
1545
    if (status == constants.JOB_STATUS_CANCELED and
1546
        constants.JOB_STATUS_CANCELED not in dep_status):
1547
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1548

    
1549
    elif not dep_status or status in dep_status:
1550
      return (self.CONTINUE,
1551
              "Dependency job %s finished with status '%s'" %
1552
              (dep_job_id, status))
1553

    
1554
    else:
1555
      return (self.WRONGSTATUS,
1556
              "Dependency job %s finished with status '%s',"
1557
              " not one of '%s' as required" %
1558
              (dep_job_id, status, utils.CommaJoin(dep_status)))
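  # Illustrative sketch (job ids hypothetical): an opcode declaring
  #   depends = [(1234, [constants.JOB_STATUS_SUCCESS])]
  # makes the processor call CheckAndRegister(job, 1234, [JOB_STATUS_SUCCESS]),
  # which returns
  #   (WAIT, ...)         while job 1234 is unfinished (job is re-queued later)
  #   (CONTINUE, ...)     once job 1234 finalized with a wanted status
  #   (WRONGSTATUS, ...)  if it finalized with a different status
  #   (CANCEL, ...)       if it was canceled and "canceled" was not acceptable
  #   (ERROR, ...)        if the dependency cannot be looked up or the job
  #                       depends on itself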
1559

    
1560
  def _RemoveEmptyWaitersUnlocked(self):
1561
    """Remove all jobs without actual waiters.
1562

1563
    """
1564
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1565
                   if not waiters]:
1566
      del self._waiters[job_id]
1567

    
1568
  def NotifyWaiters(self, job_id):
1569
    """Notifies all jobs waiting for a certain job ID.
1570

1571
    @attention: Do not call until L{CheckAndRegister} returned a status other
1572
      than C{WAIT} for C{job_id}, or behaviour is undefined
1573
    @type job_id: int
1574
    @param job_id: Job ID
1575

1576
    """
1577
    assert ht.TJobId(job_id)
1578

    
1579
    self._lock.acquire()
1580
    try:
1581
      self._RemoveEmptyWaitersUnlocked()
1582

    
1583
      jobs = self._waiters.pop(job_id, None)
1584
    finally:
1585
      self._lock.release()
1586

    
1587
    if jobs:
1588
      # Re-add jobs to workerpool
1589
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1590
                    len(jobs), job_id)
1591
      self._enqueue_fn(jobs)
1592

    
1593

    
1594
def _RequireOpenQueue(fn):
1595
  """Decorator for "public" functions.
1596

1597
  This function should be used for all 'public' functions. That is,
1598
  functions usually called from other classes. Note that this should
1599
  be applied only to methods (not plain functions), since it expects
1600
  that the decorated function is called with a first argument that has
1601
  a '_queue_filelock' attribute.
1602

1603
  @warning: Use this decorator only after locking.ssynchronized
1604

1605
  Example::
1606
    @locking.ssynchronized(_LOCK)
1607
    @_RequireOpenQueue
1608
    def Example(self):
1609
      pass
1610

1611
  """
1612
  def wrapper(self, *args, **kwargs):
1613
    # pylint: disable=W0212
1614
    assert self._queue_filelock is not None, "Queue should be open"
1615
    return fn(self, *args, **kwargs)
1616
  return wrapper
1617

    
1618

    
1619
def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper


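# For illustration: the submission entry points below stack these decorators
# (see JobQueue.SubmitJob further down), e.g.:
#
#   @locking.ssynchronized(_LOCK)
#   @_RequireOpenQueue
#   @_RequireNonDrainedQueue
#   def SubmitJob(self, ops):
#     ...
#
# so the queue lock is acquired first, then the open-queue check runs, and
# finally the drain/shutdown checks reject new submissions if necessary.
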
class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and with all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    # Set queue drained flag
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
                                                       self._drained)
    msg = result[node_name].fail_msg
    if msg:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TNonNegativeInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

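  # Worked example: with self._last_serial == 10 and count == 3, the serial
  # file is rewritten to contain "13" and the formatted job IDs for 11, 12
  # and 13 are returned.
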
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

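  # For illustration: for job ID 1234 the live file is <QUEUE_DIR>/job-1234,
  # while its archived counterpart lives in a subdirectory of
  # <JOB_QUEUE_ARCHIVE_DIR> chosen by jstore.GetArchiveDirectory(), which
  # typically groups a range of job IDs into one directory.
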
  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
    self._CheckRpcResult(result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes_base.DEPEND_ATTR, None)
      if not opcodes_base.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes_base.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

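  # Usage sketch (hypothetical caller code): with a JobQueue instance "queue"
  # and any valid list of opcode objects, e.g.:
  #
  #   job_id = queue.SubmitJob([opcodes.OpClusterVerify()])
  #
  # the opcode here is purely illustrative; the returned value is the newly
  # assigned job ID.
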
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

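  # Worked example: if SubmitManyJobs assigned the IDs [17, 18] and the second
  # job's opcodes carry depends=[(-1, [constants.JOB_STATUS_SUCCESS])], the
  # relative ID -1 resolves against the previously assigned IDs, yielding
  # [(17, [constants.JOB_STATUS_SUCCESS])].
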
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes_base.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs],
                             task_id=map(_GetIdAttr, jobs))

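  # Note: every job becomes one worker pool task, keyed by its job ID and
  # queued at the job's current priority (job.CalcPriority()); this is why
  # ChangeJobPriority below also has to update the task in the worker pool.
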
  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    assert not job.writable, "Got writable job" # pylint: disable=E1101

    if job:
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

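  # Usage note: callers typically invoke this in a loop, feeding back the
  # previously returned job information and log serial; a return value of
  # constants.JOB_NOTCHANGED only means the timeout expired without any
  # change and the caller may retry.
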
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      (success, msg) = job.ChangePriority(priority)

      if success:
        try:
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fixing the TODO above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered (or the received timestamp if the job
    never started). The special '-1' age will cause archival of all
    jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time to spend archiving, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

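  # Note: the return value is a tuple of (number of archived jobs, number of
  # jobs not inspected because the timeout expired); an age of -1 archives
  # every finalized job regardless of its timestamps.
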
  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)

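  # For illustration: for job_ids=[42, 43], qlang.MakeSimpleFilter("id",
  # [42, 43]) builds an OR filter roughly equivalent to
  # ["|", ["=", "id", 42], ["=", "id", 43]], which _Query then uses to
  # restrict the result to those job IDs.
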
  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool

    """
    return self._accepting_jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None
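
  # Shutdown sequence sketch: callers are expected to invoke PrepareShutdown()
  # first and keep polling until it returns False (no more running jobs), and
  # only then call Shutdown() to terminate the workers and release the queue
  # file lock.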