Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 653bc0f1

History | View | Annotate | Download (75.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import time
35
import weakref
36
import threading
37
import itertools
38
import operator
39

    
40
try:
41
  # pylint: disable=E0611
42
  from pyinotify import pyinotify
43
except ImportError:
44
  import pyinotify
45

    
46
from ganeti import asyncnotifier
47
from ganeti import constants
48
from ganeti import serializer
49
from ganeti import workerpool
50
from ganeti import locking
51
from ganeti import luxi
52
from ganeti import opcodes
53
from ganeti import opcodes_base
54
from ganeti import errors
55
from ganeti import mcpu
56
from ganeti import utils
57
from ganeti import jstore
58
import ganeti.rpc.node as rpc
59
from ganeti import runtime
60
from ganeti import netutils
61
from ganeti import compat
62
from ganeti import ht
63
from ganeti import query
64
from ganeti import qlang
65
from ganeti import pathutils
66
from ganeti import vcluster
67

    
68

    
69
JOBQUEUE_THREADS = 25
70

    
71
# member lock names to be passed to @ssynchronized decorator
72
_LOCK = "_lock"
73
_QUEUE = "_queue"
74

    
75
#: Retrieves "id" attribute
76
_GetIdAttr = operator.attrgetter("id")
77

    
78

    
79
class CancelJob(Exception):
80
  """Special exception to cancel a job.
81

82
  """
83

    
84

    
85
class QueueShutdown(Exception):
86
  """Special exception to abort a job when the job queue is shutting down.
87

88
  """
89

    
90

    
91
def TimeStampNow():
92
  """Returns the current timestamp.
93

94
  @rtype: tuple
95
  @return: the current time in the (seconds, microseconds) format
96

97
  """
98
  return utils.SplitTime(time.time())
99

    
100

    
101
def _CallJqUpdate(runner, names, file_name, content):
102
  """Updates job queue file after virtualizing filename.
103

104
  """
105
  virt_file_name = vcluster.MakeVirtualPath(file_name)
106
  return runner.call_jobqueue_update(names, virt_file_name, content)
107

    
108

    
109
class _SimpleJobQuery:
110
  """Wrapper for job queries.
111

112
  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
113

114
  """
115
  def __init__(self, fields):
116
    """Initializes this class.
117

118
    """
119
    self._query = query.Query(query.JOB_FIELDS, fields)
120

    
121
  def __call__(self, job):
122
    """Executes a job query using cached field list.
123

124
    """
125
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
126

    
127

    
128
class _QueuedOpCode(object):
129
  """Encapsulates an opcode object.
130

131
  @ivar log: holds the execution log and consists of tuples
132
  of the form C{(log_serial, timestamp, level, message)}
133
  @ivar input: the OpCode we encapsulate
134
  @ivar status: the current status
135
  @ivar result: the result of the LU execution
136
  @ivar start_timestamp: timestamp for the start of the execution
137
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
138
  @ivar stop_timestamp: timestamp for the end of the execution
139

140
  """
141
  __slots__ = ["input", "status", "result", "log", "priority",
142
               "start_timestamp", "exec_timestamp", "end_timestamp",
143
               "__weakref__"]
144

    
145
  def __init__(self, op):
146
    """Initializes instances of this class.
147

148
    @type op: L{opcodes.OpCode}
149
    @param op: the opcode we encapsulate
150

151
    """
152
    self.input = op
153
    self.status = constants.OP_STATUS_QUEUED
154
    self.result = None
155
    self.log = []
156
    self.start_timestamp = None
157
    self.exec_timestamp = None
158
    self.end_timestamp = None
159

    
160
    # Get initial priority (it might change during the lifetime of this opcode)
161
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
162

    
163
  @classmethod
164
  def Restore(cls, state):
165
    """Restore the _QueuedOpCode from the serialized form.
166

167
    @type state: dict
168
    @param state: the serialized state
169
    @rtype: _QueuedOpCode
170
    @return: a new _QueuedOpCode instance
171

172
    """
173
    obj = _QueuedOpCode.__new__(cls)
174
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
175
    obj.status = state["status"]
176
    obj.result = state["result"]
177
    obj.log = state["log"]
178
    obj.start_timestamp = state.get("start_timestamp", None)
179
    obj.exec_timestamp = state.get("exec_timestamp", None)
180
    obj.end_timestamp = state.get("end_timestamp", None)
181
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
182
    return obj
183

    
184
  def Serialize(self):
185
    """Serializes this _QueuedOpCode.
186

187
    @rtype: dict
188
    @return: the dictionary holding the serialized state
189

190
    """
191
    return {
192
      "input": self.input.__getstate__(),
193
      "status": self.status,
194
      "result": self.result,
195
      "log": self.log,
196
      "start_timestamp": self.start_timestamp,
197
      "exec_timestamp": self.exec_timestamp,
198
      "end_timestamp": self.end_timestamp,
199
      "priority": self.priority,
200
      }
201

    
202

    
203
class _QueuedJob(object):
204
  """In-memory job representation.
205

206
  This is what we use to track the user-submitted jobs. Locking must
207
  be taken care of by users of this class.
208

209
  @type queue: L{JobQueue}
210
  @ivar queue: the parent queue
211
  @ivar id: the job ID
212
  @type ops: list
213
  @ivar ops: the list of _QueuedOpCode that constitute the job
214
  @type log_serial: int
215
  @ivar log_serial: holds the index for the next log entry
216
  @ivar received_timestamp: the timestamp for when the job was received
217
  @ivar start_timestmap: the timestamp for start of execution
218
  @ivar end_timestamp: the timestamp for end of execution
219
  @ivar writable: Whether the job is allowed to be modified
220

221
  """
222
  # pylint: disable=W0212
223
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
224
               "received_timestamp", "start_timestamp", "end_timestamp",
225
               "__weakref__", "processor_lock", "writable", "archived"]
226

    
227
  def AddReasons(self):
228
    """Extend the reason trail
229

230
    Add the reason for all the opcodes of this job to be executed.
231

232
    """
233
    count = 0
234
    for queued_op in self.ops:
235
      op = queued_op.input
236
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__)
237
      reason_text = "job=%d;index=%d" % (self.id, count)
238
      reason = getattr(op, "reason", [])
239
      reason.append((reason_src, reason_text, utils.EpochNano()))
240
      op.reason = reason
241
      count = count + 1
242

    
243
  def __init__(self, queue, job_id, ops, writable):
244
    """Constructor for the _QueuedJob.
245

246
    @type queue: L{JobQueue}
247
    @param queue: our parent queue
248
    @type job_id: job_id
249
    @param job_id: our job id
250
    @type ops: list
251
    @param ops: the list of opcodes we hold, which will be encapsulated
252
        in _QueuedOpCodes
253
    @type writable: bool
254
    @param writable: Whether job can be modified
255

256
    """
257
    if not ops:
258
      raise errors.GenericError("A job needs at least one opcode")
259

    
260
    self.queue = queue
261
    self.id = int(job_id)
262
    self.ops = [_QueuedOpCode(op) for op in ops]
263
    self.AddReasons()
264
    self.log_serial = 0
265
    self.received_timestamp = TimeStampNow()
266
    self.start_timestamp = None
267
    self.end_timestamp = None
268
    self.archived = False
269

    
270
    self._InitInMemory(self, writable)
271

    
272
    assert not self.archived, "New jobs can not be marked as archived"
273

    
274
  @staticmethod
275
  def _InitInMemory(obj, writable):
276
    """Initializes in-memory variables.
277

278
    """
279
    obj.writable = writable
280
    obj.ops_iter = None
281
    obj.cur_opctx = None
282

    
283
    # Read-only jobs are not processed and therefore don't need a lock
284
    if writable:
285
      obj.processor_lock = threading.Lock()
286
    else:
287
      obj.processor_lock = None
288

    
289
  def __repr__(self):
290
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
291
              "id=%s" % self.id,
292
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
293

    
294
    return "<%s at %#x>" % (" ".join(status), id(self))
295

    
296
  @classmethod
297
  def Restore(cls, queue, state, writable, archived):
298
    """Restore a _QueuedJob from serialized state:
299

300
    @type queue: L{JobQueue}
301
    @param queue: to which queue the restored job belongs
302
    @type state: dict
303
    @param state: the serialized state
304
    @type writable: bool
305
    @param writable: Whether job can be modified
306
    @type archived: bool
307
    @param archived: Whether job was already archived
308
    @rtype: _JobQueue
309
    @return: the restored _JobQueue instance
310

311
    """
312
    obj = _QueuedJob.__new__(cls)
313
    obj.queue = queue
314
    obj.id = int(state["id"])
315
    obj.received_timestamp = state.get("received_timestamp", None)
316
    obj.start_timestamp = state.get("start_timestamp", None)
317
    obj.end_timestamp = state.get("end_timestamp", None)
318
    obj.archived = archived
319

    
320
    obj.ops = []
321
    obj.log_serial = 0
322
    for op_state in state["ops"]:
323
      op = _QueuedOpCode.Restore(op_state)
324
      for log_entry in op.log:
325
        obj.log_serial = max(obj.log_serial, log_entry[0])
326
      obj.ops.append(op)
327

    
328
    cls._InitInMemory(obj, writable)
329

    
330
    return obj
331

    
332
  def Serialize(self):
333
    """Serialize the _JobQueue instance.
334

335
    @rtype: dict
336
    @return: the serialized state
337

338
    """
339
    return {
340
      "id": self.id,
341
      "ops": [op.Serialize() for op in self.ops],
342
      "start_timestamp": self.start_timestamp,
343
      "end_timestamp": self.end_timestamp,
344
      "received_timestamp": self.received_timestamp,
345
      }
346

    
347
  def CalcStatus(self):
348
    """Compute the status of this job.
349

350
    This function iterates over all the _QueuedOpCodes in the job and
351
    based on their status, computes the job status.
352

353
    The algorithm is:
354
      - if we find a cancelled, or finished with error, the job
355
        status will be the same
356
      - otherwise, the last opcode with the status one of:
357
          - waitlock
358
          - canceling
359
          - running
360

361
        will determine the job status
362

363
      - otherwise, it means either all opcodes are queued, or success,
364
        and the job status will be the same
365

366
    @return: the job status
367

368
    """
369
    status = constants.JOB_STATUS_QUEUED
370

    
371
    all_success = True
372
    for op in self.ops:
