lib/jqueue.py @ 9cbcb1be
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import time
35
import weakref
36
import threading
37
import itertools
38
import operator
39

    
40
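# Some distributions ship pyinotify as a package containing a module of the
# same name, others as a plain top-level module; support both layouts.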
try:
41
  # pylint: disable=E0611
42
  from pyinotify import pyinotify
43
except ImportError:
44
  import pyinotify
45

    
46
from ganeti import asyncnotifier
47
from ganeti import constants
48
from ganeti import serializer
49
from ganeti import workerpool
50
from ganeti import locking
51
from ganeti import opcodes
52
from ganeti import errors
53
from ganeti import mcpu
54
from ganeti import utils
55
from ganeti import jstore
56
from ganeti import rpc
57
from ganeti import runtime
58
from ganeti import netutils
59
from ganeti import compat
60
from ganeti import ht
61
from ganeti import query
62
from ganeti import qlang
63
from ganeti import pathutils
64
from ganeti import vcluster
65

    
66

    
67
JOBQUEUE_THREADS = 25
68

    
69
# member lock names to be passed to @ssynchronized decorator
70
_LOCK = "_lock"
71
_QUEUE = "_queue"
72

    
73
#: Retrieves "id" attribute
74
_GetIdAttr = operator.attrgetter("id")
75

    
76

    
77
class CancelJob(Exception):
78
  """Special exception to cancel a job.
79

80
  """
81

    
82

    
83
class QueueShutdown(Exception):
84
  """Special exception to abort a job when the job queue is shutting down.
85

86
  """
87

    
88

    
89
def TimeStampNow():
90
  """Returns the current timestamp.
91

92
  @rtype: tuple
93
  @return: the current time in the (seconds, microseconds) format
94

95
  """
96
  return utils.SplitTime(time.time())
97

    
98

    
99
def _CallJqUpdate(runner, names, file_name, content):
100
  """Updates job queue file after virtualizing filename.
101

102
  """
103
  virt_file_name = vcluster.MakeVirtualPath(file_name)
104
  return runner.call_jobqueue_update(names, virt_file_name, content)
105

    
106

    
107
class _SimpleJobQuery:
108
  """Wrapper for job queries.
109

110
  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
111

112
  """
113
  def __init__(self, fields):
114
    """Initializes this class.
115

116
    """
117
    self._query = query.Query(query.JOB_FIELDS, fields)
118

    
119
  def __call__(self, job):
120
    """Executes a job query using cached field list.
121

122
    """
123
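    # OldStyleQuery returns one result row per queried job; with a single job
    # the first (and only) row is the result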
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
124

    
125

    
126
class _QueuedOpCode(object):
127
  """Encapsulates an opcode object.
128

129
  @ivar log: holds the execution log and consists of tuples
130
  of the form C{(log_serial, timestamp, level, message)}
131
  @ivar input: the OpCode we encapsulate
132
  @ivar status: the current status
133
  @ivar result: the result of the LU execution
134
  @ivar start_timestamp: timestamp for the start of the execution
135
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
136
  @ivar end_timestamp: timestamp for the end of the execution
137

138
  """
139
  __slots__ = ["input", "status", "result", "log", "priority",
140
               "start_timestamp", "exec_timestamp", "end_timestamp",
141
               "__weakref__"]
142

    
143
  def __init__(self, op):
144
    """Initializes instances of this class.
145

146
    @type op: L{opcodes.OpCode}
147
    @param op: the opcode we encapsulate
148

149
    """
150
    self.input = op
151
    self.status = constants.OP_STATUS_QUEUED
152
    self.result = None
153
    self.log = []
154
    self.start_timestamp = None
155
    self.exec_timestamp = None
156
    self.end_timestamp = None
157

    
158
    # Get initial priority (it might change during the lifetime of this opcode)
159
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
160

    
161
  @classmethod
162
  def Restore(cls, state):
163
    """Restore the _QueuedOpCode from the serialized form.
164

165
    @type state: dict
166
    @param state: the serialized state
167
    @rtype: _QueuedOpCode
168
    @return: a new _QueuedOpCode instance
169

170
    """
171
    obj = _QueuedOpCode.__new__(cls)
172
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
173
    obj.status = state["status"]
174
    obj.result = state["result"]
175
    obj.log = state["log"]
176
    obj.start_timestamp = state.get("start_timestamp", None)
177
    obj.exec_timestamp = state.get("exec_timestamp", None)
178
    obj.end_timestamp = state.get("end_timestamp", None)
179
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
180
    return obj
181

    
182
  def Serialize(self):
183
    """Serializes this _QueuedOpCode.
184

185
    @rtype: dict
186
    @return: the dictionary holding the serialized state
187

188
    """
189
    return {
190
      "input": self.input.__getstate__(),
191
      "status": self.status,
192
      "result": self.result,
193
      "log": self.log,
194
      "start_timestamp": self.start_timestamp,
195
      "exec_timestamp": self.exec_timestamp,
196
      "end_timestamp": self.end_timestamp,
197
      "priority": self.priority,
198
      }
199

    
200

    
201
class _QueuedJob(object):
202
  """In-memory job representation.
203

204
  This is what we use to track the user-submitted jobs. Locking must
205
  be taken care of by users of this class.
206

207
  @type queue: L{JobQueue}
208
  @ivar queue: the parent queue
209
  @ivar id: the job ID
210
  @type ops: list
211
  @ivar ops: the list of _QueuedOpCode that constitute the job
212
  @type log_serial: int
213
  @ivar log_serial: holds the index for the next log entry
214
  @ivar received_timestamp: the timestamp for when the job was received
215
  @ivar start_timestamp: the timestamp for start of execution
216
  @ivar end_timestamp: the timestamp for end of execution
217
  @ivar writable: Whether the job is allowed to be modified
218

219
  """
220
  # pylint: disable=W0212
221
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
222
               "received_timestamp", "start_timestamp", "end_timestamp",
223
               "__weakref__", "processor_lock", "writable", "archived"]
224

    
225
  def _AddReasons(self):
226
    """Extend the reason trail
227

228
    Add the reason for all the opcodes of this job to be executed.
229

230
    """
231
    count = 0
232
    for queued_op in self.ops:
233
      op = queued_op.input
234
      reason_src = opcodes.NameToReasonSrc(op.__class__.__name__)
235
      reason_text = "job=%d;index=%d" % (self.id, count)
236
      reason = getattr(op, "reason", [])
237
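      # A reason trail entry is a (source, text, timestamp) tuple, with the
      # timestamp expressed in nanoseconds since the epoch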
      reason.append((reason_src, reason_text, utils.EpochNano()))
238
      op.reason = reason
239
      count = count + 1
240

    
241
  def __init__(self, queue, job_id, ops, writable):
242
    """Constructor for the _QueuedJob.
243

244
    @type queue: L{JobQueue}
245
    @param queue: our parent queue
246
    @type job_id: int
247
    @param job_id: our job id
248
    @type ops: list
249
    @param ops: the list of opcodes we hold, which will be encapsulated
250
        in _QueuedOpCodes
251
    @type writable: bool
252
    @param writable: Whether job can be modified
253

254
    """
255
    if not ops:
256
      raise errors.GenericError("A job needs at least one opcode")
257

    
258
    self.queue = queue
259
    self.id = int(job_id)
260
    self.ops = [_QueuedOpCode(op) for op in ops]
261
    self._AddReasons()
262
    self.log_serial = 0
263
    self.received_timestamp = TimeStampNow()
264
    self.start_timestamp = None
265
    self.end_timestamp = None
266
    self.archived = False
267

    
268
    self._InitInMemory(self, writable)
269

    
270
    assert not self.archived, "New jobs can not be marked as archived"
271

    
272
  @staticmethod
273
  def _InitInMemory(obj, writable):
274
    """Initializes in-memory variables.
275

276
    """
277
    obj.writable = writable
278
    obj.ops_iter = None
279
    obj.cur_opctx = None
280

    
281
    # Read-only jobs are not processed and therefore don't need a lock
282
    if writable:
283
      obj.processor_lock = threading.Lock()
284
    else:
285
      obj.processor_lock = None
286

    
287
  def __repr__(self):
288
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
289
              "id=%s" % self.id,
290
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
291

    
292
    return "<%s at %#x>" % (" ".join(status), id(self))
293

    
294
  @classmethod
295
  def Restore(cls, queue, state, writable, archived):
296
    """Restore a _QueuedJob from serialized state:
297

298
    @type queue: L{JobQueue}
299
    @param queue: to which queue the restored job belongs
300
    @type state: dict
301
    @param state: the serialized state
302
    @type writable: bool
303
    @param writable: Whether job can be modified
304
    @type archived: bool
305
    @param archived: Whether job was already archived
306
    @rtype: _QueuedJob
307
    @return: the restored _QueuedJob instance
308

309
    """
310
    obj = _QueuedJob.__new__(cls)
311
    obj.queue = queue
312
    obj.id = int(state["id"])
313
    obj.received_timestamp = state.get("received_timestamp", None)
314
    obj.start_timestamp = state.get("start_timestamp", None)
315
    obj.end_timestamp = state.get("end_timestamp", None)
316
    obj.archived = archived
317

    
318
    obj.ops = []
319
    obj.log_serial = 0
320
    for op_state in state["ops"]:
321
      op = _QueuedOpCode.Restore(op_state)
322
      for log_entry in op.log:
323
        obj.log_serial = max(obj.log_serial, log_entry[0])
324
      obj.ops.append(op)
325

    
326
    cls._InitInMemory(obj, writable)
327

    
328
    return obj
329

    
330
  def Serialize(self):
331
    """Serialize the _JobQueue instance.
332

333
    @rtype: dict
334
    @return: the serialized state
335

336
    """
337
    return {
338
      "id": self.id,
339
      "ops": [op.Serialize() for op in self.ops],
340
      "start_timestamp": self.start_timestamp,
341
      "end_timestamp": self.end_timestamp,
342
      "received_timestamp": self.received_timestamp,
343
      }
344

    
345
  def CalcStatus(self):
346
    """Compute the status of this job.
347

348
    This function iterates over all the _QueuedOpCodes in the job and
349
    based on their status, computes the job status.
350

351
    The algorithm is:
352
      - if we find a cancelled opcode, or one finished with error, the job
353
        status will be the same
354
      - otherwise, the last opcode with the status one of:
355
          - waiting
356
          - canceling
357
          - running
358

359
        will determine the job status
360

361
      - otherwise, it means either all opcodes are queued, or success,
362
        and the job status will be the same
363

364
    @return: the job status
365

366
    """
367
    status = constants.JOB_STATUS_QUEUED
368

    
369
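    # The job counts as successful only if every opcode succeeded; any other
    # opcode status seen in the loop below overrides the default computed here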
    all_success = True
370
    for op in self.ops:
371
      if op.status == constants.OP_STATUS_SUCCESS:
372
        continue
373

    
374
      all_success = False
375

    
376
      if op.status == constants.OP_STATUS_QUEUED:
377
        pass
378
      elif op.status == constants.OP_STATUS_WAITING:
379
        status = constants.JOB_STATUS_WAITING
380
      elif op.status == constants.OP_STATUS_RUNNING:
381
        status = constants.JOB_STATUS_RUNNING
382
      elif op.status == constants.OP_STATUS_CANCELING:
383
        status = constants.JOB_STATUS_CANCELING
384
        break
385
      elif op.status == constants.OP_STATUS_ERROR:
386
        status = constants.JOB_STATUS_ERROR
387
        # The whole job fails if one opcode failed
388
        break
389
      elif op.status == constants.OP_STATUS_CANCELED:
390
        status = constants.JOB_STATUS_CANCELED
391
        break
392

    
393
    if all_success:
394
      status = constants.JOB_STATUS_SUCCESS
395

    
396
    return status
397

    
398
  def CalcPriority(self):
399
    """Gets the current priority for this job.
400

401
    Only unfinished opcodes are considered. When all are done, the default
402
    priority is used.
403

404
    @rtype: int
405

406
    """
407
    priorities = [op.priority for op in self.ops
408
                  if op.status not in constants.OPS_FINALIZED]
409

    
410
    if not priorities:
411
      # All opcodes are done, assume default priority
412
      return constants.OP_PRIO_DEFAULT
413

    
414
    return min(priorities)
415

    
416
  def GetLogEntries(self, newer_than):
417
    """Selectively returns the log entries.
418

419
    @type newer_than: None or int
420
    @param newer_than: if this is None, return all log entries,
421
        otherwise return only the log entries with serial higher
422
        than this value
423
    @rtype: list
424
    @return: the list of the log entries selected
425

426
    """
427
    if newer_than is None:
428
      serial = -1
429
    else:
430
      serial = newer_than
431

    
432
    entries = []
433
    for op in self.ops:
434
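      # Log entries are (log_serial, timestamp, log_type, log_message) tuples;
      # keep only those newer than the requested serial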
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
435

    
436
    return entries
437

    
438
  def GetInfo(self, fields):
439
    """Returns information about a job.
440

441
    @type fields: list
442
    @param fields: names of fields to return
443
    @rtype: list
444
    @return: list with one element for each field
445
    @raise errors.OpExecError: when an invalid field
446
        has been passed
447

448
    """
449
    return _SimpleJobQuery(fields)(self)
450

    
451
  def MarkUnfinishedOps(self, status, result):
452
    """Mark unfinished opcodes with a given status and result.
453

454
    This is a utility function for marking all running or waiting to
455
    be run opcodes with a given status. Opcodes which are already
456
    finalized are not changed.
457

458
    @param status: a given opcode status
459
    @param result: the opcode result
460

461
    """
462
    not_marked = True
463
    for op in self.ops:
464
      if op.status in constants.OPS_FINALIZED:
465
        assert not_marked, "Finalized opcodes found after non-finalized ones"
466
        continue
467
      op.status = status
468
      op.result = result
469
      not_marked = False
470

    
471
  def Finalize(self):
472
    """Marks the job as finalized.
473

474
    """
475
    self.end_timestamp = TimeStampNow()
476

    
477
  def Cancel(self):
478
    """Marks job as canceled/-ing if possible.
479

480
    @rtype: tuple; (bool, string)
481
    @return: Boolean describing whether job was successfully canceled or marked
482
      as canceling and a text message
483

484
    """
485
    status = self.CalcStatus()
486

    
487
    if status == constants.JOB_STATUS_QUEUED:
488
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
489
                             "Job canceled by request")
490
      self.Finalize()
491
      return (True, "Job %s canceled" % self.id)
492

    
493
    elif status == constants.JOB_STATUS_WAITING:
494
      # The worker will notice the new status and cancel the job
495
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
496
      return (True, "Job %s will be canceled" % self.id)
497

    
498
    else:
499
      logging.debug("Job %s is no longer waiting in the queue", self.id)
500
      return (False, "Job %s is no longer waiting in the queue" % self.id)
501

    
502
  def ChangePriority(self, priority):
503
    """Changes the job priority.
504

505
    @type priority: int
506
    @param priority: New priority
507
    @rtype: tuple; (bool, string)
508
    @return: Boolean describing whether job's priority was successfully changed
509
      and a text message
510

511
    """
512
    status = self.CalcStatus()
513

    
514
    if status in constants.JOBS_FINALIZED:
515
      return (False, "Job %s is finished" % self.id)
516
    elif status == constants.JOB_STATUS_CANCELING:
517
      return (False, "Job %s is cancelling" % self.id)
518
    else:
519
      assert status in (constants.JOB_STATUS_QUEUED,
520
                        constants.JOB_STATUS_WAITING,
521
                        constants.JOB_STATUS_RUNNING)
522

    
523
      changed = False
524
      for op in self.ops:
525
        if (op.status == constants.OP_STATUS_RUNNING or
526
            op.status in constants.OPS_FINALIZED):
527
          assert not changed, \
528
            ("Found opcode for which priority should not be changed after"
529
             " priority has been changed for previous opcodes")
530
          continue
531

    
532
        assert op.status in (constants.OP_STATUS_QUEUED,
533
                             constants.OP_STATUS_WAITING)
534

    
535
        changed = True
536

    
537
        # Set new priority (doesn't modify opcode input)
538
        op.priority = priority
539

    
540
      if changed:
541
        return (True, ("Priorities of pending opcodes for job %s have been"
542
                       " changed to %s" % (self.id, priority)))
543
      else:
544
        return (False, "Job %s had no pending opcodes" % self.id)
545

    
546

    
547
class _OpExecCallbacks(mcpu.OpExecCbBase):
548
  def __init__(self, queue, job, op):
549
    """Initializes this class.
550

551
    @type queue: L{JobQueue}
552
    @param queue: Job queue
553
    @type job: L{_QueuedJob}
554
    @param job: Job object
555
    @type op: L{_QueuedOpCode}
556
    @param op: OpCode
557

558
    """
559
    assert queue, "Queue is missing"
560
    assert job, "Job is missing"
561
    assert op, "Opcode is missing"
562

    
563
    self._queue = queue
564
    self._job = job
565
    self._op = op
566

    
567
  def _CheckCancel(self):
568
    """Raises an exception to cancel the job if asked to.
569

570
    """
571
    # Cancel here if we were asked to
572
    if self._op.status == constants.OP_STATUS_CANCELING:
573
      logging.debug("Canceling opcode")
574
      raise CancelJob()
575

    
576
    # See if queue is shutting down
577
    if not self._queue.AcceptingJobsUnlocked():
578
      logging.debug("Queue is shutting down")
579
      raise QueueShutdown()
580

    
581
  @locking.ssynchronized(_QUEUE, shared=1)
582
  def NotifyStart(self):
583
    """Mark the opcode as running, not lock-waiting.
584

585
    This is called from the mcpu code as a notifier function, when the LU is
586
    finally about to start the Exec() method. Of course, to have end-user
587
    visible results, the opcode must be initially (before calling into
588
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
589

590
    """
591
    assert self._op in self._job.ops
592
    assert self._op.status in (constants.OP_STATUS_WAITING,
593
                               constants.OP_STATUS_CANCELING)
594

    
595
    # Cancel here if we were asked to
596
    self._CheckCancel()
597

    
598
    logging.debug("Opcode is now running")
599

    
600
    self._op.status = constants.OP_STATUS_RUNNING
601
    self._op.exec_timestamp = TimeStampNow()
602

    
603
    # And finally replicate the job status
604
    self._queue.UpdateJobUnlocked(self._job)
605

    
606
  @locking.ssynchronized(_QUEUE, shared=1)
607
  def _AppendFeedback(self, timestamp, log_type, log_msg):
608
    """Internal feedback append function, with locks
609

610
    """
611
    self._job.log_serial += 1
612
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
613
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
614

    
615
  def Feedback(self, *args):
616
    """Append a log entry.
617

618
    """
619
    assert len(args) in (1, 2)
620

    
621
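    # A single argument is a plain message; two arguments are
    # (log_type, message)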
    if len(args) == 1:
622
      log_type = constants.ELOG_MESSAGE
623
      log_msg = args[0]
624
    else:
625
      (log_type, log_msg) = args
626

    
627
    # The time is split to make serialization easier and not lose
628
    # precision.
629
    timestamp = utils.SplitTime(time.time())
630
    self._AppendFeedback(timestamp, log_type, log_msg)
631

    
632
  def CurrentPriority(self):
633
    """Returns current priority for opcode.
634

635
    """
636
    assert self._op.status in (constants.OP_STATUS_WAITING,
637
                               constants.OP_STATUS_CANCELING)
638

    
639
    # Cancel here if we were asked to
640
    self._CheckCancel()
641

    
642
    return self._op.priority
643

    
644
  def SubmitManyJobs(self, jobs):
645
    """Submits jobs for processing.
646

647
    See L{JobQueue.SubmitManyJobs}.
648

649
    """
650
    # Locking is done in job queue
651
    return self._queue.SubmitManyJobs(jobs)
652

    
653

    
654
class _JobChangesChecker(object):
655
  def __init__(self, fields, prev_job_info, prev_log_serial):
656
    """Initializes this class.
657

658
    @type fields: list of strings
659
    @param fields: Fields requested by LUXI client
660
    @type prev_job_info: list or None
661
    @param prev_job_info: previous job info, as passed by the LUXI client
662
    @type prev_log_serial: int or None
663
    @param prev_log_serial: previous job serial, as passed by the LUXI client
664

665
    """
666
    self._squery = _SimpleJobQuery(fields)
667
    self._prev_job_info = prev_job_info
668
    self._prev_log_serial = prev_log_serial
669

    
670
  def __call__(self, job):
671
    """Checks whether job has changed.
672

673
    @type job: L{_QueuedJob}
674
    @param job: Job object
675

676
    """
677
    assert not job.writable, "Expected read-only job"
678

    
679
    status = job.CalcStatus()
680
    job_info = self._squery(job)
681
    log_entries = job.GetLogEntries(self._prev_log_serial)
682

    
683
    # Serializing and deserializing data can cause type changes (e.g. from
684
    # tuple to list) or precision loss. We're doing it here so that we get
685
    # the same modifications as the data received from the client. Without
686
    # this, the comparison afterwards might fail without the data being
687
    # significantly different.
688
    # TODO: we just deserialized from disk, investigate how to make sure that
689
    # the job info and log entries are compatible to avoid this further step.
690
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
691
    # efficient, though floats will be tricky
692
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
693
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
694

    
695
    # Don't even try to wait if the job is no longer running, there will be
696
    # no changes.
697
    if (status not in (constants.JOB_STATUS_QUEUED,
698
                       constants.JOB_STATUS_RUNNING,
699
                       constants.JOB_STATUS_WAITING) or
700
        job_info != self._prev_job_info or
701
        (log_entries and self._prev_log_serial != log_entries[0][0])):
702
      logging.debug("Job %s changed", job.id)
703
      return (job_info, log_entries)
704

    
705
    return None
706

    
707

    
708
class _JobFileChangesWaiter(object):
709
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
710
    """Initializes this class.
711

712
    @type filename: string
713
    @param filename: Path to job file
714
    @raise errors.InotifyError: if the notifier cannot be set up
715

716
    """
717
    self._wm = _inotify_wm_cls()
718
    self._inotify_handler = \
719
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
720
    self._notifier = \
721
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
722
    try:
723
      self._inotify_handler.enable()
724
    except Exception:
725
      # pyinotify doesn't close file descriptors automatically
726
      self._notifier.stop()
727
      raise
728

    
729
  def _OnInotify(self, notifier_enabled):
730
    """Callback for inotify.
731

732
    """
733
    if not notifier_enabled:
734
      self._inotify_handler.enable()
735

    
736
  def Wait(self, timeout):
737
    """Waits for the job file to change.
738

739
    @type timeout: float
740
    @param timeout: Timeout in seconds
741
    @return: Whether there have been events
742

743
    """
744
    assert timeout >= 0
745
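    # pyinotify expects the timeout in milliseconds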
    have_events = self._notifier.check_events(timeout * 1000)
746
    if have_events:
747
      self._notifier.read_events()
748
    self._notifier.process_events()
749
    return have_events
750

    
751
  def Close(self):
752
    """Closes underlying notifier and its file descriptor.
753

754
    """
755
    self._notifier.stop()
756

    
757

    
758
class _JobChangesWaiter(object):
759
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
760
    """Initializes this class.
761

762
    @type filename: string
763
    @param filename: Path to job file
764

765
    """
766
    self._filewaiter = None
767
    self._filename = filename
768
    self._waiter_cls = _waiter_cls
769

    
770
  def Wait(self, timeout):
771
    """Waits for a job to change.
772

773
    @type timeout: float
774
    @param timeout: Timeout in seconds
775
    @return: Whether there have been events
776

777
    """
778
    if self._filewaiter:
779
      return self._filewaiter.Wait(timeout)
780

    
781
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
782
    # If this point is reached, return immediately and let caller check the job
783
    # file again in case there were changes since the last check. This avoids a
784
    # race condition.
785
    self._filewaiter = self._waiter_cls(self._filename)
786

    
787
    return True
788

    
789
  def Close(self):
790
    """Closes underlying waiter.
791

792
    """
793
    if self._filewaiter:
794
      self._filewaiter.Close()
795

    
796

    
797
class _WaitForJobChangesHelper(object):
798
  """Helper class using inotify to wait for changes in a job file.
799

800
  This class takes a previous job status and serial, and alerts the client when
801
  the current job status has changed.
802

803
  """
804
  @staticmethod
805
  def _CheckForChanges(counter, job_load_fn, check_fn):
806
    if counter.next() > 0:
807
      # If this isn't the first check the job is given some more time to change
808
      # again. This gives better performance for jobs generating many
809
      # changes/messages.
810
      time.sleep(0.1)
811

    
812
    job = job_load_fn()
813
    if not job:
814
      raise errors.JobLost()
815

    
816
    result = check_fn(job)
817
    if result is None:
818
      raise utils.RetryAgain()
819

    
820
    return result
821

    
822
  def __call__(self, filename, job_load_fn,
823
               fields, prev_job_info, prev_log_serial, timeout,
824
               _waiter_cls=_JobChangesWaiter):
825
    """Waits for changes on a job.
826

827
    @type filename: string
828
    @param filename: File on which to wait for changes
829
    @type job_load_fn: callable
830
    @param job_load_fn: Function to load job
831
    @type fields: list of strings
832
    @param fields: Which fields to check for changes
833
    @type prev_job_info: list or None
834
    @param prev_job_info: Last job information returned
835
    @type prev_log_serial: int
836
    @param prev_log_serial: Last job message serial number
837
    @type timeout: float
838
    @param timeout: maximum time to wait in seconds
839

840
    """
841
    counter = itertools.count()
842
    try:
843
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
844
      waiter = _waiter_cls(filename)
845
      try:
846
        return utils.Retry(compat.partial(self._CheckForChanges,
847
                                          counter, job_load_fn, check_fn),
848
                           utils.RETRY_REMAINING_TIME, timeout,
849
                           wait_fn=waiter.Wait)
850
      finally:
851
        waiter.Close()
852
    except errors.JobLost:
853
      return None
854
    except utils.RetryTimeout:
855
      return constants.JOB_NOTCHANGED
856

    
857

    
858
def _EncodeOpError(err):
859
  """Encodes an error which occurred while processing an opcode.
860

861
  """
862
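  # Non-Ganeti exceptions are wrapped in OpExecError so that the error can be
  # serialized and transported to clients like any other Ganeti error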
  if isinstance(err, errors.GenericError):
863
    to_encode = err
864
  else:
865
    to_encode = errors.OpExecError(str(err))
866

    
867
  return errors.EncodeException(to_encode)
868

    
869

    
870
class _TimeoutStrategyWrapper:
871
  def __init__(self, fn):
872
    """Initializes this class.
873

874
    """
875
    self._fn = fn
876
    self._next = None
877

    
878
  def _Advance(self):
879
    """Gets the next timeout if necessary.
880

881
    """
882
    if self._next is None:
883
      self._next = self._fn()
884

    
885
  def Peek(self):
886
    """Returns the next timeout.
887

888
    """
889
    self._Advance()
890
    return self._next
891

    
892
  def Next(self):
893
    """Returns the current timeout and advances the internal state.
894

895
    """
896
    self._Advance()
897
    result = self._next
898
    self._next = None
899
    return result
900

    
901

    
902
class _OpExecContext:
903
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
904
    """Initializes this class.
905

906
    """
907
    self.op = op
908
    self.index = index
909
    self.log_prefix = log_prefix
910
    self.summary = op.input.Summary()
911

    
912
    # Create local copy to modify
913
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
914
      self.jobdeps = op.input.depends[:]
915
    else:
916
      self.jobdeps = None
917

    
918
    self._timeout_strategy_factory = timeout_strategy_factory
919
    self._ResetTimeoutStrategy()
920

    
921
  def _ResetTimeoutStrategy(self):
922
    """Creates a new timeout strategy.
923

924
    """
925
    self._timeout_strategy = \
926
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
927

    
928
  def CheckPriorityIncrease(self):
929
    """Checks whether priority can and should be increased.
930

931
    Called when locks couldn't be acquired.
932

933
    """
934
    op = self.op
935

    
936
    # Exhausted all retries and next round should not use blocking acquire
937
    # for locks?
938
    if (self._timeout_strategy.Peek() is None and
939
        op.priority > constants.OP_PRIO_HIGHEST):
940
      logging.debug("Increasing priority")
941
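      # Lower numerical values mean higher priority, so decrementing the value
      # actually raises the opcode's priority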
      op.priority -= 1
942
      self._ResetTimeoutStrategy()
943
      return True
944

    
945
    return False
946

    
947
  def GetNextLockTimeout(self):
948
    """Returns the next lock acquire timeout.
949

950
    """
951
    return self._timeout_strategy.Next()
952

    
953

    
954
class _JobProcessor(object):
955
  (DEFER,
956
   WAITDEP,
957
   FINISHED) = range(1, 4)
958

    
959
  def __init__(self, queue, opexec_fn, job,
960
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
961
    """Initializes this class.
962

963
    """
964
    self.queue = queue
965
    self.opexec_fn = opexec_fn
966
    self.job = job
967
    self._timeout_strategy_factory = _timeout_strategy_factory
968

    
969
  @staticmethod
970
  def _FindNextOpcode(job, timeout_strategy_factory):
971
    """Locates the next opcode to run.
972

973
    @type job: L{_QueuedJob}
974
    @param job: Job object
975
    @param timeout_strategy_factory: Callable to create new timeout strategy
976

977
    """
978
    # Create some sort of a cache to speed up locating next opcode for future
979
    # lookups
980
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
981
    # pending and one for processed ops.
982
    if job.ops_iter is None:
983
      job.ops_iter = enumerate(job.ops)
984

    
985
    # Find next opcode to run
986
    while True:
987
      try:
988
        (idx, op) = job.ops_iter.next()
989
      except StopIteration:
990
        raise errors.ProgrammerError("Called for a finished job")
991

    
992
      if op.status == constants.OP_STATUS_RUNNING:
993
        # Found an opcode already marked as running
994
        raise errors.ProgrammerError("Called for job marked as running")
995

    
996
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
997
                             timeout_strategy_factory)
998

    
999
      if op.status not in constants.OPS_FINALIZED:
1000
        return opctx
1001

    
1002
      # This is a job that was partially completed before master daemon
1003
      # shutdown, so it can be expected that some opcodes are already
1004
      # completed successfully (if any did error out, then the whole job
1005
      # should have been aborted and not resubmitted for processing).
1006
      logging.info("%s: opcode %s already processed, skipping",
1007
                   opctx.log_prefix, opctx.summary)
1008

    
1009
  @staticmethod
1010
  def _MarkWaitlock(job, op):
1011
    """Marks an opcode as waiting for locks.
1012

1013
    The job's start timestamp is also set if necessary.
1014

1015
    @type job: L{_QueuedJob}
1016
    @param job: Job object
1017
    @type op: L{_QueuedOpCode}
1018
    @param op: Opcode object
1019

1020
    """
1021
    assert op in job.ops
1022
    assert op.status in (constants.OP_STATUS_QUEUED,
1023
                         constants.OP_STATUS_WAITING)
1024

    
1025
    update = False
1026

    
1027
    op.result = None
1028

    
1029
    if op.status == constants.OP_STATUS_QUEUED:
1030
      op.status = constants.OP_STATUS_WAITING
1031
      update = True
1032

    
1033
    if op.start_timestamp is None:
1034
      op.start_timestamp = TimeStampNow()
1035
      update = True
1036

    
1037
    if job.start_timestamp is None:
1038
      job.start_timestamp = op.start_timestamp
1039
      update = True
1040

    
1041
    assert op.status == constants.OP_STATUS_WAITING
1042

    
1043
    return update
1044

    
1045
  @staticmethod
1046
  def _CheckDependencies(queue, job, opctx):
1047
    """Checks if an opcode has dependencies and if so, processes them.
1048

1049
    @type queue: L{JobQueue}
1050
    @param queue: Queue object
1051
    @type job: L{_QueuedJob}
1052
    @param job: Job object
1053
    @type opctx: L{_OpExecContext}
1054
    @param opctx: Opcode execution context
1055
    @rtype: bool
1056
    @return: Whether opcode will be re-scheduled by dependency tracker
1057

1058
    """
1059
    op = opctx.op
1060

    
1061
    result = False
1062

    
1063
    while opctx.jobdeps:
1064
      (dep_job_id, dep_status) = opctx.jobdeps[0]
1065

    
1066
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1067
                                                          dep_status)
1068
      assert ht.TNonEmptyString(depmsg), "No dependency message"
1069

    
1070
      logging.info("%s: %s", opctx.log_prefix, depmsg)
1071

    
1072
      if depresult == _JobDependencyManager.CONTINUE:
1073
        # Remove dependency and continue
1074
        opctx.jobdeps.pop(0)
1075

    
1076
      elif depresult == _JobDependencyManager.WAIT:
1077
        # Need to wait for notification, dependency tracker will re-add job
1078
        # to workerpool
1079
        result = True
1080
        break
1081

    
1082
      elif depresult == _JobDependencyManager.CANCEL:
1083
        # Job was cancelled, cancel this job as well
1084
        job.Cancel()
1085
        assert op.status == constants.OP_STATUS_CANCELING
1086
        break
1087

    
1088
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1089
                         _JobDependencyManager.ERROR):
1090
        # Job failed or there was an error, this job must fail
1091
        op.status = constants.OP_STATUS_ERROR
1092
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1093
        break
1094

    
1095
      else:
1096
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1097
                                     depresult)
1098

    
1099
    return result
1100

    
1101
  def _ExecOpCodeUnlocked(self, opctx):
1102
    """Processes one opcode and returns the result.
1103

1104
    """
1105
    op = opctx.op
1106

    
1107
    assert op.status == constants.OP_STATUS_WAITING
1108

    
1109
    timeout = opctx.GetNextLockTimeout()
1110

    
1111
    try:
1112
      # Make sure not to hold queue lock while calling ExecOpCode
1113
      result = self.opexec_fn(op.input,
1114
                              _OpExecCallbacks(self.queue, self.job, op),
1115
                              timeout=timeout)
1116
    except mcpu.LockAcquireTimeout:
1117
      assert timeout is not None, "Received timeout for blocking acquire"
1118
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1119

    
1120
      assert op.status in (constants.OP_STATUS_WAITING,
1121
                           constants.OP_STATUS_CANCELING)
1122

    
1123
      # Was job cancelled while we were waiting for the lock?
1124
      if op.status == constants.OP_STATUS_CANCELING:
1125
        return (constants.OP_STATUS_CANCELING, None)
1126

    
1127
      # Queue is shutting down, return to queued
1128
      if not self.queue.AcceptingJobsUnlocked():
1129
        return (constants.OP_STATUS_QUEUED, None)
1130

    
1131
      # Stay in waitlock while trying to re-acquire lock
1132
      return (constants.OP_STATUS_WAITING, None)
1133
    except CancelJob:
1134
      logging.exception("%s: Canceling job", opctx.log_prefix)
1135
      assert op.status == constants.OP_STATUS_CANCELING
1136
      return (constants.OP_STATUS_CANCELING, None)
1137

    
1138
    except QueueShutdown:
1139
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1140

    
1141
      assert op.status == constants.OP_STATUS_WAITING
1142

    
1143
      # Job hadn't been started yet, so it should return to the queue
1144
      return (constants.OP_STATUS_QUEUED, None)
1145

    
1146
    except Exception, err: # pylint: disable=W0703
1147
      logging.exception("%s: Caught exception in %s",
1148
                        opctx.log_prefix, opctx.summary)
1149
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1150
    else:
1151
      logging.debug("%s: %s successful",
1152
                    opctx.log_prefix, opctx.summary)
1153
      return (constants.OP_STATUS_SUCCESS, result)
1154

    
1155
  def __call__(self, _nextop_fn=None):
1156
    """Continues execution of a job.
1157

1158
    @param _nextop_fn: Callback function for tests
1159
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1160
      be deferred and C{WAITDEP} if the dependency manager
1161
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1162

1163
    """
1164
    queue = self.queue
1165
    job = self.job
1166

    
1167
    logging.debug("Processing job %s", job.id)
1168

    
1169
    queue.acquire(shared=1)
1170
    try:
1171
      opcount = len(job.ops)
1172

    
1173
      assert job.writable, "Expected writable job"
1174

    
1175
      # Don't do anything for finalized jobs
1176
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1177
        return self.FINISHED
1178

    
1179
      # Is a previous opcode still pending?
1180
      if job.cur_opctx:
1181
        opctx = job.cur_opctx
1182
        job.cur_opctx = None
1183
      else:
1184
        if __debug__ and _nextop_fn:
1185
          _nextop_fn()
1186
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1187

    
1188
      op = opctx.op
1189

    
1190
      # Consistency check
1191
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1192
                                     constants.OP_STATUS_CANCELING)
1193
                        for i in job.ops[opctx.index + 1:])
1194

    
1195
      assert op.status in (constants.OP_STATUS_QUEUED,
1196
                           constants.OP_STATUS_WAITING,
1197
                           constants.OP_STATUS_CANCELING)
1198

    
1199
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1200
              op.priority >= constants.OP_PRIO_HIGHEST)
1201

    
1202
      waitjob = None
1203

    
1204
      if op.status != constants.OP_STATUS_CANCELING:
1205
        assert op.status in (constants.OP_STATUS_QUEUED,
1206
                             constants.OP_STATUS_WAITING)
1207

    
1208
        # Prepare to start opcode
1209
        if self._MarkWaitlock(job, op):
1210
          # Write to disk
1211
          queue.UpdateJobUnlocked(job)
1212

    
1213
        assert op.status == constants.OP_STATUS_WAITING
1214
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1215
        assert job.start_timestamp and op.start_timestamp
1216
        assert waitjob is None
1217

    
1218
        # Check if waiting for a job is necessary
1219
        waitjob = self._CheckDependencies(queue, job, opctx)
1220

    
1221
        assert op.status in (constants.OP_STATUS_WAITING,
1222
                             constants.OP_STATUS_CANCELING,
1223
                             constants.OP_STATUS_ERROR)
1224

    
1225
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1226
                                         constants.OP_STATUS_ERROR)):
1227
          logging.info("%s: opcode %s waiting for locks",
1228
                       opctx.log_prefix, opctx.summary)
1229

    
1230
          assert not opctx.jobdeps, "Not all dependencies were removed"
1231

    
1232
          queue.release()
1233
          try:
1234
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1235
          finally:
1236
            queue.acquire(shared=1)
1237

    
1238
          op.status = op_status
1239
          op.result = op_result
1240

    
1241
          assert not waitjob
1242

    
1243
        if op.status in (constants.OP_STATUS_WAITING,
1244
                         constants.OP_STATUS_QUEUED):
1245
          # waiting: Couldn't get locks in time
1246
          # queued: Queue is shutting down
1247
          assert not op.end_timestamp
1248
        else:
1249
          # Finalize opcode
1250
          op.end_timestamp = TimeStampNow()
1251

    
1252
          if op.status == constants.OP_STATUS_CANCELING:
1253
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1254
                                  for i in job.ops[opctx.index:])
1255
          else:
1256
            assert op.status in constants.OPS_FINALIZED
1257

    
1258
      if op.status == constants.OP_STATUS_QUEUED:
1259
        # Queue is shutting down
1260
        assert not waitjob
1261

    
1262
        finalize = False
1263

    
1264
        # Reset context
1265
        job.cur_opctx = None
1266

    
1267
        # In no case must the status be finalized here
1268
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1269

    
1270
      elif op.status == constants.OP_STATUS_WAITING or waitjob:
1271
        finalize = False
1272

    
1273
        if not waitjob and opctx.CheckPriorityIncrease():
1274
          # Priority was changed, need to update on-disk file
1275
          queue.UpdateJobUnlocked(job)
1276

    
1277
        # Keep around for another round
1278
        job.cur_opctx = opctx
1279

    
1280
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1281
                op.priority >= constants.OP_PRIO_HIGHEST)
1282

    
1283
        # In no case must the status be finalized here
1284
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1285

    
1286
      else:
1287
        # Ensure all opcodes so far have been successful
1288
        assert (opctx.index == 0 or
1289
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1290
                           for i in job.ops[:opctx.index]))
1291

    
1292
        # Reset context
1293
        job.cur_opctx = None
1294

    
1295
        if op.status == constants.OP_STATUS_SUCCESS:
1296
          finalize = False
1297

    
1298
        elif op.status == constants.OP_STATUS_ERROR:
1299
          # Ensure failed opcode has an exception as its result
1300
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1301

    
1302
          to_encode = errors.OpExecError("Preceding opcode failed")
1303
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1304
                                _EncodeOpError(to_encode))
1305
          finalize = True
1306

    
1307
          # Consistency check
1308
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1309
                            errors.GetEncodedError(i.result)
1310
                            for i in job.ops[opctx.index:])
1311

    
1312
        elif op.status == constants.OP_STATUS_CANCELING:
1313
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1314
                                "Job canceled by request")
1315
          finalize = True
1316

    
1317
        else:
1318
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1319

    
1320
        if opctx.index == (opcount - 1):
1321
          # Finalize on last opcode
1322
          finalize = True
1323

    
1324
        if finalize:
1325
          # All opcodes have been run, finalize job
1326
          job.Finalize()
1327

    
1328
        # Write to disk. If the job status is final, this is the final write
1329
        # allowed. Once the file has been written, it can be archived anytime.
1330
        queue.UpdateJobUnlocked(job)
1331

    
1332
        assert not waitjob
1333

    
1334
        if finalize:
1335
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1336
          return self.FINISHED
1337

    
1338
      assert not waitjob or queue.depmgr.JobWaiting(job)
1339

    
1340
      if waitjob:
1341
        return self.WAITDEP
1342
      else:
1343
        return self.DEFER
1344
    finally:
1345
      assert job.writable, "Job became read-only while being processed"
1346
      queue.release()
1347

    
1348

    
1349
def _EvaluateJobProcessorResult(depmgr, job, result):
1350
  """Looks at a result from L{_JobProcessor} for a job.
1351

1352
  To be used in a L{_JobQueueWorker}.
1353

1354
  """
1355
  if result == _JobProcessor.FINISHED:
1356
    # Notify waiting jobs
1357
    depmgr.NotifyWaiters(job.id)
1358

    
1359
  elif result == _JobProcessor.DEFER:
1360
    # Schedule again
1361
    raise workerpool.DeferTask(priority=job.CalcPriority())
1362

    
1363
  elif result == _JobProcessor.WAITDEP:
1364
    # No-op, dependency manager will re-schedule
1365
    pass
1366

    
1367
  else:
1368
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1369
                                 (result, ))
1370

    
1371

    
1372
class _JobQueueWorker(workerpool.BaseWorker):
1373
  """The actual job workers.
1374

1375
  """
1376
  def RunTask(self, job): # pylint: disable=W0221
1377
    """Job executor.
1378

1379
    @type job: L{_QueuedJob}
1380
    @param job: the job to be processed
1381

1382
    """
1383
    assert job.writable, "Expected writable job"
1384

    
1385
    # Ensure only one worker is active on a single job. If a job registers for
1386
    # a dependency job, and the other job notifies before the first worker is
1387
    # done, the job can end up in the tasklist more than once.
1388
    job.processor_lock.acquire()
1389
    try:
1390
      return self._RunTaskInner(job)
1391
    finally:
1392
      job.processor_lock.release()
1393

    
1394
  def _RunTaskInner(self, job):
1395
    """Executes a job.
1396

1397
    Must be called with per-job lock acquired.
1398

1399
    """
1400
    queue = job.queue
1401
    assert queue == self.pool.queue
1402

    
1403
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1404
    setname_fn(None)
1405

    
1406
    proc = mcpu.Processor(queue.context, job.id)
1407

    
1408
    # Create wrapper for setting thread name
1409
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1410
                                    proc.ExecOpCode)
1411

    
1412
    _EvaluateJobProcessorResult(queue.depmgr, job,
1413
                                _JobProcessor(queue, wrap_execop_fn, job)())
1414

    
1415
  @staticmethod
1416
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1417
    """Updates the worker thread name to include a short summary of the opcode.
1418

1419
    @param setname_fn: Callable setting worker thread name
1420
    @param execop_fn: Callable for executing opcode (usually
1421
                      L{mcpu.Processor.ExecOpCode})
1422

1423
    """
1424
    setname_fn(op)
1425
    try:
1426
      return execop_fn(op, *args, **kwargs)
1427
    finally:
1428
      setname_fn(None)
1429

    
1430
  @staticmethod
1431
  def _GetWorkerName(job, op):
1432
    """Sets the worker thread name.
1433

1434
    @type job: L{_QueuedJob}
1435
    @type op: L{opcodes.OpCode}
1436

1437
    """
1438
    parts = ["Job%s" % job.id]
1439

    
1440
    if op:
1441
      parts.append(op.TinySummary())
1442

    
1443
    return "/".join(parts)
1444

    
1445

    
1446
class _JobQueueWorkerPool(workerpool.WorkerPool):
1447
  """Simple class implementing a job-processing workerpool.
1448

1449
  """
1450
  def __init__(self, queue):
1451
    super(_JobQueueWorkerPool, self).__init__("Jq",
1452
                                              JOBQUEUE_THREADS,
1453
                                              _JobQueueWorker)
1454
    self.queue = queue
1455

    
1456

    
1457
class _JobDependencyManager:
1458
  """Keeps track of job dependencies.
1459

1460
  """
1461
  (WAIT,
1462
   ERROR,
1463
   CANCEL,
1464
   CONTINUE,
1465
   WRONGSTATUS) = range(1, 6)
1466

    
1467
  def __init__(self, getstatus_fn, enqueue_fn):
1468
    """Initializes this class.
1469

1470
    """
1471
    self._getstatus_fn = getstatus_fn
1472
    self._enqueue_fn = enqueue_fn
1473

    
1474
    self._waiters = {}
1475
    self._lock = locking.SharedLock("JobDepMgr")
1476

    
1477
  @locking.ssynchronized(_LOCK, shared=1)
1478
  def GetLockInfo(self, requested): # pylint: disable=W0613
1479
    """Retrieves information about waiting jobs.
1480

1481
    @type requested: set
1482
    @param requested: Requested information, see C{query.LQ_*}
1483

1484
    """
1485
    # No need to sort here, that's being done by the lock manager and query
1486
    # library. There are no priorities for notifying jobs, hence all show up as
1487
    # one item under "pending".
1488
    return [("job/%s" % job_id, None, None,
1489
             [("job", [job.id for job in waiters])])
1490
            for job_id, waiters in self._waiters.items()
1491
            if waiters]
1492

    
1493
  @locking.ssynchronized(_LOCK, shared=1)
1494
  def JobWaiting(self, job):
1495
    """Checks if a job is waiting.
1496

1497
    """
1498
    return compat.any(job in jobs
1499
                      for jobs in self._waiters.values())
1500

    
1501
  @locking.ssynchronized(_LOCK)
1502
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1503
    """Checks if a dependency job has the requested status.
1504

1505
    If the other job is not yet in a finalized status, the calling job will be
1506
    notified (re-added to the workerpool) at a later point.
1507

1508
    @type job: L{_QueuedJob}
1509
    @param job: Job object
1510
    @type dep_job_id: int
1511
    @param dep_job_id: ID of dependency job
1512
    @type dep_status: list
1513
    @param dep_status: Required status
1514

1515
    """
1516
    assert ht.TJobId(job.id)
1517
    assert ht.TJobId(dep_job_id)
1518
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1519

    
1520
    if job.id == dep_job_id:
1521
      return (self.ERROR, "Job can't depend on itself")
1522

    
1523
    # Get status of dependency job
1524
    try:
1525
      status = self._getstatus_fn(dep_job_id)
1526
    except errors.JobLost, err:
1527
      return (self.ERROR, "Dependency error: %s" % err)
1528

    
1529
    assert status in constants.JOB_STATUS_ALL
1530

    
1531
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1532

    
1533
    if status not in constants.JOBS_FINALIZED:
1534
      # Register for notification and wait for job to finish
1535
      job_id_waiters.add(job)
1536
      return (self.WAIT,
1537
              "Need to wait for job %s, wanted status '%s'" %
1538
              (dep_job_id, dep_status))
1539

    
1540
    # Remove from waiters list
1541
    if job in job_id_waiters:
1542
      job_id_waiters.remove(job)
1543

    
1544
    if (status == constants.JOB_STATUS_CANCELED and
1545
        constants.JOB_STATUS_CANCELED not in dep_status):
1546
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1547

    
1548
    elif not dep_status or status in dep_status:
1549
      return (self.CONTINUE,
1550
              "Dependency job %s finished with status '%s'" %
1551
              (dep_job_id, status))
1552

    
1553
    else:
1554
      return (self.WRONGSTATUS,
1555
              "Dependency job %s finished with status '%s',"
1556
              " not one of '%s' as required" %
1557
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1558

    
1559
  def _RemoveEmptyWaitersUnlocked(self):
1560
    """Remove all jobs without actual waiters.
1561

1562
    """
1563
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1564
                   if not waiters]:
1565
      del self._waiters[job_id]
1566

    
1567
  def NotifyWaiters(self, job_id):
1568
    """Notifies all jobs waiting for a certain job ID.
1569

1570
    @attention: Do not call until L{CheckAndRegister} returned a status other
1571
      than C{WAIT} for C{job_id}, or behaviour is undefined
1572
    @type job_id: int
1573
    @param job_id: Job ID
1574

1575
    """
1576
    assert ht.TJobId(job_id)
1577

    
1578
    self._lock.acquire()
1579
    try:
1580
      self._RemoveEmptyWaitersUnlocked()
1581

    
1582
      jobs = self._waiters.pop(job_id, None)
1583
    finally:
1584
      self._lock.release()
1585

    
1586
    if jobs:
1587
      # Re-add jobs to workerpool
1588
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1589
                    len(jobs), job_id)
1590
      self._enqueue_fn(jobs)
1591

    
1592

    
1593
def _RequireOpenQueue(fn):
1594
  """Decorator for "public" functions.
1595

1596
  This function should be used for all 'public' functions. That is,
1597
  functions usually called from other classes. Note that this should
1598
  be applied only to methods (not plain functions), since it expects
1599
  that the decorated function is called with a first argument that has
1600
  a '_queue_filelock' attribute.
1601

1602
  @warning: Use this decorator only after locking.ssynchronized
1603

1604
  Example::
1605
    @locking.ssynchronized(_LOCK)
1606
    @_RequireOpenQueue
1607
    def Example(self):
1608
      pass
1609

1610
  """
1611
  def wrapper(self, *args, **kwargs):
1612
    # pylint: disable=W0212
1613
    assert self._queue_filelock is not None, "Queue should be open"
1614
    return fn(self, *args, **kwargs)
1615
  return wrapper
1616

    
1617

    
1618
def _RequireNonDrainedQueue(fn):
1619
  """Decorator checking for a non-drained queue.
1620

1621
  To be used with functions submitting new jobs.
1622

1623
  """
1624
  def wrapper(self, *args, **kwargs):
1625
    """Wrapper function.
1626

1627
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1628

1629
    """
1630
    # Ok when sharing the big job queue lock, as the drain file is created when
1631
    # the lock is exclusive.
1632
    # Needs access to protected member, pylint: disable=W0212
1633
    if self._drained:
1634
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1635

    
1636
    if not self._accepting_jobs:
1637
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1638

    
1639
    return fn(self, *args, **kwargs)
1640
  return wrapper
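
# Illustrative note: submission entry points such as JobQueue.SubmitJob and
# JobQueue.SubmitManyJobs below stack these decorators in this order:
#
#   @locking.ssynchronized(_LOCK)
#   @_RequireOpenQueue
#   @_RequireNonDrainedQueue
#   def SubmitJob(self, ops):
#     ...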


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          to_encode = errors.OpExecError("Unclean master daemon shutdown")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    # Set queue drained flag
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
                                                       self._drained)
    msg = result[node_name].fail_msg
    if msg:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and especially
    log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")
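      # Illustrative example: with one node succeeding and three failing,
      # (1 + 1) < 3 holds and the message above is logged; the master node
      # itself counts as the implicit extra success.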

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list
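
  # Illustrative example: with self._nodes == {"node2": "192.0.2.2"},
  # _GetNodeIp() returns (["node2"], ["192.0.2.2"]).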

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TNonNegativeInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
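
  # Illustrative example: with self._last_serial == 10 and count == 3, the
  # serial file is rewritten to contain 13 and the job IDs formatted from
  # 11, 12 and 13 are returned.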

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
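
  # Illustrative example: job id 4217 maps to "job-4217" inside
  # pathutils.QUEUE_DIR.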

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
    self._CheckRpcResult(result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)
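
  # Illustrative example: if resolve_fn maps the relative ID -1 to job 123,
  # deps of [(-1, [constants.JOB_STATUS_SUCCESS])] resolve to
  # (True, [(123, [constants.JOB_STATUS_SUCCESS])]).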

  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs],
                             task_id=map(_GetIdAttr, jobs))

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      # Only check the writable flag once the job is known to exist
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      (success, msg) = job.ChangePriority(priority)

      if success:
        try:
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive an already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum amount of time (in seconds) to spend archiving

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False} L{Shutdown} can
    be called without interfering with any job. Queued and unfinished jobs will
    be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool

    """
    return self._accepting_jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None