Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ fcd70b89

History | View | Annotate | Download (80.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import time
35
import weakref
36
import threading
37
import itertools
38
import operator
39

    
40
try:
41
  # pylint: disable=E0611
42
  from pyinotify import pyinotify
43
except ImportError:
44
  import pyinotify
45

    
46
from ganeti import asyncnotifier
47
from ganeti import constants
48
from ganeti import serializer
49
from ganeti import workerpool
50
from ganeti import locking
51
from ganeti import opcodes
52
from ganeti import opcodes_base
53
from ganeti import errors
54
from ganeti import mcpu
55
from ganeti import utils
56
from ganeti import jstore
57
from ganeti import rpc
58
from ganeti import runtime
59
from ganeti import netutils
60
from ganeti import compat
61
from ganeti import ht
62
from ganeti import query
63
from ganeti import qlang
64
from ganeti import pathutils
65
from ganeti import vcluster
66

    
67

    
68
JOBQUEUE_THREADS = 25
69

    
70
# member lock names to be passed to @ssynchronized decorator
71
_LOCK = "_lock"
72
_QUEUE = "_queue"
73

    
74
#: Retrieves "id" attribute
75
_GetIdAttr = operator.attrgetter("id")
76

    
77

    
78
class CancelJob(Exception):
79
  """Special exception to cancel a job.
80

81
  """
82

    
83

    
84
class QueueShutdown(Exception):
85
  """Special exception to abort a job when the job queue is shutting down.
86

87
  """
88

    
89

    
90
def TimeStampNow():
91
  """Returns the current timestamp.
92

93
  @rtype: tuple
94
  @return: the current time in the (seconds, microseconds) format
95

96
  """
97
  return utils.SplitTime(time.time())
98

    
99

    
100
def _CallJqUpdate(runner, names, file_name, content):
101
  """Updates job queue file after virtualizing filename.
102

103
  """
104
  virt_file_name = vcluster.MakeVirtualPath(file_name)
105
  return runner.call_jobqueue_update(names, virt_file_name, content)
106

    
107

    
108
class _SimpleJobQuery:
109
  """Wrapper for job queries.
110

111
  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
112

113
  """
114
  def __init__(self, fields):
115
    """Initializes this class.
116

117
    """
118
    self._query = query.Query(query.JOB_FIELDS, fields)
119

    
120
  def __call__(self, job):
121
    """Executes a job query using cached field list.
122

123
    """
124
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
125

    
126

    
127
class _QueuedOpCode(object):
128
  """Encapsulates an opcode object.
129

130
  @ivar log: holds the execution log and consists of tuples
131
  of the form C{(log_serial, timestamp, level, message)}
132
  @ivar input: the OpCode we encapsulate
133
  @ivar status: the current status
134
  @ivar result: the result of the LU execution
135
  @ivar start_timestamp: timestamp for the start of the execution
136
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
137
  @ivar stop_timestamp: timestamp for the end of the execution
138

139
  """
140
  __slots__ = ["input", "status", "result", "log", "priority",
141
               "start_timestamp", "exec_timestamp", "end_timestamp",
142
               "__weakref__"]
143

    
144
  def __init__(self, op):
145
    """Initializes instances of this class.
146

147
    @type op: L{opcodes.OpCode}
148
    @param op: the opcode we encapsulate
149

150
    """
151
    self.input = op
152
    self.status = constants.OP_STATUS_QUEUED
153
    self.result = None
154
    self.log = []
155
    self.start_timestamp = None
156
    self.exec_timestamp = None
157
    self.end_timestamp = None
158

    
159
    # Get initial priority (it might change during the lifetime of this opcode)
160
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
161

    
162
  @classmethod
163
  def Restore(cls, state):
164
    """Restore the _QueuedOpCode from the serialized form.
165

166
    @type state: dict
167
    @param state: the serialized state
168
    @rtype: _QueuedOpCode
169
    @return: a new _QueuedOpCode instance
170

171
    """
172
    obj = _QueuedOpCode.__new__(cls)
173
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
174
    obj.status = state["status"]
175
    obj.result = state["result"]
176
    obj.log = state["log"]
177
    obj.start_timestamp = state.get("start_timestamp", None)
178
    obj.exec_timestamp = state.get("exec_timestamp", None)
179
    obj.end_timestamp = state.get("end_timestamp", None)
180
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
181
    return obj
182

    
183
  def Serialize(self):
184
    """Serializes this _QueuedOpCode.
185

186
    @rtype: dict
187
    @return: the dictionary holding the serialized state
188

189
    """
190
    return {
191
      "input": self.input.__getstate__(),
192
      "status": self.status,
193
      "result": self.result,
194
      "log": self.log,
195
      "start_timestamp": self.start_timestamp,
196
      "exec_timestamp": self.exec_timestamp,
197
      "end_timestamp": self.end_timestamp,
198
      "priority": self.priority,
199
      }
200

    
201

    
202
class _QueuedJob(object):
203
  """In-memory job representation.
204

205
  This is what we use to track the user-submitted jobs. Locking must
206
  be taken care of by users of this class.
207

208
  @type queue: L{JobQueue}
209
  @ivar queue: the parent queue
210
  @ivar id: the job ID
211
  @type ops: list
212
  @ivar ops: the list of _QueuedOpCode that constitute the job
213
  @type log_serial: int
214
  @ivar log_serial: holds the index for the next log entry
215
  @ivar received_timestamp: the timestamp for when the job was received
216
  @ivar start_timestmap: the timestamp for start of execution
217
  @ivar end_timestamp: the timestamp for end of execution
218
  @ivar writable: Whether the job is allowed to be modified
219

220
  """
221
  # pylint: disable=W0212
222
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
223
               "received_timestamp", "start_timestamp", "end_timestamp",
224
               "__weakref__", "processor_lock", "writable", "archived"]
225

    
226
  def _AddReasons(self):
227
    """Extend the reason trail
228

229
    Add the reason for all the opcodes of this job to be executed.
230

231
    """
232
    count = 0
233
    for queued_op in self.ops:
234
      op = queued_op.input
235
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__)
236
      reason_text = "job=%d;index=%d" % (self.id, count)
237
      reason = getattr(op, "reason", [])
238
      reason.append((reason_src, reason_text, utils.EpochNano()))
239
      op.reason = reason
240
      count = count + 1
241

    
242
  def __init__(self, queue, job_id, ops, writable):
243
    """Constructor for the _QueuedJob.
244

245
    @type queue: L{JobQueue}
246
    @param queue: our parent queue
247
    @type job_id: job_id
248
    @param job_id: our job id
249
    @type ops: list
250
    @param ops: the list of opcodes we hold, which will be encapsulated
251
        in _QueuedOpCodes
252
    @type writable: bool
253
    @param writable: Whether job can be modified
254

255
    """
256
    if not ops:
257
      raise errors.GenericError("A job needs at least one opcode")
258

    
259
    self.queue = queue
260
    self.id = int(job_id)
261
    self.ops = [_QueuedOpCode(op) for op in ops]
262
    self._AddReasons()
263
    self.log_serial = 0
264
    self.received_timestamp = TimeStampNow()
265
    self.start_timestamp = None
266
    self.end_timestamp = None
267
    self.archived = False
268

    
269
    self._InitInMemory(self, writable)
270

    
271
    assert not self.archived, "New jobs can not be marked as archived"
272

    
273
  @staticmethod
274
  def _InitInMemory(obj, writable):
275
    """Initializes in-memory variables.
276

277
    """
278
    obj.writable = writable
279
    obj.ops_iter = None
280
    obj.cur_opctx = None
281

    
282
    # Read-only jobs are not processed and therefore don't need a lock
283
    if writable:
284
      obj.processor_lock = threading.Lock()
285
    else:
286
      obj.processor_lock = None
287

    
288
  def __repr__(self):
289
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
290
              "id=%s" % self.id,
291
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
292

    
293
    return "<%s at %#x>" % (" ".join(status), id(self))
294

    
295
  @classmethod
296
  def Restore(cls, queue, state, writable, archived):
297
    """Restore a _QueuedJob from serialized state:
298

299
    @type queue: L{JobQueue}
300
    @param queue: to which queue the restored job belongs
301
    @type state: dict
302
    @param state: the serialized state
303
    @type writable: bool
304
    @param writable: Whether job can be modified
305
    @type archived: bool
306
    @param archived: Whether job was already archived
307
    @rtype: _JobQueue
308
    @return: the restored _JobQueue instance
309

310
    """
311
    obj = _QueuedJob.__new__(cls)
312
    obj.queue = queue
313
    obj.id = int(state["id"])
314
    obj.received_timestamp = state.get("received_timestamp", None)
315
    obj.start_timestamp = state.get("start_timestamp", None)
316
    obj.end_timestamp = state.get("end_timestamp", None)
317
    obj.archived = archived
318

    
319
    obj.ops = []
320
    obj.log_serial = 0
321
    for op_state in state["ops"]:
322
      op = _QueuedOpCode.Restore(op_state)
323
      for log_entry in op.log:
324
        obj.log_serial = max(obj.log_serial, log_entry[0])
325
      obj.ops.append(op)
326

    
327
    cls._InitInMemory(obj, writable)
328

    
329
    return obj
330

    
331
  def Serialize(self):
332
    """Serialize the _JobQueue instance.
333

334
    @rtype: dict
335
    @return: the serialized state
336

337
    """
338
    return {
339
      "id": self.id,
340
      "ops": [op.Serialize() for op in self.ops],
341
      "start_timestamp": self.start_timestamp,
342
      "end_timestamp": self.end_timestamp,
343
      "received_timestamp": self.received_timestamp,
344
      }
345

    
346
  def CalcStatus(self):
347
    """Compute the status of this job.
348

349
    This function iterates over all the _QueuedOpCodes in the job and
350
    based on their status, computes the job status.
351

352
    The algorithm is:
353
      - if we find a cancelled, or finished with error, the job
354
        status will be the same
355
      - otherwise, the last opcode with the status one of:
356
          - waitlock
357
          - canceling
358
          - running
359

360
        will determine the job status
361

362
      - otherwise, it means either all opcodes are queued, or success,
363
        and the job status will be the same
364

365
    @return: the job status
366

367
    """
368
    status = constants.JOB_STATUS_QUEUED
369

    
370
    all_success = True
371
    for op in self.ops:
372
      if op.status == constants.OP_STATUS_SUCCESS:
373
        continue
374

    
375
      all_success = False
376

    
377
      if op.status == constants.OP_STATUS_QUEUED:
378
        pass
379
      elif op.status == constants.OP_STATUS_WAITING:
380
        status = constants.JOB_STATUS_WAITING
381
      elif op.status == constants.OP_STATUS_RUNNING:
382
        status = constants.JOB_STATUS_RUNNING
383
      elif op.status == constants.OP_STATUS_CANCELING:
384
        status = constants.JOB_STATUS_CANCELING
385
        break
386
      elif op.status == constants.OP_STATUS_ERROR:
387
        status = constants.JOB_STATUS_ERROR
388
        # The whole job fails if one opcode failed
389
        break
390
      elif op.status == constants.OP_STATUS_CANCELED:
391
        status = constants.OP_STATUS_CANCELED
392
        break
393

    
394
    if all_success:
395
      status = constants.JOB_STATUS_SUCCESS
396

    
397
    return status
398

    
399
  def CalcPriority(self):
400
    """Gets the current priority for this job.
401

402
    Only unfinished opcodes are considered. When all are done, the default
403
    priority is used.
404

405
    @rtype: int
406

407
    """
408
    priorities = [op.priority for op in self.ops
409
                  if op.status not in constants.OPS_FINALIZED]
410

    
411
    if not priorities:
412
      # All opcodes are done, assume default priority
413
      return constants.OP_PRIO_DEFAULT
414

    
415
    return min(priorities)
416

    
417
  def GetLogEntries(self, newer_than):
418
    """Selectively returns the log entries.
419

420
    @type newer_than: None or int
421
    @param newer_than: if this is None, return all log entries,
422
        otherwise return only the log entries with serial higher
423
        than this value
424
    @rtype: list
425
    @return: the list of the log entries selected
426

427
    """
428
    if newer_than is None:
429
      serial = -1
430
    else:
431
      serial = newer_than
432

    
433
    entries = []
434
    for op in self.ops:
435
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
436

    
437
    return entries
438

    
439
  def GetInfo(self, fields):
440
    """Returns information about a job.
441

442
    @type fields: list
443
    @param fields: names of fields to return
444
    @rtype: list
445
    @return: list with one element for each field
446
    @raise errors.OpExecError: when an invalid field
447
        has been passed
448

449
    """
450
    return _SimpleJobQuery(fields)(self)
451

    
452
  def MarkUnfinishedOps(self, status, result):
453
    """Mark unfinished opcodes with a given status and result.
454

455
    This is an utility function for marking all running or waiting to
456
    be run opcodes with a given status. Opcodes which are already
457
    finalised are not changed.
458

459
    @param status: a given opcode status
460
    @param result: the opcode result
461

462
    """
463
    not_marked = True
464
    for op in self.ops:
465
      if op.status in constants.OPS_FINALIZED:
466
        assert not_marked, "Finalized opcodes found after non-finalized ones"
467
        continue
468
      op.status = status
469
      op.result = result
470
      not_marked = False
471

    
472
  def Finalize(self):
473
    """Marks the job as finalized.
474

475
    """
476
    self.end_timestamp = TimeStampNow()
477

    
478
  def Cancel(self):
479
    """Marks job as canceled/-ing if possible.
480

481
    @rtype: tuple; (bool, string)
482
    @return: Boolean describing whether job was successfully canceled or marked
483
      as canceling and a text message
484

485
    """
486
    status = self.CalcStatus()
487

    
488
    if status == constants.JOB_STATUS_QUEUED:
489
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
490
                             "Job canceled by request")
491
      self.Finalize()
492
      return (True, "Job %s canceled" % self.id)
493

    
494
    elif status == constants.JOB_STATUS_WAITING:
495
      # The worker will notice the new status and cancel the job
496
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
497
      return (True, "Job %s will be canceled" % self.id)
498

    
499
    else:
500
      logging.debug("Job %s is no longer waiting in the queue", self.id)
501
      return (False, "Job %s is no longer waiting in the queue" % self.id)
502

    
503
  def ChangePriority(self, priority):
504
    """Changes the job priority.
505

506
    @type priority: int
507
    @param priority: New priority
508
    @rtype: tuple; (bool, string)
509
    @return: Boolean describing whether job's priority was successfully changed
510
      and a text message
511

512
    """
513
    status = self.CalcStatus()
514

    
515
    if status in constants.JOBS_FINALIZED:
516
      return (False, "Job %s is finished" % self.id)
517
    elif status == constants.JOB_STATUS_CANCELING:
518
      return (False, "Job %s is cancelling" % self.id)
519
    else:
520
      assert status in (constants.JOB_STATUS_QUEUED,
521
                        constants.JOB_STATUS_WAITING,
522
                        constants.JOB_STATUS_RUNNING)
523

    
524
      changed = False
525
      for op in self.ops:
526
        if (op.status == constants.OP_STATUS_RUNNING or
527
            op.status in constants.OPS_FINALIZED):
528
          assert not changed, \
529
            ("Found opcode for which priority should not be changed after"
530
             " priority has been changed for previous opcodes")
531
          continue
532

    
533
        assert op.status in (constants.OP_STATUS_QUEUED,
534
                             constants.OP_STATUS_WAITING)
535

    
536
        changed = True
537

    
538
        # Set new priority (doesn't modify opcode input)
539
        op.priority = priority
540

    
541
      if changed:
542
        return (True, ("Priorities of pending opcodes for job %s have been"
543
                       " changed to %s" % (self.id, priority)))
544
      else:
545
        return (False, "Job %s had no pending opcodes" % self.id)
546

    
547

    
548
class _OpExecCallbacks(mcpu.OpExecCbBase):
549
  def __init__(self, queue, job, op):
550
    """Initializes this class.
551

552
    @type queue: L{JobQueue}
553
    @param queue: Job queue
554
    @type job: L{_QueuedJob}
555
    @param job: Job object
556
    @type op: L{_QueuedOpCode}
557
    @param op: OpCode
558

559
    """
560
    assert queue, "Queue is missing"
561
    assert job, "Job is missing"
562
    assert op, "Opcode is missing"
563

    
564
    self._queue = queue
565
    self._job = job
566
    self._op = op
567

    
568
  def _CheckCancel(self):
569
    """Raises an exception to cancel the job if asked to.
570

571
    """
572
    # Cancel here if we were asked to
573
    if self._op.status == constants.OP_STATUS_CANCELING:
574
      logging.debug("Canceling opcode")
575
      raise CancelJob()
576

    
577
    # See if queue is shutting down
578
    if not self._queue.AcceptingJobsUnlocked():
579
      logging.debug("Queue is shutting down")
580
      raise QueueShutdown()
581

    
582
  @locking.ssynchronized(_QUEUE, shared=1)
583
  def NotifyStart(self):
584
    """Mark the opcode as running, not lock-waiting.
585

586
    This is called from the mcpu code as a notifier function, when the LU is
587
    finally about to start the Exec() method. Of course, to have end-user
588
    visible results, the opcode must be initially (before calling into
589
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
590

591
    """
592
    assert self._op in self._job.ops
593
    assert self._op.status in (constants.OP_STATUS_WAITING,
594
                               constants.OP_STATUS_CANCELING)
595

    
596
    # Cancel here if we were asked to
597
    self._CheckCancel()
598

    
599
    logging.debug("Opcode is now running")
600

    
601
    self._op.status = constants.OP_STATUS_RUNNING
602
    self._op.exec_timestamp = TimeStampNow()
603

    
604
    # And finally replicate the job status
605
    self._queue.UpdateJobUnlocked(self._job)
606

    
607
  @locking.ssynchronized(_QUEUE, shared=1)
608
  def _AppendFeedback(self, timestamp, log_type, log_msg):
609
    """Internal feedback append function, with locks
610

611
    """
612
    self._job.log_serial += 1
613
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
614
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
615

    
616
  def Feedback(self, *args):
617
    """Append a log entry.
618

619
    """
620
    assert len(args) < 3
621

    
622
    if len(args) == 1:
623
      log_type = constants.ELOG_MESSAGE
624
      log_msg = args[0]
625
    else:
626
      (log_type, log_msg) = args
627

    
628
    # The time is split to make serialization easier and not lose
629
    # precision.
630
    timestamp = utils.SplitTime(time.time())
631
    self._AppendFeedback(timestamp, log_type, log_msg)
632

    
633
  def CurrentPriority(self):
634
    """Returns current priority for opcode.
635

636
    """
637
    assert self._op.status in (constants.OP_STATUS_WAITING,
638
                               constants.OP_STATUS_CANCELING)
639

    
640
    # Cancel here if we were asked to
641
    self._CheckCancel()
642

    
643
    return self._op.priority
644

    
645
  def SubmitManyJobs(self, jobs):
646
    """Submits jobs for processing.
647

648
    See L{JobQueue.SubmitManyJobs}.
649

650
    """
651
    # Locking is done in job queue
652
    return self._queue.SubmitManyJobs(jobs)
653

    
654

    
655
class _JobChangesChecker(object):
656
  def __init__(self, fields, prev_job_info, prev_log_serial):
657
    """Initializes this class.
658

659
    @type fields: list of strings
660
    @param fields: Fields requested by LUXI client
661
    @type prev_job_info: string
662
    @param prev_job_info: previous job info, as passed by the LUXI client
663
    @type prev_log_serial: string
664
    @param prev_log_serial: previous job serial, as passed by the LUXI client
665

666
    """
667
    self._squery = _SimpleJobQuery(fields)
668
    self._prev_job_info = prev_job_info
669
    self._prev_log_serial = prev_log_serial
670

    
671
  def __call__(self, job):
672
    """Checks whether job has changed.
673

674
    @type job: L{_QueuedJob}
675
    @param job: Job object
676

677
    """
678
    assert not job.writable, "Expected read-only job"
679

    
680
    status = job.CalcStatus()
681
    job_info = self._squery(job)
682
    log_entries = job.GetLogEntries(self._prev_log_serial)
683

    
684
    # Serializing and deserializing data can cause type changes (e.g. from
685
    # tuple to list) or precision loss. We're doing it here so that we get
686
    # the same modifications as the data received from the client. Without
687
    # this, the comparison afterwards might fail without the data being
688
    # significantly different.
689
    # TODO: we just deserialized from disk, investigate how to make sure that
690
    # the job info and log entries are compatible to avoid this further step.
691
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
692
    # efficient, though floats will be tricky
693
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
694
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
695

    
696
    # Don't even try to wait if the job is no longer running, there will be
697
    # no changes.
698
    if (status not in (constants.JOB_STATUS_QUEUED,
699
                       constants.JOB_STATUS_RUNNING,
700
                       constants.JOB_STATUS_WAITING) or
701
        job_info != self._prev_job_info or
702
        (log_entries and self._prev_log_serial != log_entries[0][0])):
703
      logging.debug("Job %s changed", job.id)
704
      return (job_info, log_entries)
705

    
706
    return None
707

    
708

    
709
class _JobFileChangesWaiter(object):
710
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
711
    """Initializes this class.
712

713
    @type filename: string
714
    @param filename: Path to job file
715
    @raises errors.InotifyError: if the notifier cannot be setup
716

717
    """
718
    self._wm = _inotify_wm_cls()
719
    self._inotify_handler = \
720
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
721
    self._notifier = \
722
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
723
    try:
724
      self._inotify_handler.enable()
725
    except Exception:
726
      # pyinotify doesn't close file descriptors automatically
727
      self._notifier.stop()
728
      raise
729

    
730
  def _OnInotify(self, notifier_enabled):
731
    """Callback for inotify.
732

733
    """
734
    if not notifier_enabled:
735
      self._inotify_handler.enable()
736

    
737
  def Wait(self, timeout):
738
    """Waits for the job file to change.
739

740
    @type timeout: float
741
    @param timeout: Timeout in seconds
742
    @return: Whether there have been events
743

744
    """
745
    assert timeout >= 0
746
    have_events = self._notifier.check_events(timeout * 1000)
747
    if have_events:
748
      self._notifier.read_events()
749
    self._notifier.process_events()
750
    return have_events
751

    
752
  def Close(self):
753
    """Closes underlying notifier and its file descriptor.
754

755
    """
756
    self._notifier.stop()
757

    
758

    
759
class _JobChangesWaiter(object):
760
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
761
    """Initializes this class.
762

763
    @type filename: string
764
    @param filename: Path to job file
765

766
    """
767
    self._filewaiter = None
768
    self._filename = filename
769
    self._waiter_cls = _waiter_cls
770

    
771
  def Wait(self, timeout):
772
    """Waits for a job to change.
773

774
    @type timeout: float
775
    @param timeout: Timeout in seconds
776
    @return: Whether there have been events
777

778
    """
779
    if self._filewaiter:
780
      return self._filewaiter.Wait(timeout)
781

    
782
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
783
    # If this point is reached, return immediately and let caller check the job
784
    # file again in case there were changes since the last check. This avoids a
785
    # race condition.
786
    self._filewaiter = self._waiter_cls(self._filename)
787

    
788
    return True
789

    
790
  def Close(self):
791
    """Closes underlying waiter.
792

793
    """
794
    if self._filewaiter:
795
      self._filewaiter.Close()
796

    
797

    
798
class _WaitForJobChangesHelper(object):
799
  """Helper class using inotify to wait for changes in a job file.
800

801
  This class takes a previous job status and serial, and alerts the client when
802
  the current job status has changed.
803

804
  """
805
  @staticmethod
806
  def _CheckForChanges(counter, job_load_fn, check_fn):
807
    if counter.next() > 0:
808
      # If this isn't the first check the job is given some more time to change
809
      # again. This gives better performance for jobs generating many
810
      # changes/messages.
811
      time.sleep(0.1)
812

    
813
    job = job_load_fn()
814
    if not job:
815
      raise errors.JobLost()
816

    
817
    result = check_fn(job)
818
    if result is None:
819
      raise utils.RetryAgain()
820

    
821
    return result
822

    
823
  def __call__(self, filename, job_load_fn,
824
               fields, prev_job_info, prev_log_serial, timeout,
825
               _waiter_cls=_JobChangesWaiter):
826
    """Waits for changes on a job.
827

828
    @type filename: string
829
    @param filename: File on which to wait for changes
830
    @type job_load_fn: callable
831
    @param job_load_fn: Function to load job
832
    @type fields: list of strings
833
    @param fields: Which fields to check for changes
834
    @type prev_job_info: list or None
835
    @param prev_job_info: Last job information returned
836
    @type prev_log_serial: int
837
    @param prev_log_serial: Last job message serial number
838
    @type timeout: float
839
    @param timeout: maximum time to wait in seconds
840

841
    """
842
    counter = itertools.count()
843
    try:
844
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
845
      waiter = _waiter_cls(filename)
846
      try:
847
        return utils.Retry(compat.partial(self._CheckForChanges,
848
                                          counter, job_load_fn, check_fn),
849
                           utils.RETRY_REMAINING_TIME, timeout,
850
                           wait_fn=waiter.Wait)
851
      finally:
852
        waiter.Close()
853
    except errors.JobLost:
854
      return None
855
    except utils.RetryTimeout:
856
      return constants.JOB_NOTCHANGED
857

    
858

    
859
def _EncodeOpError(err):
860
  """Encodes an error which occurred while processing an opcode.
861

862
  """
863
  if isinstance(err, errors.GenericError):
864
    to_encode = err
865
  else:
866
    to_encode = errors.OpExecError(str(err))
867

    
868
  return errors.EncodeException(to_encode)
869

    
870

    
871
class _TimeoutStrategyWrapper:
872
  def __init__(self, fn):
873
    """Initializes this class.
874

875
    """
876
    self._fn = fn
877
    self._next = None
878

    
879
  def _Advance(self):
880
    """Gets the next timeout if necessary.
881

882
    """
883
    if self._next is None:
884
      self._next = self._fn()
885

    
886
  def Peek(self):
887
    """Returns the next timeout.
888

889
    """
890
    self._Advance()
891
    return self._next
892

    
893
  def Next(self):
894
    """Returns the current timeout and advances the internal state.
895

896
    """
897
    self._Advance()
898
    result = self._next
899
    self._next = None
900
    return result
901

    
902

    
903
class _OpExecContext:
904
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
905
    """Initializes this class.
906

907
    """
908
    self.op = op
909
    self.index = index
910
    self.log_prefix = log_prefix
911
    self.summary = op.input.Summary()
912

    
913
    # Create local copy to modify
914
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
915
      self.jobdeps = op.input.depends[:]
916
    else:
917
      self.jobdeps = None
918

    
919
    self._timeout_strategy_factory = timeout_strategy_factory
920
    self._ResetTimeoutStrategy()
921

    
922
  def _ResetTimeoutStrategy(self):
923
    """Creates a new timeout strategy.
924

925
    """
926
    self._timeout_strategy = \
927
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
928

    
929
  def CheckPriorityIncrease(self):
930
    """Checks whether priority can and should be increased.
931

932
    Called when locks couldn't be acquired.
933

934
    """
935
    op = self.op
936

    
937
    # Exhausted all retries and next round should not use blocking acquire
938
    # for locks?
939
    if (self._timeout_strategy.Peek() is None and
940
        op.priority > constants.OP_PRIO_HIGHEST):
941
      logging.debug("Increasing priority")
942
      op.priority -= 1
943
      self._ResetTimeoutStrategy()
944
      return True
945

    
946
    return False
947

    
948
  def GetNextLockTimeout(self):
949
    """Returns the next lock acquire timeout.
950

951
    """
952
    return self._timeout_strategy.Next()
953

    
954

    
955
class _JobProcessor(object):
956
  (DEFER,
957
   WAITDEP,
958
   FINISHED) = range(1, 4)
959

    
960
  def __init__(self, queue, opexec_fn, job,
961
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
962
    """Initializes this class.
963

964
    """
965
    self.queue = queue
966
    self.opexec_fn = opexec_fn
967
    self.job = job
968
    self._timeout_strategy_factory = _timeout_strategy_factory
969

    
970
  @staticmethod
971
  def _FindNextOpcode(job, timeout_strategy_factory):
972
    """Locates the next opcode to run.
973

974
    @type job: L{_QueuedJob}
975
    @param job: Job object
976
    @param timeout_strategy_factory: Callable to create new timeout strategy
977

978
    """
979
    # Create some sort of a cache to speed up locating next opcode for future
980
    # lookups
981
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
982
    # pending and one for processed ops.
983
    if job.ops_iter is None:
984
      job.ops_iter = enumerate(job.ops)
985

    
986
    # Find next opcode to run
987
    while True:
988
      try:
989
        (idx, op) = job.ops_iter.next()
990
      except StopIteration:
991
        raise errors.ProgrammerError("Called for a finished job")
992

    
993
      if op.status == constants.OP_STATUS_RUNNING:
994
        # Found an opcode already marked as running
995
        raise errors.ProgrammerError("Called for job marked as running")
996

    
997
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
998
                             timeout_strategy_factory)
999

    
1000
      if op.status not in constants.OPS_FINALIZED:
1001
        return opctx
1002

    
1003
      # This is a job that was partially completed before master daemon
1004
      # shutdown, so it can be expected that some opcodes are already
1005
      # completed successfully (if any did error out, then the whole job
1006
      # should have been aborted and not resubmitted for processing).
1007
      logging.info("%s: opcode %s already processed, skipping",
1008
                   opctx.log_prefix, opctx.summary)
1009

    
1010
  @staticmethod
1011
  def _MarkWaitlock(job, op):
1012
    """Marks an opcode as waiting for locks.
1013

1014
    The job's start timestamp is also set if necessary.
1015

1016
    @type job: L{_QueuedJob}
1017
    @param job: Job object
1018
    @type op: L{_QueuedOpCode}
1019
    @param op: Opcode object
1020

1021
    """
1022
    assert op in job.ops
1023
    assert op.status in (constants.OP_STATUS_QUEUED,
1024
                         constants.OP_STATUS_WAITING)
1025

    
1026
    update = False
1027

    
1028
    op.result = None
1029

    
1030
    if op.status == constants.OP_STATUS_QUEUED:
1031
      op.status = constants.OP_STATUS_WAITING
1032
      update = True
1033

    
1034
    if op.start_timestamp is None:
1035
      op.start_timestamp = TimeStampNow()
1036
      update = True
1037

    
1038
    if job.start_timestamp is None:
1039
      job.start_timestamp = op.start_timestamp
1040
      update = True
1041

    
1042
    assert op.status == constants.OP_STATUS_WAITING
1043

    
1044
    return update
1045

    
1046
  @staticmethod
1047
  def _CheckDependencies(queue, job, opctx):
1048
    """Checks if an opcode has dependencies and if so, processes them.
1049

1050
    @type queue: L{JobQueue}
1051
    @param queue: Queue object
1052
    @type job: L{_QueuedJob}
1053
    @param job: Job object
1054
    @type opctx: L{_OpExecContext}
1055
    @param opctx: Opcode execution context
1056
    @rtype: bool
1057
    @return: Whether opcode will be re-scheduled by dependency tracker
1058

1059
    """
1060
    op = opctx.op
1061

    
1062
    result = False
1063

    
1064
    while opctx.jobdeps:
1065
      (dep_job_id, dep_status) = opctx.jobdeps[0]
1066

    
1067
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1068
                                                          dep_status)
1069
      assert ht.TNonEmptyString(depmsg), "No dependency message"
1070

    
1071
      logging.info("%s: %s", opctx.log_prefix, depmsg)
1072

    
1073
      if depresult == _JobDependencyManager.CONTINUE:
1074
        # Remove dependency and continue
1075
        opctx.jobdeps.pop(0)
1076

    
1077
      elif depresult == _JobDependencyManager.WAIT:
1078
        # Need to wait for notification, dependency tracker will re-add job
1079
        # to workerpool
1080
        result = True
1081
        break
1082

    
1083
      elif depresult == _JobDependencyManager.CANCEL:
1084
        # Job was cancelled, cancel this job as well
1085
        job.Cancel()
1086
        assert op.status == constants.OP_STATUS_CANCELING
1087
        break
1088

    
1089
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1090
                         _JobDependencyManager.ERROR):
1091
        # Job failed or there was an error, this job must fail
1092
        op.status = constants.OP_STATUS_ERROR
1093
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1094
        break
1095

    
1096
      else:
1097
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1098
                                     depresult)
1099

    
1100
    return result
1101

    
1102
  def _ExecOpCodeUnlocked(self, opctx):
1103
    """Processes one opcode and returns the result.
1104

1105
    """
1106
    op = opctx.op
1107

    
1108
    assert op.status == constants.OP_STATUS_WAITING
1109

    
1110
    timeout = opctx.GetNextLockTimeout()
1111

    
1112
    try:
1113
      # Make sure not to hold queue lock while calling ExecOpCode
1114
      result = self.opexec_fn(op.input,
1115
                              _OpExecCallbacks(self.queue, self.job, op),
1116
                              timeout=timeout)
1117
    except mcpu.LockAcquireTimeout:
1118
      assert timeout is not None, "Received timeout for blocking acquire"
1119
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1120

    
1121
      assert op.status in (constants.OP_STATUS_WAITING,
1122
                           constants.OP_STATUS_CANCELING)
1123

    
1124
      # Was job cancelled while we were waiting for the lock?
1125
      if op.status == constants.OP_STATUS_CANCELING:
1126
        return (constants.OP_STATUS_CANCELING, None)
1127

    
1128
      # Queue is shutting down, return to queued
1129
      if not self.queue.AcceptingJobsUnlocked():
1130
        return (constants.OP_STATUS_QUEUED, None)
1131

    
1132
      # Stay in waitlock while trying to re-acquire lock
1133
      return (constants.OP_STATUS_WAITING, None)
1134
    except CancelJob:
1135
      logging.exception("%s: Canceling job", opctx.log_prefix)
1136
      assert op.status == constants.OP_STATUS_CANCELING
1137
      return (constants.OP_STATUS_CANCELING, None)
1138

    
1139
    except QueueShutdown:
1140
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1141

    
1142
      assert op.status == constants.OP_STATUS_WAITING
1143

    
1144
      # Job hadn't been started yet, so it should return to the queue
1145
      return (constants.OP_STATUS_QUEUED, None)
1146

    
1147
    except Exception, err: # pylint: disable=W0703
1148
      logging.exception("%s: Caught exception in %s",
1149
                        opctx.log_prefix, opctx.summary)
1150
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1151
    else:
1152
      logging.debug("%s: %s successful",
1153
                    opctx.log_prefix, opctx.summary)
1154
      return (constants.OP_STATUS_SUCCESS, result)
1155

    
1156
  def __call__(self, _nextop_fn=None):
1157
    """Continues execution of a job.
1158

1159
    @param _nextop_fn: Callback function for tests
1160
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1161
      be deferred and C{WAITDEP} if the dependency manager
1162
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1163

1164
    """
1165
    queue = self.queue
1166
    job = self.job
1167

    
1168
    logging.debug("Processing job %s", job.id)
1169

    
1170
    queue.acquire(shared=1)
1171
    try:
1172
      opcount = len(job.ops)
1173

    
1174
      assert job.writable, "Expected writable job"
1175

    
1176
      # Don't do anything for finalized jobs
1177
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1178
        return self.FINISHED
1179

    
1180
      # Is a previous opcode still pending?
1181
      if job.cur_opctx:
1182
        opctx = job.cur_opctx
1183
        job.cur_opctx = None
1184
      else:
1185
        if __debug__ and _nextop_fn:
1186
          _nextop_fn()
1187
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1188

    
1189
      op = opctx.op
1190

    
1191
      # Consistency check
1192
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1193
                                     constants.OP_STATUS_CANCELING)
1194
                        for i in job.ops[opctx.index + 1:])
1195

    
1196
      assert op.status in (constants.OP_STATUS_QUEUED,
1197
                           constants.OP_STATUS_WAITING,
1198
                           constants.OP_STATUS_CANCELING)
1199

    
1200
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1201
              op.priority >= constants.OP_PRIO_HIGHEST)
1202

    
1203
      waitjob = None
1204

    
1205
      if op.status != constants.OP_STATUS_CANCELING:
1206
        assert op.status in (constants.OP_STATUS_QUEUED,
1207
                             constants.OP_STATUS_WAITING)
1208

    
1209
        # Prepare to start opcode
1210
        if self._MarkWaitlock(job, op):
1211
          # Write to disk
1212
          queue.UpdateJobUnlocked(job)
1213

    
1214
        assert op.status == constants.OP_STATUS_WAITING
1215
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1216
        assert job.start_timestamp and op.start_timestamp
1217
        assert waitjob is None
1218

    
1219
        # Check if waiting for a job is necessary
1220
        waitjob = self._CheckDependencies(queue, job, opctx)
1221

    
1222
        assert op.status in (constants.OP_STATUS_WAITING,
1223
                             constants.OP_STATUS_CANCELING,
1224
                             constants.OP_STATUS_ERROR)
1225

    
1226
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1227
                                         constants.OP_STATUS_ERROR)):
1228
          logging.info("%s: opcode %s waiting for locks",
1229
                       opctx.log_prefix, opctx.summary)
1230

    
1231
          assert not opctx.jobdeps, "Not all dependencies were removed"
1232

    
1233
          queue.release()
1234
          try:
1235
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1236
          finally:
1237
            queue.acquire(shared=1)
1238

    
1239
          op.status = op_status
1240
          op.result = op_result
1241

    
1242
          assert not waitjob
1243

    
1244
        if op.status in (constants.OP_STATUS_WAITING,
1245
                         constants.OP_STATUS_QUEUED):
1246
          # waiting: Couldn't get locks in time
1247
          # queued: Queue is shutting down
1248
          assert not op.end_timestamp
1249
        else:
1250
          # Finalize opcode
1251
          op.end_timestamp = TimeStampNow()
1252

    
1253
          if op.status == constants.OP_STATUS_CANCELING:
1254
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1255
                                  for i in job.ops[opctx.index:])
1256
          else:
1257
            assert op.status in constants.OPS_FINALIZED
1258

    
1259
      if op.status == constants.OP_STATUS_QUEUED:
1260
        # Queue is shutting down
1261
        assert not waitjob
1262

    
1263
        finalize = False
1264

    
1265
        # Reset context
1266
        job.cur_opctx = None
1267

    
1268
        # In no case must the status be finalized here
1269
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1270

    
1271
      elif op.status == constants.OP_STATUS_WAITING or waitjob:
1272
        finalize = False
1273

    
1274
        if not waitjob and opctx.CheckPriorityIncrease():
1275
          # Priority was changed, need to update on-disk file
1276
          queue.UpdateJobUnlocked(job)
1277

    
1278
        # Keep around for another round
1279
        job.cur_opctx = opctx
1280

    
1281
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1282
                op.priority >= constants.OP_PRIO_HIGHEST)
1283

    
1284
        # In no case must the status be finalized here
1285
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1286

    
1287
      else:
1288
        # Ensure all opcodes so far have been successful
1289
        assert (opctx.index == 0 or
1290
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1291
                           for i in job.ops[:opctx.index]))
1292

    
1293
        # Reset context
1294
        job.cur_opctx = None
1295

    
1296
        if op.status == constants.OP_STATUS_SUCCESS:
1297
          finalize = False
1298

    
1299
        elif op.status == constants.OP_STATUS_ERROR:
1300
          # Ensure failed opcode has an exception as its result
1301
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1302

    
1303
          to_encode = errors.OpExecError("Preceding opcode failed")
1304
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1305
                                _EncodeOpError(to_encode))
1306
          finalize = True
1307

    
1308
          # Consistency check
1309
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1310
                            errors.GetEncodedError(i.result)
1311
                            for i in job.ops[opctx.index:])
1312

    
1313
        elif op.status == constants.OP_STATUS_CANCELING:
1314
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1315
                                "Job canceled by request")
1316
          finalize = True
1317

    
1318
        else:
1319
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1320

    
1321
        if opctx.index == (opcount - 1):
1322
          # Finalize on last opcode
1323
          finalize = True
1324

    
1325
        if finalize:
1326
          # All opcodes have been run, finalize job
1327
          job.Finalize()
1328

    
1329
        # Write to disk. If the job status is final, this is the final write
1330
        # allowed. Once the file has been written, it can be archived anytime.
1331
        queue.UpdateJobUnlocked(job)
1332

    
1333
        assert not waitjob
1334

    
1335
        if finalize:
1336
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1337
          return self.FINISHED
1338

    
1339
      assert not waitjob or queue.depmgr.JobWaiting(job)
1340

    
1341
      if waitjob:
1342
        return self.WAITDEP
1343
      else:
1344
        return self.DEFER
1345
    finally:
1346
      assert job.writable, "Job became read-only while being processed"
1347
      queue.release()
1348

    
1349

    
1350
def _EvaluateJobProcessorResult(depmgr, job, result):
1351
  """Looks at a result from L{_JobProcessor} for a job.
1352

1353
  To be used in a L{_JobQueueWorker}.
1354

1355
  """
1356
  if result == _JobProcessor.FINISHED:
1357
    # Notify waiting jobs
1358
    depmgr.NotifyWaiters(job.id)
1359

    
1360
  elif result == _JobProcessor.DEFER:
1361
    # Schedule again
1362
    raise workerpool.DeferTask(priority=job.CalcPriority())
1363

    
1364
  elif result == _JobProcessor.WAITDEP:
1365
    # No-op, dependency manager will re-schedule
1366
    pass
1367

    
1368
  else:
1369
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1370
                                 (result, ))
1371

    
1372

    
1373
class _JobQueueWorker(workerpool.BaseWorker):
1374
  """The actual job workers.
1375

1376
  """
1377
  def RunTask(self, job): # pylint: disable=W0221
1378
    """Job executor.
1379

1380
    @type job: L{_QueuedJob}
1381
    @param job: the job to be processed
1382

1383
    """
1384
    assert job.writable, "Expected writable job"
1385

    
1386
    # Ensure only one worker is active on a single job. If a job registers for
1387
    # a dependency job, and the other job notifies before the first worker is
1388
    # done, the job can end up in the tasklist more than once.
1389
    job.processor_lock.acquire()
1390
    try:
1391
      return self._RunTaskInner(job)
1392
    finally:
1393
      job.processor_lock.release()
1394

    
1395
  def _RunTaskInner(self, job):
1396
    """Executes a job.
1397

1398
    Must be called with per-job lock acquired.
1399

1400
    """
1401
    queue = job.queue
1402
    assert queue == self.pool.queue
1403

    
1404
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1405
    setname_fn(None)
1406

    
1407
    proc = mcpu.Processor(queue.context, job.id)
1408

    
1409
    # Create wrapper for setting thread name
1410
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1411
                                    proc.ExecOpCode)
1412

    
1413
    _EvaluateJobProcessorResult(queue.depmgr, job,
1414
                                _JobProcessor(queue, wrap_execop_fn, job)())
1415

    
1416
  @staticmethod
1417
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1418
    """Updates the worker thread name to include a short summary of the opcode.
1419

1420
    @param setname_fn: Callable setting worker thread name
1421
    @param execop_fn: Callable for executing opcode (usually
1422
                      L{mcpu.Processor.ExecOpCode})
1423

1424
    """
1425
    setname_fn(op)
1426
    try:
1427
      return execop_fn(op, *args, **kwargs)
1428
    finally:
1429
      setname_fn(None)
1430

    
1431
  @staticmethod
1432
  def _GetWorkerName(job, op):
1433
    """Sets the worker thread name.
1434

1435
    @type job: L{_QueuedJob}
1436
    @type op: L{opcodes.OpCode}
1437

1438
    """
1439
    parts = ["Job%s" % job.id]
1440

    
1441
    if op:
1442
      parts.append(op.TinySummary())
1443

    
1444
    return "/".join(parts)
1445

    
1446

    
1447
class _JobQueueWorkerPool(workerpool.WorkerPool):
1448
  """Simple class implementing a job-processing workerpool.
1449

1450
  """
1451
  def __init__(self, queue):
1452
    super(_JobQueueWorkerPool, self).__init__("Jq",
1453
                                              JOBQUEUE_THREADS,
1454
                                              _JobQueueWorker)
1455
    self.queue = queue
1456

    
1457

    
1458
class _JobDependencyManager:
1459
  """Keeps track of job dependencies.
1460

1461
  """
1462
  (WAIT,
1463
   ERROR,
1464
   CANCEL,
1465
   CONTINUE,
1466
   WRONGSTATUS) = range(1, 6)
1467

    
1468
  def __init__(self, getstatus_fn, enqueue_fn):
1469
    """Initializes this class.
1470

1471
    """
1472
    self._getstatus_fn = getstatus_fn
1473
    self._enqueue_fn = enqueue_fn
1474

    
1475
    self._waiters = {}
1476
    self._lock = locking.SharedLock("JobDepMgr")
1477

    
1478
  @locking.ssynchronized(_LOCK, shared=1)
1479
  def GetLockInfo(self, requested): # pylint: disable=W0613
1480
    """Retrieves information about waiting jobs.
1481

1482
    @type requested: set
1483
    @param requested: Requested information, see C{query.LQ_*}
1484

1485
    """
1486
    # No need to sort here, that's being done by the lock manager and query
1487
    # library. There are no priorities for notifying jobs, hence all show up as
1488
    # one item under "pending".
1489
    return [("job/%s" % job_id, None, None,
1490
             [("job", [job.id for job in waiters])])
1491
            for job_id, waiters in self._waiters.items()
1492
            if waiters]
1493

    
1494
  @locking.ssynchronized(_LOCK, shared=1)
1495
  def JobWaiting(self, job):
1496
    """Checks if a job is waiting.
1497

1498
    """
1499
    return compat.any(job in jobs
1500
                      for jobs in self._waiters.values())
1501

    
1502
  @locking.ssynchronized(_LOCK)
1503
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1504
    """Checks if a dependency job has the requested status.
1505

1506
    If the other job is not yet in a finalized status, the calling job will be
1507
    notified (re-added to the workerpool) at a later point.
1508

1509
    @type job: L{_QueuedJob}
1510
    @param job: Job object
1511
    @type dep_job_id: int
1512
    @param dep_job_id: ID of dependency job
1513
    @type dep_status: list
1514
    @param dep_status: Required status
1515

1516
    """
1517
    assert ht.TJobId(job.id)
1518
    assert ht.TJobId(dep_job_id)
1519
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1520

    
1521
    if job.id == dep_job_id:
1522
      return (self.ERROR, "Job can't depend on itself")
1523

    
1524
    # Get status of dependency job
1525
    try:
1526
      status = self._getstatus_fn(dep_job_id)
1527
    except errors.JobLost, err:
1528
      return (self.ERROR, "Dependency error: %s" % err)
1529

    
1530
    assert status in constants.JOB_STATUS_ALL
1531

    
1532
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1533

    
1534
    if status not in constants.JOBS_FINALIZED:
1535
      # Register for notification and wait for job to finish
1536
      job_id_waiters.add(job)
1537
      return (self.WAIT,
1538
              "Need to wait for job %s, wanted status '%s'" %
1539
              (dep_job_id, dep_status))
1540

    
1541
    # Remove from waiters list
1542
    if job in job_id_waiters:
1543
      job_id_waiters.remove(job)
1544

    
1545
    if (status == constants.JOB_STATUS_CANCELED and
1546
        constants.JOB_STATUS_CANCELED not in dep_status):
1547
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1548

    
1549
    elif not dep_status or status in dep_status:
1550
      return (self.CONTINUE,
1551
              "Dependency job %s finished with status '%s'" %
1552
              (dep_job_id, status))
1553

    
1554
    else:
1555
      return (self.WRONGSTATUS,
1556
              "Dependency job %s finished with status '%s',"
1557
              " not one of '%s' as required" %
1558
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1559

    
1560
  def _RemoveEmptyWaitersUnlocked(self):
1561
    """Remove all jobs without actual waiters.
1562

1563
    """
1564
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1565
                   if not waiters]:
1566
      del self._waiters[job_id]
1567

    
1568
  def NotifyWaiters(self, job_id):
1569
    """Notifies all jobs waiting for a certain job ID.
1570

1571
    @attention: Do not call until L{CheckAndRegister} returned a status other
1572
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1573
    @type job_id: int
1574
    @param job_id: Job ID
1575

1576
    """
1577
    assert ht.TJobId(job_id)
1578

    
1579
    self._lock.acquire()
1580
    try:
1581
      self._RemoveEmptyWaitersUnlocked()
1582

    
1583
      jobs = self._waiters.pop(job_id, None)
1584
    finally:
1585
      self._lock.release()
1586

    
1587
    if jobs:
1588
      # Re-add jobs to workerpool
1589
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1590
                    len(jobs), job_id)
1591
      self._enqueue_fn(jobs)


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper
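
# Usage sketch for _RequireNonDrainedQueue (mirrors how L{JobQueue.SubmitJob}
# below applies it; the method shown here is only illustrative):
#
#   @locking.ssynchronized(_LOCK)
#   @_RequireOpenQueue
#   @_RequireNonDrainedQueue
#   def SubmitSomething(self, ops):
#     ...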


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode safe it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  def _PickupJobUnlocked(self, job_id):
    """Load a job from the job queue

    Pick up a job that already is in the job queue and start/resume it.

    """
    job = self._LoadJobUnlocked(job_id)

    if job is None:
      logging.warning("Job %s could not be read", job_id)
      return

    status = job.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self._EnqueueJobsUnlocked([job])
      logging.info("Restarting job %s", job.id)

    elif status in (constants.JOB_STATUS_RUNNING,
                    constants.JOB_STATUS_WAITING,
                    constants.JOB_STATUS_CANCELING):
      logging.warning("Unfinished job %s found: %s", job.id, job)

      if status == constants.JOB_STATUS_WAITING:
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
        self._EnqueueJobsUnlocked([job])
        logging.info("Restarting job %s", job.id)
      else:
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              "Unclean master daemon shutdown")
        job.Finalize()

    self.UpdateJobUnlocked(job)

  @locking.ssynchronized(_LOCK)
  def PickupJob(self, job_id):
    self._PickupJobUnlocked(job_id)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      self._PickupJobUnlocked(job_id)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    # Set queue drained flag
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
                                                       self._drained)
    msg = result[node_name].fail_msg
    if msg:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and especially
    log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TNonNegativeInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
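
  # Worked example (illustrative; the numbers are hypothetical): with
  # self._last_serial == 10 and count == 3, the serial file is rewritten to
  # contain "13" and the method returns the formatted IDs for 11, 12 and 13:
  #
  #   serial = 10 + 3                      # -> 13
  #   result = [jstore.FormatJobID(v) for v in range(11, 14)]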

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist
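
  # Illustrative example (file names follow the "job-%s" pattern used by
  # _GetJobPath above; the directory contents shown here are hypothetical):
  # a queue directory containing "job-17", "job-42" and "lock" yields
  # [17, 42] once JOB_FILE_RE has filtered out the non-job entries.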

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
    self._CheckRpcResult(result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes_base.DEPEND_ATTR, None)
      if not opcodes_base.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes_base.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id
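
  # Usage sketch (illustrative only; how the queue object and the opcode are
  # obtained depends on the calling daemon and is not shown in this module):
  #
  #   ops = [op]                      # op: an instance of an opcodes.OpCode
  #                                   # subclass, built by the caller
  #   job_id = queue.SubmitJob(ops)
  #   # job_id can then be passed to WaitForJobChanges() or QueryJobs()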

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJobToDrainedQueue(self, ops):
    """Forcefully create and store a new job.

    Do so, even if the job queue is drained.
    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)
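
  # Worked example (illustrative): when submitting a list of jobs, an opcode
  # in the third job may depend on the job submitted immediately before it by
  # using the relative ID -1.  With job_ids == [21, 22, 23] and no previous
  # submissions, the resolve_fn defined in _SubmitManyJobsUnlocked below
  # returns 22 for the third job, so
  #   [(-1, [constants.JOB_STATUS_SUCCESS])]
  # is rewritten to
  #   [(22, [constants.JOB_STATUS_SUCCESS])]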

  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes_base.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs],
                             task_id=map(_GetIdAttr, jobs))

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    assert not job.writable, "Got writable job" # pylint: disable=E1101

    if job:
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
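
  # Client-side polling sketch (illustrative; "status" is a job query field,
  # the loop and the starting values are hypothetical caller code):
  #
  #   prev_info = None
  #   prev_log = 0                     # assumed starting log serial
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_log, timeout=60.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue                     # timeout expired, poll again
  #     (prev_info, log_entries) = result
  #     # update prev_log from the received log entries before the next call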

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      (success, msg) = job.ChangePriority(priority)

      if success:
        try:
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)
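
  # Usage sketch (illustrative; constants.OP_PRIO_HIGH is assumed to be one of
  # the values accepted by constants.OP_PRIO_SUBMIT_VALID):
  #
  #   (success, msg) = queue.ChangeJobPriority(job_id, constants.OP_PRIO_HIGH)
  #   if not success:
  #     logging.warning("Priority change failed: %s", msg)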

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time (in seconds) to spend archiving

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
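
  # Worked example (illustrative): with age == 3600, a job whose end_timestamp
  # is (now - 7200, 0) gets archived, while one that finished ten minutes ago
  # is kept; age == -1 archives every finalized job regardless of timestamps.
  # The returned tuple is (jobs archived, jobs not inspected before the
  # timeout expired).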

  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)
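
  # Usage sketch (illustrative; "id" and "status" are job query fields from
  # query.JOB_FIELDS, and the filter shape follows the query2 list syntax
  # assumed here):
  #
  #   # New-style query for all queued jobs:
  #   resp = queue.QueryJobs(["id", "status"],
  #                          ["=", "status", constants.JOB_STATUS_QUEUED])
  #   # Old-style query for two specific jobs:
  #   rows = queue.OldStyleQueryJobs([17, 42], ["id", "status"])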

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False} L{Shutdown} can
    be called without interfering with any job. Queued and unfinished jobs will
    be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()
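
  # Shutdown sequence sketch (illustrative caller, e.g. a master daemon's
  # exit path; the polling interval is hypothetical):
  #
  #   while queue.PrepareShutdown():   # True while jobs are still running
  #     time.sleep(1)
  #   queue.Shutdown()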

  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool

    """
    return self._accepting_jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None