373
      if op.status == constants.OP_STATUS_SUCCESS:
374
        continue
375

    
376
      all_success = False
377

    
378
      if op.status == constants.OP_STATUS_QUEUED:
379
        pass
380
      elif op.status == constants.OP_STATUS_WAITING:
381
        status = constants.JOB_STATUS_WAITING
382
      elif op.status == constants.OP_STATUS_RUNNING:
383
        status = constants.JOB_STATUS_RUNNING
384
      elif op.status == constants.OP_STATUS_CANCELING:
385
        status = constants.JOB_STATUS_CANCELING
386
        break
387
      elif op.status == constants.OP_STATUS_ERROR:
388
        status = constants.JOB_STATUS_ERROR
389
        # The whole job fails if one opcode failed
390
        break
391
      elif op.status == constants.OP_STATUS_CANCELED:
392
        status = constants.OP_STATUS_CANCELED
393
        break
394

    
395
    if all_success:
396
      status = constants.JOB_STATUS_SUCCESS
397

    
398
    return status
399

    
400
  def CalcPriority(self):
401
    """Gets the current priority for this job.
402

403
    Only unfinished opcodes are considered. When all are done, the default
404
    priority is used.
405

406
    @rtype: int
407

408
    """
409
    priorities = [op.priority for op in self.ops
410
                  if op.status not in constants.OPS_FINALIZED]
411

    
412
    if not priorities:
413
      # All opcodes are done, assume default priority
414
      return constants.OP_PRIO_DEFAULT
415

    
416
    return min(priorities)
417

    
418
  def GetLogEntries(self, newer_than):
419
    """Selectively returns the log entries.
420

421
    @type newer_than: None or int
422
    @param newer_than: if this is None, return all log entries,
423
        otherwise return only the log entries with serial higher
424
        than this value
425
    @rtype: list
426
    @return: the list of the log entries selected
427

428
    """
429
    if newer_than is None:
430
      serial = -1
431
    else:
432
      serial = newer_than
433

    
434
    entries = []
435
    for op in self.ops:
436
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
437

    
438
    return entries
439

    
440
  def GetInfo(self, fields):
441
    """Returns information about a job.
442

443
    @type fields: list
444
    @param fields: names of fields to return
445
    @rtype: list
446
    @return: list with one element for each field
447
    @raise errors.OpExecError: when an invalid field
448
        has been passed
449

450
    """
451
    return _SimpleJobQuery(fields)(self)
452

    
453
  def MarkUnfinishedOps(self, status, result):
454
    """Mark unfinished opcodes with a given status and result.
455

456
    This is an utility function for marking all running or waiting to
457
    be run opcodes with a given status. Opcodes which are already
458
    finalised are not changed.
459

460
    @param status: a given opcode status
461
    @param result: the opcode result
462

463
    """
464
    not_marked = True
465
    for op in self.ops:
466
      if op.status in constants.OPS_FINALIZED:
467
        assert not_marked, "Finalized opcodes found after non-finalized ones"
468
        continue
469
      op.status = status
470
      op.result = result
471
      not_marked = False
472

    
473
  def Finalize(self):
474
    """Marks the job as finalized.
475

476
    """
477
    self.end_timestamp = TimeStampNow()
478

    
479
  def Cancel(self):
480
    """Marks job as canceled/-ing if possible.
481

482
    @rtype: tuple; (bool, string)
483
    @return: Boolean describing whether job was successfully canceled or marked
484
      as canceling and a text message
485

486
    """
487
    status = self.CalcStatus()
488

    
489
    if status == constants.JOB_STATUS_QUEUED:
490
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
491
                             "Job canceled by request")
492
      self.Finalize()
493
      return (True, "Job %s canceled" % self.id)
494

    
495
    elif status == constants.JOB_STATUS_WAITING:
496
      # The worker will notice the new status and cancel the job
497
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
498
      return (True, "Job %s will be canceled" % self.id)
499

    
500
    else:
501
      logging.debug("Job %s is no longer waiting in the queue", self.id)
502
      return (False, "Job %s is no longer waiting in the queue" % self.id)
503

    
504
  def ChangePriority(self, priority):
505
    """Changes the job priority.
506

507
    @type priority: int
508
    @param priority: New priority
509
    @rtype: tuple; (bool, string)
510
    @return: Boolean describing whether job's priority was successfully changed
511
      and a text message
512

513
    """
514
    status = self.CalcStatus()
515

    
516
    if status in constants.JOBS_FINALIZED:
517
      return (False, "Job %s is finished" % self.id)
518
    elif status == constants.JOB_STATUS_CANCELING:
519
      return (False, "Job %s is cancelling" % self.id)
520
    else:
521
      assert status in (constants.JOB_STATUS_QUEUED,
522
                        constants.JOB_STATUS_WAITING,
523
                        constants.JOB_STATUS_RUNNING)
524

    
525
      changed = False
526
      for op in self.ops:
527
        if (op.status == constants.OP_STATUS_RUNNING or
528
            op.status in constants.OPS_FINALIZED):
529
          assert not changed, \
530
            ("Found opcode for which priority should not be changed after"
531
             " priority has been changed for previous opcodes")
532
          continue
533

    
534
        assert op.status in (constants.OP_STATUS_QUEUED,
535
                             constants.OP_STATUS_WAITING)
536

    
537
        changed = True
538

    
539
        # Set new priority (doesn't modify opcode input)
540
        op.priority = priority
541

    
542
      if changed:
543
        return (True, ("Priorities of pending opcodes for job %s have been"
544
                       " changed to %s" % (self.id, priority)))
545
      else:
546
        return (False, "Job %s had no pending opcodes" % self.id)
547

    
548

    
549
class _OpExecCallbacks(mcpu.OpExecCbBase):
550
  def __init__(self, queue, job, op):
551
    """Initializes this class.
552

553
    @type queue: L{JobQueue}
554
    @param queue: Job queue
555
    @type job: L{_QueuedJob}
556
    @param job: Job object
557
    @type op: L{_QueuedOpCode}
558
    @param op: OpCode
559

560
    """
561
    assert queue, "Queue is missing"
562
    assert job, "Job is missing"
563
    assert op, "Opcode is missing"
564

    
565
    self._queue = queue
566
    self._job = job
567
    self._op = op
568

    
569
  def _CheckCancel(self):
570
    """Raises an exception to cancel the job if asked to.
571

572
    """
573
    # Cancel here if we were asked to
574
    if self._op.status == constants.OP_STATUS_CANCELING:
575
      logging.debug("Canceling opcode")
576
      raise CancelJob()
577

    
578
    # See if queue is shutting down
579
    if not self._queue.AcceptingJobsUnlocked():
580
      logging.debug("Queue is shutting down")
581
      raise QueueShutdown()
582

    
583
  @locking.ssynchronized(_QUEUE, shared=1)
584
  def NotifyStart(self):
585
    """Mark the opcode as running, not lock-waiting.
586

587
    This is called from the mcpu code as a notifier function, when the LU is
588
    finally about to start the Exec() method. Of course, to have end-user
589
    visible results, the opcode must be initially (before calling into
590
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
591

592
    """
593
    assert self._op in self._job.ops
594
    assert self._op.status in (constants.OP_STATUS_WAITING,
595
                               constants.OP_STATUS_CANCELING)
596

    
597
    # Cancel here if we were asked to
598
    self._CheckCancel()
599

    
600
    logging.debug("Opcode is now running")
601

    
602
    self._op.status = constants.OP_STATUS_RUNNING
603
    self._op.exec_timestamp = TimeStampNow()
604

    
605
    # And finally replicate the job status
606
    self._queue.UpdateJobUnlocked(self._job)
607

    
608
  @locking.ssynchronized(_QUEUE, shared=1)
609
  def _AppendFeedback(self, timestamp, log_type, log_msg):
610
    """Internal feedback append function, with locks
611

612
    """
613
    self._job.log_serial += 1
614
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
615
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
616

    
617
  def Feedback(self, *args):
618
    """Append a log entry.
619

620
    """
621
    assert len(args) < 3
622

    
623
    if len(args) == 1:
624
      log_type = constants.ELOG_MESSAGE
625
      log_msg = args[0]
626
    else:
627
      (log_type, log_msg) = args
628

    
629
    # The time is split to make serialization easier and not lose
630
    # precision.
631
    timestamp = utils.SplitTime(time.time())
632
    self._AppendFeedback(timestamp, log_type, log_msg)
633

    
634
  def CurrentPriority(self):
635
    """Returns current priority for opcode.
636

637
    """
638
    assert self._op.status in (constants.OP_STATUS_WAITING,
639
                               constants.OP_STATUS_CANCELING)
640

    
641
    # Cancel here if we were asked to
642
    self._CheckCancel()
643

    
644
    return self._op.priority
645

    
646
  def SubmitManyJobs(self, jobs):
647
    """Submits jobs for processing.
648

649
    See L{JobQueue.SubmitManyJobs}.
650

651
    """
652
    # Locking is done in job queue
653
    return self._queue.SubmitManyJobs(jobs)
654

    
655

    
656
class _JobChangesChecker(object):
657
  def __init__(self, fields, prev_job_info, prev_log_serial):
658
    """Initializes this class.
659

660
    @type fields: list of strings
661
    @param fields: Fields requested by LUXI client
662
    @type prev_job_info: string
663
    @param prev_job_info: previous job info, as passed by the LUXI client
664
    @type prev_log_serial: string
665
    @param prev_log_serial: previous job serial, as passed by the LUXI client
666

667
    """
668
    self._squery = _SimpleJobQuery(fields)
669
    self._prev_job_info = prev_job_info
670
    self._prev_log_serial = prev_log_serial
671

    
672
  def __call__(self, job):
673
    """Checks whether job has changed.
674

675
    @type job: L{_QueuedJob}
676
    @param job: Job object
677

678
    """
679
    assert not job.writable, "Expected read-only job"
680

    
681
    status = job.CalcStatus()
682
    job_info = self._squery(job)
683
    log_entries = job.GetLogEntries(self._prev_log_serial)
684

    
685
    # Serializing and deserializing data can cause type changes (e.g. from
686
    # tuple to list) or precision loss. We're doing it here so that we get
687
    # the same modifications as the data received from the client. Without
688
    # this, the comparison afterwards might fail without the data being
689
    # significantly different.
690
    # TODO: we just deserialized from disk, investigate how to make sure that
