lib/jqueue.py @ be6cdf67

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools
import operator

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import luxi
from ganeti import opcodes
from ganeti import opcodes_base
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
import ganeti.rpc.node as rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster


JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

#: Retrieves "id" attribute
_GetIdAttr = operator.attrgetter("id")


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


class QueueShutdown(Exception):
  """Special exception to abort a job when the job queue is shutting down.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
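
# Editor's illustrative sketch, not part of the original module: the
# (seconds, microseconds) tuple returned by TimeStampNow() is assumed to be
# convertible back to a float with utils.MergeTime, the counterpart of
# utils.SplitTime used above.
def _TimeStampExample():
  stamp = TimeStampNow()            # e.g. (1400000000, 123456)
  seconds = utils.MergeTime(stamp)  # float seconds since the epoch
  return seconds
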


def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)


class _SimpleJobQuery:
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }
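
# Editor's illustrative sketch, not part of the original module: Serialize
# and Restore are designed to round-trip through JSON, which is how opcodes
# reach the job files on disk.
def _QueuedOpCodeRoundTripExample(op):
  """Returns a copy of a _QueuedOpCode rebuilt from its JSON form.

  @type op: L{_QueuedOpCode}

  """
  data = serializer.DumpJson(op.Serialize())
  return _QueuedOpCode.Restore(serializer.LoadJson(data))
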


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable", "archived"]

  def AddReasons(self, pickup=False):
    """Extend the reason trail.

    Add the reason for all the opcodes of this job to be executed.

    """
    count = 0
    for queued_op in self.ops:
      op = queued_op.input
      if pickup:
        reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP
      else:
        reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__,
                                                reason_src_prefix)
      reason_text = "job=%d;index=%d" % (self.id, count)
      reason = getattr(op, "reason", [])
      reason.append((reason_src, reason_text, utils.EpochNano()))
      op.reason = reason
      count = count + 1

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: int
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.AddReasons()
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs cannot be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
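
  # Editor's illustrative note (not part of the original module): a rough
  # truth table for CalcStatus, given opcode statuses in job order (error
  # and cancel take precedence as soon as they are seen):
  #
  #   [SUCCESS, SUCCESS]         -> JOB_STATUS_SUCCESS
  #   [SUCCESS, RUNNING, QUEUED] -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR, QUEUED]   -> JOB_STATUS_ERROR
  #   [CANCELED, QUEUED]         -> JOB_STATUS_CANCELED
  #   [QUEUED, QUEUED]           -> JOB_STATUS_QUEUED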

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
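
  # Editor's illustrative sketch (not part of the original module): a caller
  # polling for new messages would keep the highest serial seen so far and
  # pass it back in on the next call, e.g.:
  #
  #   for (serial, _timestamp, _log_type, msg) in job.GetLogEntries(last):
  #     print(msg)
  #     last = max(last, serial)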

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalized are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

  def ChangePriority(self, priority):
    """Changes the job priority.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
      and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is cancelling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # See if queue is shutting down
    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
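
  # Editor's illustrative note (not part of the original module): logical
  # units call this through mcpu either with a message alone, in which case
  # the type defaults to ELOG_MESSAGE, or with an explicit type:
  #
  #   cbs.Feedback("starting instance")
  #   cbs.Feedback(constants.ELOG_MESSAGE, "starting instance")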

  def CurrentPriority(self):
    """Returns current priority for opcode.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    return self._op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: list or None
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: int
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = _inotify_wm_cls()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename
    self._waiter_cls = _waiter_cls

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = self._waiter_cls(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout,
               _waiter_cls=_JobChangesWaiter):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _waiter_cls(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except errors.JobLost:
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
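
# Editor's illustrative sketch, not part of the original module: Peek can be
# called repeatedly without consuming the memoized timeout, while Next
# consumes it. A plain iterator's next method stands in for the real
# strategy's NextAttempt.
def _TimeoutWrapperExample():
  wrapper = _TimeoutStrategyWrapper(iter([1.0, 2.0]).next)
  assert wrapper.Peek() == 1.0  # does not advance
  assert wrapper.Next() == 1.0  # consumes the value
  assert wrapper.Peek() == 2.0  # next attempt's timeout
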


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False
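
  # Editor's note (grounded in the check above, not part of the original
  # module): opcode priorities are inverted numbers, i.e. numerically
  # smaller means more important and constants.OP_PRIO_HIGHEST is the
  # smallest value, which is why increasing the priority decrements it.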

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    # The very last check if the job was cancelled before trying to execute
    if op.status == constants.OP_STATUS_CANCELING:
      return (constants.OP_STATUS_CANCELING, None)

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Queue is shutting down, return to queued
      if not self.queue.AcceptingJobsUnlocked():
        return (constants.OP_STATUS_QUEUED, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)

    except QueueShutdown:
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)

      assert op.status == constants.OP_STATUS_WAITING

      # Job hadn't been started yet, so it should return to the queue
      return (constants.OP_STATUS_QUEUED, None)

    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_QUEUED):
          # waiting: Couldn't get locks in time
          # queued: Queue is shutting down
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        finalize = False

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Returns the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))
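
  # Editor's illustrative note (not part of the original module): the
  # dependencies checked here come from an opcode's "depends" attribute, a
  # list of (job-id, [statuses]) pairs as copied into _OpExecContext.jobdeps;
  # an empty status list accepts any finalized status.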
1570

    
1571
  def _RemoveEmptyWaitersUnlocked(self):
1572
    """Remove all jobs without actual waiters.
1573

1574
    """
1575
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1576
                   if not waiters]:
1577
      del self._waiters[job_id]
1578

    
1579
  def NotifyWaiters(self, job_id):
1580
    """Notifies all jobs waiting for a certain job ID.
1581

1582
    @attention: Do not call until L{CheckAndRegister} returned a status other
1583
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1584
    @type job_id: int
1585
    @param job_id: Job ID
1586

1587
    """
1588
    assert ht.TJobId(job_id)
1589

    
1590
    self._lock.acquire()
1591
    try:
1592
      self._RemoveEmptyWaitersUnlocked()
1593

    
1594
      jobs = self._waiters.pop(job_id, None)
1595
    finally:
1596
      self._lock.release()
1597

    
1598
    if jobs:
1599
      # Re-add jobs to workerpool
1600
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1601
                    len(jobs), job_id)
1602
      self._enqueue_fn(jobs)
1603

    
1604

    
1605
def _RequireOpenQueue(fn):
1606
  """Decorator for "public" functions.
1607

1608
  This function should be used for all 'public' functions. That is,
1609
  functions usually called from other classes. Note that this should
1610
  be applied only to methods (not plain functions), since it expects
1611
  that the decorated function is called with a first argument that has
1612
  a '_queue_filelock' argument.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

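  An illustrative sketch, mirroring the L{_RequireOpenQueue} example
  (C{SubmitExample} is a hypothetical method name)::
    @locking.ssynchronized(_LOCK)
    @_RequireNonDrainedQueue
    def SubmitExample(self, ops):
      pass
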
  """
1636
  def wrapper(self, *args, **kwargs):
1637
    """Wrapper function.
1638

1639
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1640

1641
    """
1642
    # Ok when sharing the big job queue lock, as the drain file is created when
1643
    # the lock is exclusive.
1644
    # Needs access to protected member, pylint: disable=W0212
1645
    if self._drained:
1646
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1647

    
1648
    if not self._accepting_jobs:
1649
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1650

    
1651
    return fn(self, *args, **kwargs)
1652
  return wrapper
1653

    
1654

    
1655
class JobQueue(object):
1656
  """Queue used to manage the jobs.
1657

1658
  """
1659
  def __init__(self, context):
1660
    """Constructor for JobQueue.
1661

1662
    The constructor will initialize the job queue object and then
1663
    start loading the current jobs from disk, either for starting them
1664
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

  def _PickupJobUnlocked(self, job_id):
    """Load a job from the job queue

    Pick up a job that already is in the job queue and start/resume it.

    """
    job = self._LoadJobUnlocked(job_id)

    if job is None:
      logging.warning("Job %s could not be read", job_id)
      return

    job.AddReasons(pickup=True)

    status = job.CalcStatus()
    if status == constants.JOB_STATUS_QUEUED:
      self._EnqueueJobsUnlocked([job])
      logging.info("Restarting job %s", job.id)

    elif status in (constants.JOB_STATUS_RUNNING,
                    constants.JOB_STATUS_WAITING,
                    constants.JOB_STATUS_CANCELING):
      logging.warning("Unfinished job %s found: %s", job.id, job)

      if status == constants.JOB_STATUS_WAITING:
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
        self._EnqueueJobsUnlocked([job])
        logging.info("Restarting job %s", job.id)
      else:
        to_encode = errors.OpExecError("Unclean master daemon shutdown")
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              _EncodeOpError(to_encode))
        job.Finalize()

    self.UpdateJobUnlocked(job)

  @locking.ssynchronized(_LOCK)
  def PickupJob(self, job_id):
    """Loads and resumes a job already in the queue (see
    L{_PickupJobUnlocked}).

    """
    self._PickupJobUnlocked(job_id)

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    # Set queue drained flag
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
                                                       self._drained)
    msg = result[node_name].fail_msg
    if msg:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and especially
    log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicate the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a give job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

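    An illustrative sketch (the exact path depends on the configured queue
    directories; L{jstore.GetArchiveDirectory} groups jobs into
    subdirectories, assumed here to be per-10000 buckets)::
      _GetArchivedJobPath(421)
      # => ".../queue/archive/0/job-421"
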
    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"

    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
    self._CheckRpcResult(result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True

  @classmethod
  def SubmitJob(cls, ops):
    """Create and store a new job.

    """
    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitJob(ops)

  @classmethod
  def SubmitJobToDrainedQueue(cls, ops):
    """Forcefully create and store a new job.

    Do so, even if the job queue is drained.

    """
    return luxi.Client(address=pathutils.QUERY_SOCKET)\
        .SubmitJobToDrainedQueue(ops)

  @classmethod
  def SubmitManyJobs(cls, jobs):
    """Create and store multiple jobs.

    """
    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

2177
    result = []
2178

    
2179
    for (dep_job_id, dep_status) in deps:
2180
      if ht.TRelativeJobId(dep_job_id):
2181
        assert ht.TInt(dep_job_id) and dep_job_id < 0
2182
        try:
2183
          job_id = resolve_fn(dep_job_id)
2184
        except IndexError:
2185
          # Abort
2186
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2187
      else:
2188
        job_id = dep_job_id
2189

    
2190
      result.append((job_id, dep_status))
2191

    
2192
    return (True, result)
2193

    
2194
  @locking.ssynchronized(_LOCK)
2195
  def _EnqueueJobs(self, jobs):
2196
    """Helper function to add jobs to worker pool's queue.
2197

2198
    @type jobs: list
2199
    @param jobs: List of all jobs
2200

2201
    """
2202
    return self._EnqueueJobsUnlocked(jobs)
2203

    
2204
  def _EnqueueJobsUnlocked(self, jobs):
2205
    """Helper function to add jobs to worker pool's queue.
2206

2207
    @type jobs: list
2208
    @param jobs: List of all jobs
2209

2210
    """
2211
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2212
    self._wpool.AddManyTasks([(job, ) for job in jobs],
2213
                             priority=[job.CalcPriority() for job in jobs],
2214
                             task_id=map(_GetIdAttr, jobs))
2215

    
2216
  def _GetJobStatusForDependencies(self, job_id):
2217
    """Gets the status of a job for dependencies.
2218

2219
    @type job_id: int
2220
    @param job_id: Job ID
2221
    @raise errors.JobLost: If job can't be found
2222

2223
    """
2224
    # Not using in-memory cache as doing so would require an exclusive lock
2225

    
2226
    # Try to load from disk
2227
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      # Check the assertion only for jobs that were actually found, so a
      # missing job raises L{errors.JobLost} instead of crashing on C{None}
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

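    An illustrative client-side polling sketch (variable names and the
    60-second timeout are made up; each log entry is assumed to carry its
    serial number as the first element)::
      prev_info = None
      prev_serial = -1
      while True:
        result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
                                         prev_serial, 60.0)
        if result == constants.JOB_NOTCHANGED:
          continue
        (prev_info, log_entries) = result
        if log_entries:
          prev_serial = log_entries[-1][0]
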
    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority

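    An illustrative sketch (C{constants.OP_PRIO_HIGH} is assumed to be among
    C{constants.OP_PRIO_SUBMIT_VALID})::
      queue.ChangeJobPriority(job_id, constants.OP_PRIO_HIGH)
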
    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      (success, msg) = job.ChangePriority(priority)

      if success:
        try:
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive an already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time to spend on archival, in seconds

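    An illustrative sketch (the one-week age and one-minute time budget are
    arbitrary example values)::
      # Archive jobs older than a week, spending at most a minute on it;
      # returns the number of archived jobs and the number of jobs not
      # considered before the time ran out
      (archived, unchecked) = queue.AutoArchiveJobs(7 * 24 * 3600, 60)
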
    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def _Query(self, fields, qfilter):
    """Builds a job query and loads the jobs it selects.

    @rtype: tuple
    @return: tuple of (query object, list of (job id, job) tuples,
        whether all jobs were requested)

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

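    An illustrative sketch (the filter is built with
    L{qlang.MakeSimpleFilter}, as done by L{OldStyleQueryJobs}; the requested
    field names must exist in C{query.JOB_FIELDS})::
      queue.QueryJobs(["id", "status"], qlang.MakeSimpleFilter("id", [1, 2]))
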
    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

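    An illustrative sketch (the returned status values are made up)::
      queue.OldStyleQueryJobs([1, 2], ["id", "status"])
      # => [[1, "success"], [2, "running"]]
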
    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

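    An illustrative shutdown sequence (the 0.1s polling interval is an
    arbitrary choice)::
      while queue.PrepareShutdown():
        time.sleep(0.1)
      queue.Shutdown()
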
    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool

    """
    return self._accepting_jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None