#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools
import operator

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster


JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

#: Retrieves "id" attribute
_GetIdAttr = operator.attrgetter("id")


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


class QueueShutdown(Exception):
  """Special exception to abort a job when the job queue is shutting down.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())

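# Illustrative note (not part of the original module): utils.SplitTime()
# splits a float timestamp into whole seconds and microseconds, so a
# hypothetical call could look like:
#
#   TimeStampNow()  # e.g. (1325026035, 123456)
#
# The split avoids floating-point precision loss when the value is later
# serialized to JSON for the on-disk job files.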

def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)


class _SimpleJobQuery:
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]

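# Usage sketch (illustrative, not from the original code): the point of the
# wrapper is to parse the field list once and reuse it for many jobs, e.g.
#
#   squery = _SimpleJobQuery(["id", "status"])
#   row = squery(job)  # old-style result row for this single job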

class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable", "archived"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: int
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and,
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one finished with an error, the
        job status will be the same
      - otherwise, the last opcode with the status one of:
          - waiting
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued or successful,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

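  # Illustrative mapping for CalcStatus() (example status lists made up):
  #
  #   opcode statuses                 -> job status
  #   [SUCCESS, SUCCESS, SUCCESS]     -> JOB_STATUS_SUCCESS (all_success)
  #   [SUCCESS, RUNNING, QUEUED]      -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR, QUEUED]        -> JOB_STATUS_ERROR (first error wins)
  #   [SUCCESS, CANCELED, QUEUED]     -> JOB_STATUS_CANCELED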
  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

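  # Note: opcode priorities are numeric and "smaller means more urgent"
  # (OP_PRIO_HIGHEST is the smallest value, OP_PRIO_LOWEST the largest), so
  # min() above picks the most urgent pending opcode; e.g. (illustrative)
  # pending priorities [0, -10, 5] yield a job priority of -10.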
  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

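  # Illustrative example (log tuples made up): with op.log holding entries
  # (1, ts, ELOG_MESSAGE, "step 1") and (2, ts, ELOG_MESSAGE, "step 2"),
  # GetLogEntries(None) returns both, while GetLogEntries(1) returns only
  # the entry with serial 2.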
  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

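  # Sketch of the two cancellation paths (job ID below is made up): a job
  # that is still queued is finalized immediately, e.g.
  #   job.Cancel()  # -> (True, "Job 123 canceled")
  # while a job waiting for locks is only marked OP_STATUS_CANCELING and the
  # worker later raises CancelJob when it notices, e.g.
  #   job.Cancel()  # -> (True, "Job 123 will be canceled")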
  def ChangePriority(self, priority):
    """Changes the job priority.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
      and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is cancelling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # See if queue is shutting down
    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

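  # Both call forms are accepted (illustrative): Feedback("msg") logs with
  # the default ELOG_MESSAGE type, while Feedback(constants.ELOG_MESSAGE,
  # "msg") names the log type explicitly; three or more positional arguments
  # would trip the assertion above.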
  def CurrentPriority(self):
    """Returns current priority for opcode.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    return self._op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = _inotify_wm_cls()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename
    self._waiter_cls = _waiter_cls

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = self._waiter_cls(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout,
               _waiter_cls=_JobChangesWaiter):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _waiter_cls(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except errors.JobLost:
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)

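# Behaviour sketch (example exceptions chosen for illustration): a
# GenericError subclass is encoded as-is, anything else is wrapped first:
#
#   _EncodeOpError(errors.OpExecError("failed"))  # encoded unchanged
#   _EncodeOpError(ValueError("boom"))            # wrapped in OpExecError
#
# so opcode results always carry a serializable Ganeti exception.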

class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result

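# Peek/Next semantics, as a sketch (assuming a hypothetical fn returning
# 1.0, then 2.5):
#
#   wrapper = _TimeoutStrategyWrapper(fn)
#   wrapper.Peek()  # 1.0; the value is computed and kept
#   wrapper.Next()  # 1.0; the kept value is consumed
#   wrapper.Next()  # 2.5; fn is called again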
class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Queue is shutting down, return to queued
      if not self.queue.AcceptingJobsUnlocked():
        return (constants.OP_STATUS_QUEUED, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)

    except QueueShutdown:
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)

      assert op.status == constants.OP_STATUS_WAITING

      # Job hadn't been started yet, so it should return to the queue
      return (constants.OP_STATUS_QUEUED, None)

    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_QUEUED):
          # waiting: Couldn't get locks in time
          # queued: Queue is shutting down
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        finalize = False

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Returns the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))

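  # Outcome summary for CheckAndRegister (illustrative): an unfinished
  # dependency yields WAIT (the job is parked until NotifyWaiters re-queues
  # it); a finished one yields CONTINUE when its status is in dep_status (or
  # dep_status is empty), CANCEL when it was canceled and that is not an
  # accepted status, and WRONGSTATUS otherwise.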
1542
  def _RemoveEmptyWaitersUnlocked(self):
1543
    """Remove all jobs without actual waiters.
1544

1545
    """
1546
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1547
                   if not waiters]:
1548
      del self._waiters[job_id]
1549

    
1550
  def NotifyWaiters(self, job_id):
1551
    """Notifies all jobs waiting for a certain job ID.
1552

1553
    @attention: Do not call until L{CheckAndRegister} returned a status other
1554
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1555
    @type job_id: int
1556
    @param job_id: Job ID
1557

1558
    """
1559
    assert ht.TJobId(job_id)
1560

    
1561
    self._lock.acquire()
1562
    try:
1563
      self._RemoveEmptyWaitersUnlocked()
1564

    
1565
      jobs = self._waiters.pop(job_id, None)
1566
    finally:
1567
      self._lock.release()
1568

    
1569
    if jobs:
1570
      # Re-add jobs to workerpool
1571
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1572
                    len(jobs), job_id)
1573
      self._enqueue_fn(jobs)
1574

    
1575

    
1576
def _RequireOpenQueue(fn):
1577
  """Decorator for "public" functions.
1578

1579
  This function should be used for all 'public' functions. That is,
1580
  functions usually called from other classes. Note that this should
1581
  be applied only to methods (not plain functions), since it expects
1582
  that the decorated function is called with a first argument that has
1583
  a '_queue_filelock' argument.
1584

1585
  @warning: Use this decorator only after locking.ssynchronized
1586

1587
  Example::
1588
    @locking.ssynchronized(_LOCK)
1589
    @_RequireOpenQueue
1590
    def Example(self):
1591
      pass
1592

1593
  """
1594
  def wrapper(self, *args, **kwargs):
1595
    # pylint: disable=W0212
1596
    assert self._queue_filelock is not None, "Queue should be open"
1597
    return fn(self, *args, **kwargs)
1598
  return wrapper
1599

    
1600

    
1601
def _RequireNonDrainedQueue(fn):
1602
  """Decorator checking for a non-drained queue.
1603

1604
  To be used with functions submitting new jobs.
1605

1606
  """
1607
  def wrapper(self, *args, **kwargs):
1608
    """Wrapper function.
1609

1610
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1611

1612
    """
1613
    # Ok when sharing the big job queue lock, as the drain file is created when
1614
    # the lock is exclusive.
1615
    # Needs access to protected member, pylint: disable=W0212
1616
    if self._drained:
1617
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1618

    
1619
    if not self._accepting_jobs:
1620
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1621

    
1622
    return fn(self, *args, **kwargs)
1623
  return wrapper
1624

    
1625

    
1626
class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared
    # mode and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    # Set queue drained flag
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
                                                       self._drained)
    msg = result[node_name].fail_msg
    if msg:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

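  # Worked example (illustrative): with five non-master nodes where only one
  # replication succeeded, success = 1 and failed = 4; counting the master
  # itself as a success, 1 + 1 < 4 holds and the "more than half" error is
  # logged. With two successes, 2 + 1 < 3 is false and nothing extra is
  # logged.
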
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TNonNegativeInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

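  # Worked example (illustrative): with self._last_serial == 10 and
  # count == 3, serial becomes 13, "13\n" is written to the serial file and
  # replicated, and the method returns the identifiers for 11, 12 and 13
  # (formatted through jstore.FormatJobID). Only after the file write
  # succeeds is self._last_serial advanced, so a failed write cannot leak
  # unrecorded identifiers.
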
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

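  # Illustrative layout (assuming QUEUE_DIR is /var/lib/ganeti/queue and
  # that jstore.GetArchiveDirectory buckets job IDs into per-range
  # subdirectories):
  #
  #   _GetJobPath(4711)         -> .../queue/job-4711
  #   _GetArchivedJobPath(4711) -> .../queue/archive/<bucket>/job-4711
  #
  # The exact archive bucket name is determined by jstore, not hard-coded
  # here.
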
  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist

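  # Illustrative sketch: JOB_FILE_RE is expected to capture the numeric part
  # of queue file names, so a directory containing "job-17", "job-4711" and
  # an unrelated "serial" file would yield [17, 4711]; anything not matching
  # the pattern is silently skipped.
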
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
    self._CheckRpcResult(result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

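  # Illustrative usage sketch (not executed; "queue" and the opcode choice
  # are assumptions for illustration): a caller holding a JobQueue instance
  # submits a single-opcode job and gets the new job ID back:
  #
  #   job_id = queue.SubmitJob([opcodes.OpClusterQuery()])
  #
  # Submission allocates the serial, writes and replicates the job file,
  # then enqueues the job in the worker pool, all under the big lock.
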
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

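  # Worked example (illustrative): when submitting a batch that was assigned
  # job IDs [20, 21, 22], the third job may depend on the previous one via
  # the relative ID -1. With resolve_fn built as in _SubmitManyJobsUnlocked
  # below, ([20, 21])[-1] == 21, so
  #
  #   _ResolveJobDependencies(resolve_fn,
  #                           [(-1, [constants.JOB_STATUS_SUCCESS])])
  #
  # returns (True, [(21, [constants.JOB_STATUS_SUCCESS])]); an out-of-range
  # relative ID raises IndexError inside resolve_fn and yields (False, msg).
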
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        # The "else" clause runs only if the inner loop completed without
        # "break", i.e. all dependencies were resolved successfully
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs],
                             task_id=map(_GetIdAttr, jobs))

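  # Illustrative sketch: for two jobs with IDs 7 and 8 the worker pool call
  # above expands to roughly
  #
  #   self._wpool.AddManyTasks([(job7, ), (job8, )],
  #                            priority=[job7.CalcPriority(),
  #                                      job8.CalcPriority()],
  #                            task_id=[7, 8])
  #
  # i.e. each job becomes a one-element task tuple, keeps its own priority
  # and is identified in the pool by its job ID ("job7"/"job8" are
  # placeholder names).
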
  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      # Check the job only after the None case has been excluded, otherwise
      # the assertion would fail with an AttributeError for missing jobs
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      (success, msg) = job.ChangePriority(priority)

      if success:
        try:
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive an already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum amount of time to spend archiving, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

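  # Worked example (illustrative): timestamps are (seconds, microseconds)
  # tuples as produced by utils.SplitTime, so for a job with
  # job.end_timestamp == (1000000, 0), a call with age=3600 selects it only
  # once now - 1000000 > 3600. With age=-1 every loadable job qualifies;
  # running or queued jobs are still filtered out inside
  # _ArchiveJobsUnlocked by the JOBS_FINALIZED check.
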
  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a
        list with the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

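  # Illustrative shutdown sequence (not executed; the polling loop is an
  # assumption about how a caller such as the master daemon might drive it):
  #
  #   while queue.PrepareShutdown():   # keeps returning True while jobs run
  #     time.sleep(1)                  # wait for running jobs to finish
  #   queue.Shutdown()                 # then terminate workers, close locks
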
  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool

    """
    return self._accepting_jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None