691
    # the job info and log entries are compatible to avoid this further step.
692
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
693
    # efficient, though floats will be tricky
694
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
695
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
696

    
697
    # Don't even try to wait if the job is no longer running, there will be
698
    # no changes.
699
    if (status not in (constants.JOB_STATUS_QUEUED,
700
                       constants.JOB_STATUS_RUNNING,
701
                       constants.JOB_STATUS_WAITING) or
702
        job_info != self._prev_job_info or
703
        (log_entries and self._prev_log_serial != log_entries[0][0])):
704
      logging.debug("Job %s changed", job.id)
705
      return (job_info, log_entries)
706

    
707
    return None
708

    
709

    
710
class _JobFileChangesWaiter(object):
711
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
712
    """Initializes this class.
713

714
    @type filename: string
715
    @param filename: Path to job file
716
    @raises errors.InotifyError: if the notifier cannot be setup
717

718
    """
719
    self._wm = _inotify_wm_cls()
720
    self._inotify_handler = \
721
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
722
    self._notifier = \
723
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
724
    try:
725
      self._inotify_handler.enable()
726
    except Exception:
727
      # pyinotify doesn't close file descriptors automatically
728
      self._notifier.stop()
729
      raise
730

    
731
  def _OnInotify(self, notifier_enabled):
732
    """Callback for inotify.
733

734
    """
735
    if not notifier_enabled:
736
      self._inotify_handler.enable()
737

    
738
  def Wait(self, timeout):
739
    """Waits for the job file to change.
740

741
    @type timeout: float
742
    @param timeout: Timeout in seconds
743
    @return: Whether there have been events
744

745
    """
746
    assert timeout >= 0
747
    have_events = self._notifier.check_events(timeout * 1000)
748
    if have_events:
749
      self._notifier.read_events()
750
    self._notifier.process_events()
751
    return have_events
752

    
753
  def Close(self):
754
    """Closes underlying notifier and its file descriptor.
755

756
    """
757
    self._notifier.stop()
758

    
759

    
760
class _JobChangesWaiter(object):
761
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
762
    """Initializes this class.
763

764
    @type filename: string
765
    @param filename: Path to job file
766

767
    """
768
    self._filewaiter = None
769
    self._filename = filename
770
    self._waiter_cls = _waiter_cls
771

    
772
  def Wait(self, timeout):
773
    """Waits for a job to change.
774

775
    @type timeout: float
776
    @param timeout: Timeout in seconds
777
    @return: Whether there have been events
778

779
    """
780
    if self._filewaiter:
781
      return self._filewaiter.Wait(timeout)
782

    
783
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
784
    # If this point is reached, return immediately and let caller check the job
785
    # file again in case there were changes since the last check. This avoids a
786
    # race condition.
787
    self._filewaiter = self._waiter_cls(self._filename)
788

    
789
    return True
790

    
791
  def Close(self):
792
    """Closes underlying waiter.
793

794
    """
795
    if self._filewaiter:
796
      self._filewaiter.Close()
797

    
798

    
799
class _WaitForJobChangesHelper(object):
800
  """Helper class using inotify to wait for changes in a job file.
801

802
  This class takes a previous job status and serial, and alerts the client when
803
  the current job status has changed.
804

805
  """
806
  @staticmethod
807
  def _CheckForChanges(counter, job_load_fn, check_fn):
808
    if counter.next() > 0:
809
      # If this isn't the first check the job is given some more time to change
810
      # again. This gives better performance for jobs generating many
811
      # changes/messages.
812
      time.sleep(0.1)
813

    
814
    job = job_load_fn()
815
    if not job:
816
      raise errors.JobLost()
817

    
818
    result = check_fn(job)
819
    if result is None:
820
      raise utils.RetryAgain()
821

    
822
    return result
823

    
824
  def __call__(self, filename, job_load_fn,
825
               fields, prev_job_info, prev_log_serial, timeout,
826
               _waiter_cls=_JobChangesWaiter):
827
    """Waits for changes on a job.
828

829
    @type filename: string
830
    @param filename: File on which to wait for changes
831
    @type job_load_fn: callable
832
    @param job_load_fn: Function to load job
833
    @type fields: list of strings
834
    @param fields: Which fields to check for changes
835
    @type prev_job_info: list or None
836
    @param prev_job_info: Last job information returned
837
    @type prev_log_serial: int
838
    @param prev_log_serial: Last job message serial number
839
    @type timeout: float
840
    @param timeout: maximum time to wait in seconds
841

842
    """
843
    counter = itertools.count()
844
    try:
845
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
846
      waiter = _waiter_cls(filename)
847
      try:
848
        return utils.Retry(compat.partial(self._CheckForChanges,
849
                                          counter, job_load_fn, check_fn),
850
                           utils.RETRY_REMAINING_TIME, timeout,
851
                           wait_fn=waiter.Wait)
852
      finally:
853
        waiter.Close()
854
    except errors.JobLost:
855
      return None
856
    except utils.RetryTimeout:
857
      return constants.JOB_NOTCHANGED
858

    
859

    
860
def _EncodeOpError(err):
861
  """Encodes an error which occurred while processing an opcode.
862

863
  """
864
  if isinstance(err, errors.GenericError):
865
    to_encode = err
866
  else:
867
    to_encode = errors.OpExecError(str(err))
868

    
869
  return errors.EncodeException(to_encode)
870

    
871

    
872
class _TimeoutStrategyWrapper:
873
  def __init__(self, fn):
874
    """Initializes this class.
875

876
    """
877
    self._fn = fn
878
    self._next = None
879

    
880
  def _Advance(self):
881
    """Gets the next timeout if necessary.
882

883
    """
884
    if self._next is None:
885
      self._next = self._fn()
886

    
887
  def Peek(self):
888
    """Returns the next timeout.
889

890
    """
891
    self._Advance()
892
    return self._next
893

    
894
  def Next(self):
895
    """Returns the current timeout and advances the internal state.
896

897
    """
898
    self._Advance()
899
    result = self._next
900
    self._next = None
901
    return result
902

    
903

    
904
class _OpExecContext:
905
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
906
    """Initializes this class.
907

908
    """
909
    self.op = op
910
    self.index = index
911
    self.log_prefix = log_prefix
912
    self.summary = op.input.Summary()
913

    
914
    # Create local copy to modify
915
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
916
      self.jobdeps = op.input.depends[:]
917
    else:
918
      self.jobdeps = None
919

    
920
    self._timeout_strategy_factory = timeout_strategy_factory
921
    self._ResetTimeoutStrategy()
922

    
923
  def _ResetTimeoutStrategy(self):
924
    """Creates a new timeout strategy.
925

926
    """
927
    self._timeout_strategy = \
928
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
929

    
930
  def CheckPriorityIncrease(self):
931
    """Checks whether priority can and should be increased.
932

933
    Called when locks couldn't be acquired.
934

935
    """
936
    op = self.op
937

    
938
    # Exhausted all retries and next round should not use blocking acquire
939
    # for locks?
940
    if (self._timeout_strategy.Peek() is None and
941
        op.priority > constants.OP_PRIO_HIGHEST):
942
      logging.debug("Increasing priority")
943
      op.priority -= 1
944
      self._ResetTimeoutStrategy()
945
      return True
946

    
947
    return False
948

    
949
  def GetNextLockTimeout(self):
950
    """Returns the next lock acquire timeout.
951

952
    """
953
    return self._timeout_strategy.Next()
954

    
955

    
956
class _JobProcessor(object):
957
  (DEFER,
958
   WAITDEP,
959
   FINISHED) = range(1, 4)
960

    
961
  def __init__(self, queue, opexec_fn, job,
962
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
963
    """Initializes this class.
964

965
    """
966
    self.queue = queue
967
    self.opexec_fn = opexec_fn
968
    self.job = job
969
    self._timeout_strategy_factory = _timeout_strategy_factory
970

    
971
  @staticmethod
972
  def _FindNextOpcode(job, timeout_strategy_factory):
973
    """Locates the next opcode to run.
974

975
    @type job: L{_QueuedJob}
976
    @param job: Job object
977
    @param timeout_strategy_factory: Callable to create new timeout strategy
978

979
    """
980
    # Create some sort of a cache to speed up locating next opcode for future
981
    # lookups
982
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
983
    # pending and one for processed ops.
984
    if job.ops_iter is None:
985
      job.ops_iter = enumerate(job.ops)
986

    
987
    # Find next opcode to run
988
    while True:
989
      try:
990
        (idx, op) = job.ops_iter.next()
991
      except StopIteration:
992
        raise errors.ProgrammerError("Called for a finished job")
993

    
994
      if op.status == constants.OP_STATUS_RUNNING:
995
        # Found an opcode already marked as running
996
        raise errors.ProgrammerError("Called for job marked as running")
997

    
998
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
999
                             timeout_strategy_factory)
1000

    
1001
      if op.status not in constants.OPS_FINALIZED:
1002
        return opctx
1003

    
1004
      # This is a job that was partially completed before master daemon
1005
      # shutdown, so it can be expected that some opcodes are already
1006
      # completed successfully (if any did error out, then the whole job
1007
      # should have been aborted and not resubmitted for processing).
1008
      logging.info("%s: opcode %s already processed, skipping",
1009
                   opctx.log_prefix, opctx.summary)
1010

    
1011
  @staticmethod
1012
  def _MarkWaitlock(job, op):
1013
    """Marks an opcode as waiting for locks.
1014

1015
    The job's start timestamp is also set if necessary.
1016

1017
    @type job: L{_QueuedJob}
1018
    @param job: Job object
1019
    @type op: L{_QueuedOpCode}
1020
    @param op: Opcode object
1021

1022
    """
1023
    assert op in job.ops
1024
    assert op.status in (constants.OP_STATUS_QUEUED,
1025
                         constants.OP_STATUS_WAITING)
1026

    
1027
    update = False
1028

    
1029
    op.result = None
1030

    
1031
    if op.status == constants.OP_STATUS_QUEUED:
1032
      op.status = constants.OP_STATUS_WAITING
1033
      update = True
1034

    
1035
    if op.start_timestamp is None:
1036
      op.start_timestamp = TimeStampNow()
1037
      update = True
1038

    
1039
    if job.start_timestamp is None:
1040
      job.start_timestamp = op.start_timestamp
1041
      update = True
1042

    
1043
    assert op.status == constants.OP_STATUS_WAITING
1044

    
1045
    return update
1046

    
1047
  @staticmethod
1048
  def _CheckDependencies(queue, job, opctx):
1049
    """Checks if an opcode has dependencies and if so, processes them.
1050

1051
    @type queue: L{JobQueue}
1052
    @param queue: Queue object
1053
    @type job: L{_QueuedJob}
1054
    @param job: Job object
1055
    @type opctx: L{_OpExecContext}
1056
    @param opctx: Opcode execution context
1057
    @rtype: bool
1058
    @return: Whether opcode will be re-scheduled by dependency tracker
1059

1060
    """
1061
    op = opctx.op
1062

    
1063
    result = False
1064

    
1065
    while opctx.jobdeps:
1066
      (dep_job_id, dep_status) = opctx.jobdeps[0]
1067

    
1068
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1069
                                                          dep_status)
1070
      assert ht.TNonEmptyString(depmsg), "No dependency message"
1071

    
1072
      logging.info("%s: %s", opctx.log_prefix, depmsg)
1073

    
1074
      if depresult == _JobDependencyManager.CONTINUE:
1075
        # Remove dependency and continue
1076
        opctx.jobdeps.pop(0)
1077

    
1078
      elif depresult == _JobDependencyManager.WAIT:
1079
        # Need to wait for notification, dependency tracker will re-add job
1080
        # to workerpool
1081
        result = True
1082
        break
1083

    
1084
      elif depresult == _JobDependencyManager.CANCEL:
1085
        # Job was cancelled, cancel this job as well
1086
        job.Cancel()
1087
        assert op.status == constants.OP_STATUS_CANCELING
1088
        break
1089

    
1090
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1091
                         _JobDependencyManager.ERROR):
1092
        # Job failed or there was an error, this job must fail
1093
        op.status = constants.OP_STATUS_ERROR
1094
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1095
        break
1096

    
1097
      else:
1098
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1099
                                     depresult)
1100

    
1101
    return result
1102

    
1103
  def _ExecOpCodeUnlocked(self, opctx):
1104
    """Processes one opcode and returns the result.
1105

1106
    """
1107
    op = opctx.op
1108

    
1109
    assert op.status in (constants.OP_STATUS_WAITING,
1110
                         constants.OP_STATUS_CANCELING)
1111

    
1112
    # The very last check if the job was cancelled before trying to execute
1113
    if op.status == constants.OP_STATUS_CANCELING:
1114
      return (constants.OP_STATUS_CANCELING, None)
1115

    
1116
    timeout = opctx.GetNextLockTimeout()
1117

    
1118
    try:
1119
      # Make sure not to hold queue lock while calling ExecOpCode
1120
      result = self.opexec_fn(op.input,
1121
                              _OpExecCallbacks(self.queue, self.job, op),
1122
                              timeout=timeout)
1123
    except mcpu.LockAcquireTimeout:
1124
      assert timeout is not None, "Received timeout for blocking acquire"
1125
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1126

    
1127
      assert op.status in (constants.OP_STATUS_WAITING,
1128
                           constants.OP_STATUS_CANCELING)
1129

    
1130
      # Was job cancelled while we were waiting for the lock?
1131
      if op.status == constants.OP_STATUS_CANCELING:
1132
        return (constants.OP_STATUS_CANCELING, None)
1133

    
1134
      # Queue is shutting down, return to queued
1135
      if not self.queue.AcceptingJobsUnlocked():
1136
        return (constants.OP_STATUS_QUEUED, None)
1137

    
1138
      # Stay in waitlock while trying to re-acquire lock
1139
      return (constants.OP_STATUS_WAITING, None)
1140
    except CancelJob:
1141
      logging.exception("%s: Canceling job", opctx.log_prefix)
1142
      assert op.status == constants.OP_STATUS_CANCELING
1143
      return (constants.OP_STATUS_CANCELING, None)
1144

    
1145
    except QueueShutdown:
1146
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1147

    
1148
      assert op.status == constants.OP_STATUS_WAITING
1149

    
1150
      # Job hadn't been started yet, so it should return to the queue
1151
      return (constants.OP_STATUS_QUEUED, None)
1152

    
1153
    except Exception, err: # pylint: disable=W0703
1154
      logging.exception("%s: Caught exception in %s",
1155
                        opctx.log_prefix, opctx.summary)
1156
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1157
    else:
1158
      logging.debug("%s: %s successful",
1159
                    opctx.log_prefix, opctx.summary)
1160
      return (constants.OP_STATUS_SUCCESS, result)
1161

    
1162
  def __call__(self, _nextop_fn=None):
1163
    """Continues execution of a job.
1164

1165
    @param _nextop_fn: Callback function for tests
1166
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1167
      be deferred and C{WAITDEP} if the dependency manager
1168
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1169

1170
    """
1171
    queue = self.queue
1172
    job = self.job
1173

    
1174
    logging.debug("Processing job %s", job.id)
1175

    
1176
    queue.acquire(shared=1)
1177
    try:
1178
      opcount = len(job.ops)
1179

    
1180
      assert job.writable, "Expected writable job"
1181

    
1182
      # Don't do anything for finalized jobs
1183
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1184
        return self.FINISHED
1185

    
1186
      # Is a previous opcode still pending?
1187
      if job.cur_opctx:
1188
        opctx = job.cur_opctx
1189
        job.cur_opctx = None
1190
      else:
1191
        if __debug__ and _nextop_fn:
1192
          _nextop_fn()
1193
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1194

    
1195
      op = opctx.op
1196

    
1197
      # Consistency check
1198
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1199
                                     constants.OP_STATUS_CANCELING)
1200
                        for i in job.ops[opctx.index + 1:])
1201

    
1202
      assert op.status in (constants.OP_STATUS_QUEUED,
1203
                           constants.OP_STATUS_WAITING,
1204
                           constants.OP_STATUS_CANCELING)
1205

    
1206
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1207
              op.priority >= constants.OP_PRIO_HIGHEST)
1208

    
1209
      waitjob = None
1210

    
1211
      if op.status != constants.OP_STATUS_CANCELING:
1212
        assert op.status in (constants.OP_STATUS_QUEUED,
1213
                             constants.OP_STATUS_WAITING)
1214

    
1215
        # Prepare to start opcode
1216
        if self._MarkWaitlock(job, op):
1217
          # Write to disk
1218
          queue.UpdateJobUnlocked(job)
1219

    
1220
        assert op.status == constants.OP_STATUS_WAITING
1221
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1222
        assert job.start_timestamp and op.start_timestamp
1223
        assert waitjob is None
1224

    
1225
        # Check if waiting for a job is necessary
1226
        waitjob = self._CheckDependencies(queue, job, opctx)
1227

    
1228
        assert op.status in (constants.OP_STATUS_WAITING,
1229
                             constants.OP_STATUS_CANCELING,
1230
                             constants.OP_STATUS_ERROR)
1231

    
1232
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1233
                                         constants.OP_STATUS_ERROR)):
1234
          logging.info("%s: opcode %s waiting for locks",
1235
                       opctx.log_prefix, opctx.summary)
1236

    
1237
          assert not opctx.jobdeps, "Not all dependencies were removed"
1238

    
1239
          queue.release()
1240
          try:
1241
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1242
          finally:
1243
            queue.acquire(shared=1)
1244

    
1245
          op.status = op_status
1246
          op.result = op_result
1247

    
1248
          assert not waitjob
1249

    
1250
        if op.status in (constants.OP_STATUS_WAITING,
1251
                         constants.OP_STATUS_QUEUED):
1252
          # waiting: Couldn't get locks in time
1253
          # queued: Queue is shutting down
1254
          assert not op.end_timestamp
1255
        else:
1256
          # Finalize opcode
1257
          op.end_timestamp = TimeStampNow()
1258

    
1259
          if op.status == constants.OP_STATUS_CANCELING:
1260
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1261
                                  for i in job.ops[opctx.index:])
1262
          else:
1263
            assert op.status in constants.OPS_FINALIZED
1264

    
1265
      if op.status == constants.OP_STATUS_QUEUED:
1266
        # Queue is shutting down
1267
        assert not waitjob
1268

    
1269
        finalize = False
1270

    
1271
        # Reset context
1272
        job.cur_opctx = None
1273

    
1274
        # In no case must the status be finalized here
1275
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1276

    
1277
      elif op.status == constants.OP_STATUS_WAITING or waitjob:
1278
        finalize = False
1279

    
1280
        if not waitjob and opctx.CheckPriorityIncrease():
1281
          # Priority was changed, need to update on-disk file
1282
          queue.UpdateJobUnlocked(job)
1283

    
1284
        # Keep around for another round
1285
        job.cur_opctx = opctx
1286

    
1287
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1288
                op.priority >= constants.OP_PRIO_HIGHEST)
1289

    
1290
        # In no case must the status be finalized here
1291
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1292

    
1293
      else:
1294
        # Ensure all opcodes so far have been successful
1295
        assert (opctx.index == 0 or
1296
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1297
                           for i in job.ops[:opctx.index]))
1298

    
1299
        # Reset context
1300
        job.cur_opctx = None
1301

    
1302
        if op.status == constants.OP_STATUS_SUCCESS:
1303
          finalize = False
1304

    
1305
        elif op.status == constants.OP_STATUS_ERROR:
1306
          # Ensure failed opcode has an exception as its result
1307
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1308

    
1309
          to_encode = errors.OpExecError("Preceding opcode failed")
1310
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1311
                                _EncodeOpError(to_encode))
1312
          finalize = True
1313

    
1314
          # Consistency check
1315
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1316
                            errors.GetEncodedError(i.result)
1317
                            for i in job.ops[opctx.index:])
1318

    
1319
        elif op.status == constants.OP_STATUS_CANCELING:
1320
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1321
                                "Job canceled by request")
1322
          finalize = True
1323

    
1324
        else:
1325
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1326

    
1327
        if opctx.index == (opcount - 1):
1328
          # Finalize on last opcode
1329
          finalize = True
1330

    
1331
        if finalize:
1332
          # All opcodes have been run, finalize job
1333
          job.Finalize()
1334

    
1335
        # Write to disk. If the job status is final, this is the final write
1336
        # allowed. Once the file has been written, it can be archived anytime.
1337
        queue.UpdateJobUnlocked(job)
1338

    
1339
        assert not waitjob
1340

    
1341
        if finalize:
1342
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1343
          return self.FINISHED
1344

    
1345
      assert not waitjob or queue.depmgr.JobWaiting(job)
1346

    
1347
      if waitjob:
1348
        return self.WAITDEP
1349
      else:
1350
        return self.DEFER
1351
    finally:
1352
      assert job.writable, "Job became read-only while being processed"
1353
      queue.release()
1354

    
1355

    
1356
def _EvaluateJobProcessorResult(depmgr, job, result):
1357
  """Looks at a result from L{_JobProcessor} for a job.
1358

1359
  To be used in a L{_JobQueueWorker}.
1360

1361
  """
1362
  if result == _JobProcessor.FINISHED:
1363
    # Notify waiting jobs
1364
    depmgr.NotifyWaiters(job.id)
1365

    
1366
  elif result == _JobProcessor.DEFER:
1367
    # Schedule again
1368
    raise workerpool.DeferTask(priority=job.CalcPriority())
1369

    
1370
  elif result == _JobProcessor.WAITDEP:
1371
    # No-op, dependency manager will re-schedule
1372
    pass
1373

    
1374
  else:
1375
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1376
                                 (result, ))
1377

    
1378

    
1379
class _JobQueueWorker(workerpool.BaseWorker):
1380
  """The actual job workers.
1381

1382
  """
1383
  def RunTask(self, job): # pylint: disable=W0221
1384
    """Job executor.
1385

1386
    @type job: L{_QueuedJob}
1387
    @param job: the job to be processed
1388

1389
    """
1390
    assert job.writable, "Expected writable job"
1391

    
1392
    # Ensure only one worker is active on a single job. If a job registers for
1393
    # a dependency job, and the other job notifies before the first worker is
1394
    # done, the job can end up in the tasklist more than once.
1395
    job.processor_lock.acquire()
1396
    try:
1397
      return self._RunTaskInner(job)
1398
    finally:
1399
      job.processor_lock.release()
1400

    
1401
  def _RunTaskInner(self, job):
1402
    """Executes a job.
1403

1404
    Must be called with per-job lock acquired.
1405

1406
    """
1407
    queue = job.queue
1408
    assert queue == self.pool.queue
1409

    
1410
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1411
    setname_fn(None)
1412

    
1413
    proc = mcpu.Processor(queue.context, job.id)
1414

    
1415
    # Create wrapper for setting thread name
1416
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1417
                                    proc.ExecOpCode)
1418

    
1419
    _EvaluateJobProcessorResult(queue.depmgr, job,
1420
                                _JobProcessor(queue, wrap_execop_fn, job)())
1421

    
1422
  @staticmethod
1423
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1424
    """Updates the worker thread name to include a short summary of the opcode.
1425

1426
    @param setname_fn: Callable setting worker thread name
1427
    @param execop_fn: Callable for executing opcode (usually
1428
                      L{mcpu.Processor.ExecOpCode})
1429

1430
    """
1431
    setname_fn(op)
1432
    try:
1433
      return execop_fn(op, *args, **kwargs)
1434
    finally:
1435
      setname_fn(None)
1436

    
1437
  @staticmethod
1438
  def _GetWorkerName(job, op):
1439
    """Sets the worker thread name.
1440

1441
    @type job: L{_QueuedJob}
1442
    @type op: L{opcodes.OpCode}
1443

1444
    """
1445
    parts = ["Job%s" % job.id]
1446

    
1447
    if op:
1448
      parts.append(op.TinySummary())
1449

    
1450
    return "/".join(parts)
1451

    
1452

    
1453
class _JobQueueWorkerPool(workerpool.WorkerPool):
1454
  """Simple class implementing a job-processing workerpool.
1455

1456
  """
1457
  def __init__(self, queue):
1458
    super(_JobQueueWorkerPool, self).__init__("Jq",
1459
                                              JOBQUEUE_THREADS,
1460
                                              _JobQueueWorker)
1461
    self.queue = queue
1462

    
1463

    
1464
class _JobDependencyManager:
1465
  """Keeps track of job dependencies.
1466

1467
  """
1468
  (WAIT,
1469
   ERROR,
1470
   CANCEL,
1471
   CONTINUE,
1472
   WRONGSTATUS) = range(1, 6)
1473

    
1474
  def __init__(self, getstatus_fn, enqueue_fn):
1475
    """Initializes this class.
1476

1477
    """
1478
    self._getstatus_fn = getstatus_fn
1479
    self._enqueue_fn = enqueue_fn
1480

    
1481
    self._waiters = {}
1482
    self._lock = locking.SharedLock("JobDepMgr")
1483

    
1484
  @locking.ssynchronized(_LOCK, shared=1)
1485
  def GetLockInfo(self, requested): # pylint: disable=W0613
1486
    """Retrieves information about waiting jobs.
1487

1488
    @type requested: set
1489
    @param requested: Requested information, see C{query.LQ_*}
1490

1491
    """
1492
    # No need to sort here, that's being done by the lock manager and query
1493
    # library. There are no priorities for notifying jobs, hence all show up as
1494
    # one item under "pending".
1495
    return [("job/%s" % job_id, None, None,
1496
             [("job", [job.id for job in waiters])])
1497
            for job_id, waiters in self._waiters.items()
1498
            if waiters]
1499

    
1500
  @locking.ssynchronized(_LOCK, shared=1)
1501
  def JobWaiting(self, job):
1502
    """Checks if a job is waiting.
1503

1504
    """
1505
    return compat.any(job in jobs
1506
                      for jobs in self._waiters.values())
1507

    
1508
  @locking.ssynchronized(_LOCK)
1509
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1510
    """Checks if a dependency job has the requested status.
1511

1512
    If the other job is not yet in a finalized status, the calling job will be
1513
    notified (re-added to the workerpool) at a later point.
1514

1515
    @type job: L{_QueuedJob}
1516
    @param job: Job object
1517
    @type dep_job_id: int
1518
    @param dep_job_id: ID of dependency job
1519
    @type dep_status: list
1520
    @param dep_status: Required status
1521

1522
    """
1523
    assert ht.TJobId(job.id)
1524
    assert ht.TJobId(dep_job_id)
1525
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1526

    
1527
    if job.id == dep_job_id:
1528
      return (self.ERROR, "Job can't depend on itself")
1529

    
1530
    # Get status of dependency job
1531
    try:
1532
      status = self._getstatus_fn(dep_job_id)
1533
    except errors.JobLost, err:
1534
      return (self.ERROR, "Dependency error: %s" % err)
1535

    
1536
    assert status in constants.JOB_STATUS_ALL
1537

    
1538
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1539

    
1540
    if status not in constants.JOBS_FINALIZED:
1541
      # Register for notification and wait for job to finish
1542
      job_id_waiters.add(job)
1543
      return (self.WAIT,
1544
              "Need to wait for job %s, wanted status '%s'" %
1545
              (dep_job_id, dep_status))
1546

    
1547
    # Remove from waiters list
1548
    if job in job_id_waiters:
1549
      job_id_waiters.remove(job)
1550

    
1551
    if (status == constants.JOB_STATUS_CANCELED and
1552
        constants.JOB_STATUS_CANCELED not in dep_status):
1553
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1554

    
1555
    elif not dep_status or status in dep_status:
1556
      return (self.CONTINUE,
1557
              "Dependency job %s finished with status '%s'" %
1558
              (dep_job_id, status))
1559

    
1560
    else:
1561
      return (self.WRONGSTATUS,
1562
              "Dependency job %s finished with status '%s',"
1563
              " not one of '%s' as required" %
1564
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1565

    
1566
  def _RemoveEmptyWaitersUnlocked(self):
1567
    """Remove all jobs without actual waiters.
1568

1569
    """
1570
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1571
                   if not waiters]:
1572
      del self._waiters[job_id]
1573

    
1574
  def NotifyWaiters(self, job_id):
1575
    """Notifies all jobs waiting for a certain job ID.
1576

1577
    @attention: Do not call until L{CheckAndRegister} returned a status other
1578
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1579
    @type job_id: int
1580
    @param job_id: Job ID
1581

1582
    """
1583
    assert ht.TJobId(job_id)
1584

    
1585
    self._lock.acquire()
1586
    try:
1587
      self._RemoveEmptyWaitersUnlocked()
1588

    
1589
      jobs = self._waiters.pop(job_id, None)
1590
    finally:
1591
      self._lock.release()
1592

    
1593
    if jobs:
1594
      # Re-add jobs to workerpool
1595
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1596
                    len(jobs), job_id)
1597
      self._enqueue_fn(jobs)
1598

    
1599

    
1600
def _RequireOpenQueue(fn):
1601
  """Decorator for "public" functions.
1602

1603
  This function should be used for all 'public' functions. That is,
1604
  functions usually called from other classes. Note that this should
1605
  be applied only to methods (not plain functions), since it expects
1606
  that the decorated function is called with a first argument that has
1607
  a '_queue_filelock' argument.
1608

1609
  @warning: Use this decorator only after locking.ssynchronized
1610

1611
  Example::
1612
    @locking.ssynchronized(_LOCK)
1613
    @_RequireOpenQueue
1614
    def Example(self):
1615
      pass
1616

1617
  """
1618
  def wrapper(self, *args, **kwargs):
1619
    # pylint: disable=W0212
1620
    assert self._queue_filelock is not None, "Queue should be open"
1621
    return fn(self, *args, **kwargs)
1622
  return wrapper
1623

    
1624

    
1625
def _RequireNonDrainedQueue(fn):
1626
  """Decorator checking for a non-drained queue.
1627

1628
  To be used with functions submitting new jobs.
1629

1630
  """
1631
  def wrapper(self, *args, **kwargs):
1632
    """Wrapper function.
1633

1634
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1635

1636
    """
1637
    # Ok when sharing the big job queue lock, as the drain file is created when
1638
    # the lock is exclusive.
1639
    # Needs access to protected member, pylint: disable=W0212
1640
    if self._drained:
1641
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1642

    
1643
    if not self._accepting_jobs:
1644
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1645

    
1646
    return fn(self, *args, **kwargs)
1647
  return wrapper
1648

    
1649

    
1650
class JobQueue(object):
1651
  """Queue used to manage the jobs.
1652

1653
  """
1654
  def __init__(self, context):
1655
    """Constructor for JobQueue.
1656

1657
    The constructor will initialize the job queue object and then
1658
    start loading the current jobs from disk, either for starting them
1659
    (if they were queue) or for aborting them (if they were already
1660
    running).
1661

1662
    @type context: GanetiContext
1663
    @param context: the context object for access to the configuration
1664
        data and other ganeti objects
1665

1666
    """
1667
    self.context = context
1668
    self._memcache = weakref.WeakValueDictionary()
1669
    self._my_hostname = netutils.Hostname.GetSysName()
1670

    
1671
    # The Big JobQueue lock. If a code block or method acquires it in shared
1672
    # mode safe it must guarantee concurrency with all the code acquiring it in
1673
    # shared mode, including itself. In order not to acquire it at all
1674
    # concurrency must be guaranteed with all code acquiring it in shared mode
1675
    # and all code acquiring it exclusively.
1676
    self._lock = locking.SharedLock("JobQueue")
1677

    
1678
    self.acquire = self._lock.acquire
1679
    self.release = self._lock.release
1680

    
1681
    # Accept jobs by default
1682
    self._accepting_jobs = True
1683

    
1684
    # Initialize the queue, and acquire the filelock.
1685
    # This ensures no other process is working on the job queue.
1686
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1687

    
1688
    # Read serial file
1689
    self._last_serial = jstore.ReadSerial()
1690
    assert self._last_serial is not None, ("Serial file was modified between"
1691
                                           " check in jstore and here")
1692

    
1693
    # Get initial list of nodes
1694
    self._nodes = dict((n.name, n.primary_ip)
1695
                       for n in self.context.cfg.GetAllNodesInfo().values()
1696
                       if n.master_candidate)
1697

    
1698
    # Remove master node
1699
    self._nodes.pop(self._my_hostname, None)
1700

    
1701
    # TODO: Check consistency across nodes
1702

    
1703
    self._queue_size = None
1704
    self._UpdateQueueSizeUnlocked()
1705
    assert ht.TInt(self._queue_size)
1706
    self._drained = jstore.CheckDrainFlag()
1707

    
1708
    # Job dependencies
1709
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1710
                                        self._EnqueueJobs)
1711
    self.context.glm.AddToLockMonitor(self.depmgr)
1712

    
1713
    # Setup worker pool
1714
    self._wpool = _JobQueueWorkerPool(self)
1715

    
1716
  def _PickupJobUnlocked(self, job_id):
1717
    """Load a job from the job queue
1718

1719
    Pick up a job that already is in the job queue and start/resume it.
1720

1721
    """
1722
    job = self._LoadJobUnlocked(job_id)
1723

    
1724
    if job is None:
1725
      logging.warning("Job %s could not be read", job_id)
1726
      return
1727

    
1728
    status = job.CalcStatus()
1729
    if status == constants.JOB_STATUS_QUEUED:
1730
      self._EnqueueJobsUnlocked([job])
1731
      logging.info("Restarting job %s", job.id)
1732

    
1733
    elif status in (constants.JOB_STATUS_RUNNING,
1734
                    constants.JOB_STATUS_WAITING,
1735
                    constants.JOB_STATUS_CANCELING):
1736
      logging.warning("Unfinished job %s found: %s", job.id, job)
1737

    
1738
      if status == constants.JOB_STATUS_WAITING:
1739
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1740
        self._EnqueueJobsUnlocked([job])
1741
        logging.info("Restarting job %s", job.id)
1742
      else:
1743
        to_encode = errors.OpExecError("Unclean master daemon shutdown")
1744
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1745
                              _EncodeOpError(to_encode))
1746
        job.Finalize()
1747

    
1748
    self.UpdateJobUnlocked(job)
1749

    
1750
  @locking.ssynchronized(_LOCK)
1751
  def PickupJob(self, job_id):
1752
    self._PickupJobUnlocked(job_id)
1753

    
1754
  def _GetRpc(self, address_list):
1755
    """Gets RPC runner with context.
1756

1757
    """
1758
    return rpc.JobQueueRunner(self.context, address_list)
1759

    
1760
  @locking.ssynchronized(_LOCK)
1761
  @_RequireOpenQueue
1762
  def AddNode(self, node):
1763
    """Register a new node with the queue.
1764

1765
    @type node: L{objects.Node}
1766
    @param node: the node object to be added
1767

1768
    """
1769
    node_name = node.name
1770
    assert node_name != self._my_hostname
1771

    
1772
    # Clean queue directory on added node
1773
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
1774
    msg = result.fail_msg
1775
    if msg:
1776
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1777
                      node_name, msg)
1778

    
1779
    if not node.master_candidate:
1780
      # remove if existing, ignoring errors
1781
      self._nodes.pop(node_name, None)
1782
      # and skip the replication of the job ids
1783
      return
1784

    
1785
    # Upload the whole queue excluding archived jobs
1786
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1787

    
1788
    # Upload current serial file
1789
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1790

    
1791
    # Static address list
1792
    addrs = [node.primary_ip]
1793

    
1794
    for file_name in files:
1795
      # Read file content
1796
      content = utils.ReadFile(file_name)
1797

    
1798
      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1799
                             file_name, content)
1800
      msg = result[node_name].fail_msg
1801
      if msg:
1802
        logging.error("Failed to upload file %s to node %s: %s",
1803
                      file_name, node_name, msg)
1804

    
1805
    # Set queue drained flag
1806
    result = \
1807
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
1808
                                                       self._drained)
1809
    msg = result[node_name].fail_msg
1810
    if msg:
1811
      logging.error("Failed to set queue drained flag on node %s: %s",
1812
                    node_name, msg)
1813

    
1814
    self._nodes[node_name] = node.primary_ip
1815

    
1816
  @locking.ssynchronized(_LOCK)
1817
  @_RequireOpenQueue
1818
  def RemoveNode(self, node_name):
1819
    """Callback called when removing nodes from the cluster.
1820

1821
    @type node_name: str
1822
    @param node_name: the name of the node to remove
1823

1824
    """
1825
    self._nodes.pop(node_name, None)
1826

    
1827
  @staticmethod
1828
  def _CheckRpcResult(result, nodes, failmsg):
1829
    """Verifies the status of an RPC call.
1830

1831
    Since we aim to keep consistency should this node (the current
1832
    master) fail, we will log errors if our rpc fail, and especially
1833
    log the case when more than half of the nodes fails.
1834

1835
    @param result: the data as returned from the rpc call
1836
    @type nodes: list
1837
    @param nodes: the list of nodes we made the call to
1838
    @type failmsg: str
1839
    @param failmsg: the identifier to be used for logging
1840

1841
    """
1842
    failed = []
1843
    success = []
1844

    
1845
    for node in nodes:
1846
      msg = result[node].fail_msg
1847
      if msg:
1848
        failed.append(node)
1849
        logging.error("RPC call %s (%s) failed on node %s: %s",
1850
                      result[node].call, failmsg, node, msg)
1851
      else:
1852
        success.append(node)
1853

    
1854
    # +1 for the master node
1855
    if (len(success) + 1) < len(failed):
1856
      # TODO: Handle failing nodes
1857
      logging.error("More than half of the nodes failed")
1858

    
1859
  def _GetNodeIp(self):
1860
    """Helper for returning the node name/ip list.
1861

1862
    @rtype: (list, list)
1863
    @return: a tuple of two lists, the first one with the node
1864
        names and the second one with the node addresses
1865

1866
    """
1867
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1868
    name_list = self._nodes.keys()
1869
    addr_list = [self._nodes[name] for name in name_list]
1870
    return name_list, addr_list
1871

    
1872
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1873
    """Writes a file locally and then replicates it to all nodes.
1874

1875
    This function will replace the contents of a file on the local
1876
    node and then replicate it to all the other nodes we have.
1877

1878
    @type file_name: str
1879
    @param file_name: the path of the file to be replicated
1880
    @type data: str
1881
    @param data: the new contents of the file
1882
    @type replicate: boolean
1883
    @param replicate: whether to spread the changes to the remote nodes
1884

1885
    """
1886
    getents = runtime.GetEnts()
1887
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1888
                    gid=getents.daemons_gid,
1889
                    mode=constants.JOB_QUEUE_FILES_PERMS)
1890

    
1891
    if replicate:
1892
      names, addrs = self._GetNodeIp()
1893
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1894
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1895

    
1896
  def _RenameFilesUnlocked(self, rename):
1897
    """Renames a file locally and then replicate the change.
1898

1899
    This function will rename a file in the local queue directory
1900
    and then replicate this rename to all the other nodes we have.
1901

1902
    @type rename: list of (old, new)
1903
    @param rename: List containing tuples mapping old to new names
1904

1905
    """
1906
    # Rename them locally
1907
    for old, new in rename:
1908
      utils.RenameFile(old, new, mkdir=True)
1909

    
1910
    # ... and on all nodes
1911
    names, addrs = self._GetNodeIp()
1912
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1913
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1914

    
1915
  @staticmethod
1916
  def _GetJobPath(job_id):
1917
    """Returns the job file for a given job id.
1918

1919
    @type job_id: str
1920
    @param job_id: the job identifier
1921
    @rtype: str
1922
    @return: the path to the job file
1923

1924
    """
1925
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1926

    
1927
  @staticmethod
1928
  def _GetArchivedJobPath(job_id):
1929
    """Returns the archived job file for a give job id.
1930

1931
    @type job_id: str
1932
    @param job_id: the job identifier
1933
    @rtype: str
1934
    @return: the path to the archived job file
1935

1936
    """
1937
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1938
                          jstore.GetArchiveDirectory(job_id),
1939
                          "job-%s" % job_id)
1940

    
1941
  @staticmethod
1942
  def _DetermineJobDirectories(archived):
1943
    """Build list of directories containing job files.
1944

1945
    @type archived: bool
1946
    @param archived: Whether to include directories for archived jobs
1947
    @rtype: list
1948

1949
    """
1950
    result = [pathutils.QUEUE_DIR]
1951

    
1952
    if archived:
1953
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1954
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
1955
                        utils.ListVisibleFiles(archive_path)))
1956

    
1957
    return result
1958

    
1959
  @classmethod
1960
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
1961
    """Return all known job IDs.
1962

1963
    The method only looks at disk because it's a requirement that all
1964
    jobs are present on disk (so in the _memcache we don't have any
1965
    extra IDs).
1966

1967
    @type sort: boolean
1968
    @param sort: perform sorting on the returned job ids
1969
    @rtype: list
1970
    @return: the list of job IDs
1971

1972
    """
1973
    jlist = []
1974

    
1975
    for path in cls._DetermineJobDirectories(archived):
1976
      for filename in utils.ListVisibleFiles(path):
1977
        m = constants.JOB_FILE_RE.match(filename)
1978
        if m:
1979
          jlist.append(int(m.group(1)))
1980

    
1981
    if sort:
1982
      jlist.sort()
1983
    return jlist
1984

    
1985
  def _LoadJobUnlocked(self, job_id):
1986
    """Loads a job from the disk or memory.
1987

1988
    Given a job id, this will return the cached job object if
1989
    existing, or try to load the job from the disk. If loading from
1990
    disk, it will also add the job to the cache.
1991

1992
    @type job_id: int
1993
    @param job_id: the job id
1994
    @rtype: L{_QueuedJob} or None
1995
    @return: either None or the job object
1996

1997
    """
1998
    assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"
1999

    
2000
    job = self._memcache.get(job_id, None)
2001
    if job:
2002
      logging.debug("Found job %s in memcache", job_id)
2003
      assert job.writable, "Found read-only job in memcache"
2004
      return job
2005

    
2006
    try:
2007
      job = self._LoadJobFromDisk(job_id, False)
2008
      if job is None:
2009
        return job
2010
    except errors.JobFileCorrupted:
2011
      old_path = self._GetJobPath(job_id)
2012
      new_path = self._GetArchivedJobPath(job_id)
2013
      if old_path == new_path:
2014
        # job already archived (future case)
2015
        logging.exception("Can't parse job %s", job_id)
2016
      else:
2017
        # non-archived case
2018
        logging.exception("Can't parse job %s, will archive.", job_id)
2019
        self._RenameFilesUnlocked([(old_path, new_path)])
2020
      return None
2021

    
2022
    assert job.writable, "Job just loaded is not writable"
2023

    
2024
    self._memcache[job_id] = job
2025
    logging.debug("Added job %s to the cache", job_id)
2026
    return job
2027

    
2028
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2029
    """Load the given job file from disk.
2030

2031
    Given a job file, read, load and restore it in a _QueuedJob format.
2032

2033
    @type job_id: int
2034
    @param job_id: job identifier
2035
    @type try_archived: bool
2036
    @param try_archived: Whether to try loading an archived job
2037
    @rtype: L{_QueuedJob} or None
2038
    @return: either None or the job object
2039

2040
    """
2041
    path_functions = [(self._GetJobPath, False)]
2042

    
2043
    if try_archived:
2044
      path_functions.append((self._GetArchivedJobPath, True))
2045

    
2046
    raw_data = None
2047
    archived = None
2048

    
2049
    for (fn, archived) in path_functions:
2050
      filepath = fn(job_id)
2051
      logging.debug("Loading job from %s", filepath)
2052
      try:
2053
        raw_data = utils.ReadFile(filepath)
2054
      except EnvironmentError, err:
2055
        if err.errno != errno.ENOENT:
2056
          raise
2057
      else:
2058
        break
2059

    
2060
    if not raw_data:
2061
      return None
2062

    
2063
    if writable is None:
2064
      writable = not archived
2065

    
2066
    try:
2067
      data = serializer.LoadJson(raw_data)
2068
      job = _QueuedJob.Restore(self, data, writable, archived)
2069
    except Exception, err: # pylint: disable=W0703
2070
      raise errors.JobFileCorrupted(err)
2071

    
2072
    return job
2073

    
2074
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2075
    """Load the given job file from disk.
2076

2077
    Given a job file, read, load and restore it in a _QueuedJob format.
2078
    In case of error reading the job, it gets returned as None, and the
2079
    exception is logged.
2080

2081
    @type job_id: int
2082
    @param job_id: job identifier
2083
    @type try_archived: bool
2084
    @param try_archived: Whether to try loading an archived job
2085
    @rtype: L{_QueuedJob} or None
2086
    @return: either None or the job object
2087

2088
    """
2089
    try:
2090
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2091
    except (errors.JobFileCorrupted, EnvironmentError):
2092
      logging.exception("Can't load/parse job %s", job_id)
2093
      return None
2094

    
2095
  def _UpdateQueueSizeUnlocked(self):
2096
    """Update the queue size.
2097

2098
    """
2099
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2100

    
2101
  @locking.ssynchronized(_LOCK)
2102
  @_RequireOpenQueue
2103
  def SetDrainFlag(self, drain_flag):
2104
    """Sets the drain flag for the queue.
2105

2106
    @type drain_flag: boolean
2107
    @param drain_flag: Whether to set or unset the drain flag
2108

2109
    """
2110
    # Change flag locally
2111
    jstore.SetDrainFlag(drain_flag)
2112

    
2113
    self._drained = drain_flag
2114

    
2115
    # ... and on all nodes
2116
    (names, addrs) = self._GetNodeIp()
2117
    result = \
2118
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
2119
    self._CheckRpcResult(result, self._nodes,
2120
                         "Setting queue drain flag to %s" % drain_flag)
2121

    
2122
    return True
2123

    
2124
  @classmethod
2125
  def SubmitJob(cls, ops):
2126
    """Create and store a new job.
2127

2128
    """
2129
    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitJob(ops)
2130

    
2131
  @classmethod
2132
  def SubmitJobToDrainedQueue(cls, ops):
2133
    """Forcefully create and store a new job.
2134

2135
    Do so, even if the job queue is drained.
2136

2137
    """
2138
    return luxi.Client(address=pathutils.QUERY_SOCKET)\
2139
        .SubmitJobToDrainedQueue(ops)
2140

    
2141
  @classmethod
2142
  def SubmitManyJobs(cls, jobs):
2143
    """Create and store multiple jobs.
2144

2145
    """
2146
    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)
2147

    
2148
  @staticmethod
2149
  def _FormatSubmitError(msg, ops):
2150
    """Formats errors which occurred while submitting a job.
2151

2152
    """
2153
    return ("%s; opcodes %s" %
2154
            (msg, utils.CommaJoin(op.Summary() for op in ops)))
2155

    
2156
  @staticmethod
2157
  def _ResolveJobDependencies(resolve_fn, deps):
2158
    """Resolves relative job IDs in dependencies.
2159

2160
    @type resolve_fn: callable
2161
    @param resolve_fn: Function to resolve a relative job ID
2162
    @type deps: list
2163
    @param deps: Dependencies
2164
    @rtype: tuple; (boolean, string or list)
2165
    @return: If successful (first tuple item), the returned list contains
2166
      resolved job IDs along with the requested status; if not successful,
2167
      the second element is an error message
2168

2169
    """
2170
    result = []
2171

    
2172
    for (dep_job_id, dep_status) in deps:
2173
      if ht.TRelativeJobId(dep_job_id):
2174
        assert ht.TInt(dep_job_id) and dep_job_id < 0
2175
        try:
2176
          job_id = resolve_fn(dep_job_id)
2177
        except IndexError:
2178
          # Abort
2179
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2180
      else:
2181
        job_id = dep_job_id
2182

    
2183
      result.append((job_id, dep_status))
2184

    
2185
    return (True, result)
2186

    
2187
  @locking.ssynchronized(_LOCK)
2188
  def _EnqueueJobs(self, jobs):
2189
    """Helper function to add jobs to worker pool's queue.
2190

2191
    @type jobs: list
2192
    @param jobs: List of all jobs
2193

2194
    """
2195
    return self._EnqueueJobsUnlocked(jobs)
2196

    
2197
  def _EnqueueJobsUnlocked(self, jobs):
2198
    """Helper function to add jobs to worker pool's queue.
2199

2200
    @type jobs: list
2201
    @param jobs: List of all jobs
2202

2203
    """
2204
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2205
    self._wpool.AddManyTasks([(job, ) for job in jobs],
2206
                             priority=[job.CalcPriority() for job in jobs],
2207
                             task_id=map(_GetIdAttr, jobs))
2208

    
2209
  def _GetJobStatusForDependencies(self, job_id):
2210
    """Gets the status of a job for dependencies.
2211

2212
    @type job_id: int
2213
    @param job_id: Job ID
2214
    @raise errors.JobLost: If job can't be found
2215

2216
    """
2217
    # Not using in-memory cache as doing so would require an exclusive lock
2218

    
2219
    # Try to load from disk
2220
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2221

    
2222
    assert not job.writable, "Got writable job" # pylint: disable=E1101
2223

    
2224
    if job:
2225
      return job.CalcStatus()
2226

    
2227
    raise errors.JobLost("Job %s not found" % job_id)
2228

    
2229
  @_RequireOpenQueue
2230
  def UpdateJobUnlocked(self, job, replicate=True):
2231
    """Update a job's on disk storage.
2232

2233
    After a job has been modified, this function needs to be called in
2234
    order to write the changes to disk and replicate them to the other
2235
    nodes.
2236

2237
    @type job: L{_QueuedJob}
2238
    @param job: the changed job
2239
    @type replicate: boolean
2240
    @param replicate: whether to replicate the change to remote nodes
2241

2242
    """
2243
    if __debug__:
2244
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2245
      assert (finalized ^ (job.end_timestamp is None))
2246
      assert job.writable, "Can't update read-only job"
2247
      assert not job.archived, "Can't update archived job"
2248

    
2249
    filename = self._GetJobPath(job.id)
2250
    data = serializer.DumpJson(job.Serialize())
2251
    logging.debug("Writing job %s to %s", job.id, filename)
2252
    self._UpdateJobQueueFile(filename, data, replicate)
2253

    
2254
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2255
                        timeout):
2256
    """Waits for changes in a job.
2257

2258
    @type job_id: int
2259
    @param job_id: Job identifier
2260
    @type fields: list of strings
2261
    @param fields: Which fields to check for changes
2262
    @type prev_job_info: list or None
2263
    @param prev_job_info: Last job information returned
2264
    @type prev_log_serial: int
2265
    @param prev_log_serial: Last job message serial number
2266
    @type timeout: float
2267
    @param timeout: maximum time to wait in seconds
2268
    @rtype: tuple (job info, log entries)
2269
    @return: a tuple of the job information as required via
2270
        the fields parameter, and the log entries as a list
2271

2272
        if the job has not changed and the timeout has expired,
2273
        we instead return a special value,
2274
        L{constants.JOB_NOTCHANGED}, which should be interpreted
2275
        as such by the clients
2276

2277
    """
2278
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2279
                             writable=False)
2280

    
2281
    helper = _WaitForJobChangesHelper()
2282

    
2283
    return helper(self._GetJobPath(job_id), load_fn,
2284
                  fields, prev_job_info, prev_log_serial, timeout)
2285

    
2286
  @locking.ssynchronized(_LOCK)
2287
  @_RequireOpenQueue
2288
  def CancelJob(self, job_id):
2289
    """Cancels a job.
2290

2291
    This will only succeed if the job has not started yet.
2292

2293
    @type job_id: int
2294
    @param job_id: job ID of job to be cancelled.
2295

2296
    """
2297
    logging.info("Cancelling job %s", job_id)
2298

    
2299
    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2300

    
2301
  @locking.ssynchronized(_LOCK)
2302
  @_RequireOpenQueue
2303
  def ChangeJobPriority(self, job_id, priority):
2304
    """Changes a job's priority.
2305

2306
    @type job_id: int
2307
    @param job_id: ID of the job whose priority should be changed
2308
    @type priority: int
2309
    @param priority: New priority
2310

2311
    """
2312
    logging.info("Changing priority of job %s to %s", job_id, priority)
2313

    
2314
    if priority not in constants.OP_PRIO_SUBMIT_VALID:
2315
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2316
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
2317
                                (priority, allowed))
2318

    
2319
    def fn(job):
2320
      (success, msg) = job.ChangePriority(priority)
2321

    
2322
      if success:
2323
        try:
2324
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2325
        except workerpool.NoSuchTask:
2326
          logging.debug("Job %s is not in workerpool at this time", job.id)
2327

    
2328
      return (success, msg)
2329

    
2330
    return self._ModifyJobUnlocked(job_id, fn)
2331

    
2332
  def _ModifyJobUnlocked(self, job_id, mod_fn):
2333
    """Modifies a job.
2334

2335
    @type job_id: int
2336
    @param job_id: Job ID
2337
    @type mod_fn: callable
2338
    @param mod_fn: Modifying function, receiving job object as parameter,
2339
      returning tuple of (status boolean, message string)
2340

2341
    """
2342
    job = self._LoadJobUnlocked(job_id)
2343
    if not job:
2344
      logging.debug("Job %s not found", job_id)
2345
      return (False, "Job %s not found" % job_id)
2346

    
2347
    assert job.writable, "Can't modify read-only job"
2348
    assert not job.archived, "Can't modify archived job"
2349

    
2350
    (success, msg) = mod_fn(job)
2351

    
2352
    if success:
2353
      # If the job was finalized (e.g. cancelled), this is the final write
2354
      # allowed. The job can be archived anytime.
2355
      self.UpdateJobUnlocked(job)
2356

    
2357
    return (success, msg)
2358

    
2359
  @_RequireOpenQueue
2360
  def _ArchiveJobsUnlocked(self, jobs):
2361
    """Archives jobs.
2362

2363
    @type jobs: list of L{_QueuedJob}
2364
    @param jobs: Job objects
2365
    @rtype: int
2366
    @return: Number of archived jobs
2367

2368
    """
2369
    archive_jobs = []
2370
    rename_files = []
2371
    for job in jobs:
2372
      assert job.writable, "Can't archive read-only job"
2373
      assert not job.archived, "Can't cancel archived job"
2374

    
2375
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
2376
        logging.debug("Job %s is not yet done", job.id)
2377
        continue
2378

    
2379
      archive_jobs.append(job)
2380

    
2381
      old = self._GetJobPath(job.id)
2382
      new = self._GetArchivedJobPath(job.id)
2383
      rename_files.append((old, new))
2384

    
2385
    # TODO: What if 1..n files fail to rename?
2386
    self._RenameFilesUnlocked(rename_files)
2387

    
2388
    logging.debug("Successfully archived job(s) %s",
2389
                  utils.CommaJoin(job.id for job in archive_jobs))
2390

    
2391
    # Since we haven't quite checked, above, if we succeeded or failed renaming
2392
    # the files, we update the cached queue size from the filesystem. When we
2393
    # get around to fix the TODO: above, we can use the number of actually
2394
    # archived jobs to fix this.
2395
    self._UpdateQueueSizeUnlocked()
2396
    return len(archive_jobs)
2397

    
2398
  @locking.ssynchronized(_LOCK)
2399
  @_RequireOpenQueue
2400
  def ArchiveJob(self, job_id):
2401
    """Archives a job.
2402

2403
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
2404

2405
    @type job_id: int
2406
    @param job_id: Job ID of job to be archived.
2407
    @rtype: bool
2408
    @return: Whether job was archived
2409

2410
    """
2411
    logging.info("Archiving job %s", job_id)
2412

    
2413
    job = self._LoadJobUnlocked(job_id)
2414
    if not job:
2415
      logging.debug("Job %s not found", job_id)
2416
      return False
2417

    
2418
    return self._ArchiveJobsUnlocked([job]) == 1
2419

    
2420
  @locking.ssynchronized(_LOCK)
2421
  @_RequireOpenQueue
2422
  def AutoArchiveJobs(self, age, timeout):
2423
    """Archives all jobs based on age.
2424

2425
    The method will archive all jobs which are older than the age
2426
    parameter. For jobs that don't have an end timestamp, the start
2427
    timestamp will be considered. The special '-1' age will cause
2428
    archival of all jobs (that are not running or queued).
2429

2430
    @type age: int
2431
    @param age: the minimum age in seconds
2432

2433
    """
2434
    logging.info("Archiving jobs with age more than %s seconds", age)
2435

    
2436
    now = time.time()
2437
    end_time = now + timeout
2438
    archived_count = 0
2439
    last_touched = 0
2440

    
2441
    all_job_ids = self._GetJobIDsUnlocked()
2442
    pending = []
2443
    for idx, job_id in enumerate(all_job_ids):
2444
      last_touched = idx + 1
2445

    
2446
      # Not optimal because jobs could be pending
2447
      # TODO: Measure average duration for job archival and take number of
2448
      # pending jobs into account.
2449
      if time.time() > end_time:
2450
        break
2451

    
2452
      # Returns None if the job failed to load
2453
      job = self._LoadJobUnlocked(job_id)
2454
      if job:
2455
        if job.end_timestamp is None:
2456
          if job.start_timestamp is None:
2457
            job_age = job.received_timestamp
2458
          else:
2459
            job_age = job.start_timestamp
2460
        else:
2461
          job_age = job.end_timestamp
2462

    
2463
        if age == -1 or now - job_age[0] > age:
2464
          pending.append(job)
2465

    
2466
          # Archive 10 jobs at a time
2467
          if len(pending) >= 10:
2468
            archived_count += self._ArchiveJobsUnlocked(pending)
2469
            pending = []
2470

    
2471
    if pending:
2472
      archived_count += self._ArchiveJobsUnlocked(pending)
2473

    
2474
    return (archived_count, len(all_job_ids) - last_touched)
2475

    
2476
  def _Query(self, fields, qfilter):
2477
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2478
                       namefield="id")
2479

    
2480
    # Archived jobs are only looked at if the "archived" field is referenced
2481
    # either as a requested field or in the filter. By default archived jobs
2482
    # are ignored.
2483
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2484

    
2485
    job_ids = qobj.RequestedNames()
2486

    
2487
    list_all = (job_ids is None)
2488

    
2489
    if list_all:
2490
      # Since files are added to/removed from the queue atomically, there's no
2491
      # risk of getting the job ids in an inconsistent state.
2492
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2493

    
2494
    jobs = []
2495

    
2496
    for job_id in job_ids:
2497
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2498
      if job is not None or not list_all:
2499
        jobs.append((job_id, job))
2500

    
2501
    return (qobj, jobs, list_all)
2502

    
2503
  def QueryJobs(self, fields, qfilter):
2504
    """Returns a list of jobs in queue.
2505

2506
    @type fields: sequence
2507
    @param fields: List of wanted fields
2508
    @type qfilter: None or query2 filter (list)
2509
    @param qfilter: Query filter
2510

2511
    """
2512
    (qobj, ctx, _) = self._Query(fields, qfilter)
2513

    
2514
    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2515

    
2516
  def OldStyleQueryJobs(self, job_ids, fields):
2517
    """Returns a list of jobs in queue.
2518

2519
    @type job_ids: list
2520
    @param job_ids: sequence of job identifiers or None for all
2521
    @type fields: list
2522
    @param fields: names of fields to return
2523
    @rtype: list
2524
    @return: list one element per job, each element being list with
2525
        the requested fields
2526

2527
    """
2528
    # backwards compat:
2529
    job_ids = [int(jid) for jid in job_ids]
2530
    qfilter = qlang.MakeSimpleFilter("id", job_ids)
2531

    
2532
    (qobj, ctx, _) = self._Query(fields, qfilter)
2533

    
2534
    return qobj.OldStyleQuery(ctx, sort_by_name=False)
2535

    
2536
  @locking.ssynchronized(_LOCK)
2537
  def PrepareShutdown(self):
2538
    """Prepare to stop the job queue.
2539

2540
    Disables execution of jobs in the workerpool and returns whether there are
2541
    any jobs currently running. If the latter is the case, the job queue is not
2542
    yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2543
    be called without interfering with any job. Queued and unfinished jobs will
2544
    be resumed next time.
2545

2546
    Once this function has been called no new job submissions will be accepted
2547
    (see L{_RequireNonDrainedQueue}).
2548

2549
    @rtype: bool
2550
    @return: Whether there are any running jobs
2551

2552
    """
2553
    if self._accepting_jobs:
2554
      self._accepting_jobs = False
2555

    
2556
      # Tell worker pool to stop processing pending tasks
2557
      self._wpool.SetActive(False)
2558

    
2559
    return self._wpool.HasRunningTasks()
2560

    
2561
  def AcceptingJobsUnlocked(self):
2562
    """Returns whether jobs are accepted.
2563

2564
    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2565
    queue is shutting down.
2566

2567
    @rtype: bool
2568

2569
    """
2570
    return self._accepting_jobs
2571

    
2572
  @locking.ssynchronized(_LOCK)
2573
  @_RequireOpenQueue
2574
  def Shutdown(self):
2575
    """Stops the job queue.
2576

2577
    This shutdowns all the worker threads an closes the queue.
2578

2579
    """
2580
    self._wpool.TerminateWorkers()
2581

    
2582
    self._queue_filelock.Close()
2583
    self._queue_filelock = None