#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools
import operator

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import opcodes_base
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster


JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

#: Retrieves "id" attribute
_GetIdAttr = operator.attrgetter("id")


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


class QueueShutdown(Exception):
  """Special exception to abort a job when the job queue is shutting down.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())


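# Editor's note on the format above (values illustrative): utils.SplitTime
# splits a float Unix time into its integer parts, so TimeStampNow() returns
# something like (1234567890, 500000) for 1234567890.5 seconds since the
# epoch.

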
def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)


class _SimpleJobQuery:
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]


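# Illustrative use of _SimpleJobQuery (field names assumed valid job query
# fields): build the query object once, then apply it to any number of jobs:
#   get_fields = _SimpleJobQuery(["id", "status"])
#   row = get_fields(job)  # one old-style result row for this job

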
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }


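# Serialize and Restore are inverses: _QueuedOpCode.Restore(op.Serialize())
# rebuilds an equivalent opcode, which is how opcodes survive in on-disk job
# files across master daemon restarts.

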
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable", "archived"]

  def _AddReasons(self):
    """Extend the reason trail.

    Add the reason for all the opcodes of this job to be executed.

    """
    count = 0
    for queued_op in self.ops:
      op = queued_op.input
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__)
      reason_text = "job=%d;index=%d" % (self.id, count)
      reason = getattr(op, "reason", [])
      reason.append((reason_src, reason_text, utils.EpochNano()))
      op.reason = reason
      count = count + 1

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self._AddReasons()
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one finished with an error, the
        job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

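  # Examples of the roll-up above: opcode statuses [success, running, queued]
  # yield JOB_STATUS_RUNNING, [success, error, queued] yields JOB_STATUS_ERROR
  # (the loop stops at the first error), and all-success yields
  # JOB_STATUS_SUCCESS.
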
  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

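  # Note: priorities are "smaller is more urgent" (OP_PRIO_HIGHEST is the
  # numerically lowest value), hence min() above selects the most urgent
  # pending opcode; see also _OpExecContext.CheckPriorityIncrease below,
  # which raises priority by decrementing the value.
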
  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

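  # Example: if a client has already seen entries with serials 1..5, calling
  # job.GetLogEntries(5) returns only entries with serial 6 and up, while
  # job.GetLogEntries(None) returns the complete log.
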
  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

  def ChangePriority(self, priority):
    """Changes the job priority.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
      and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is canceling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)


class _OpExecCallbacks(mcpu.OpExecCbBase):

  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    super(_OpExecCallbacks, self).__init__()

    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # See if queue is shutting down
    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

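  # A resulting log entry, as stored in _QueuedOpCode.log and returned by
  # _QueuedJob.GetLogEntries, looks like (values illustrative):
  #   (7, (1234567890, 500000), constants.ELOG_MESSAGE, "creating disks")
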
  def CurrentPriority(self):
    """Returns current priority for opcode.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    return self._op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = _inotify_wm_cls()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename
    self._waiter_cls = _waiter_cls

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = self._waiter_cls(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout,
               _waiter_cls=_JobChangesWaiter):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _waiter_cls(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except errors.JobLost:
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result


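# Sketch of the wrapper's behaviour, assuming a strategy whose function
# yields 1.0 and then 2.0: repeated Peek() calls all return 1.0 without
# consuming it, the next Next() call also returns 1.0, and only afterwards
# do Peek()/Next() move on to 2.0.

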
class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


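# The "depends" attribute copied into jobdeps above holds pairs of
# (job ID, list of required final statuses); an empty status list accepts
# any finalized status. For example, [(1234, [constants.JOB_STATUS_SUCCESS])]
# makes the opcode wait until job 1234 has finished successfully (the job ID
# is illustrative).

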
class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object
    @rtype: bool
    @return: Whether the job or opcode was updated and should be written
        to disk

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    # The very last check whether the job was cancelled before we try to
    # execute it
    if op.status == constants.OP_STATUS_CANCELING:
      return (constants.OP_STATUS_CANCELING, None)

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Queue is shutting down, return to queued
      if not self.queue.AcceptingJobsUnlocked():
        return (constants.OP_STATUS_QUEUED, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)

    except QueueShutdown:
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)

      assert op.status == constants.OP_STATUS_WAITING

      # Job hadn't been started yet, so it should return to the queue
      return (constants.OP_STATUS_QUEUED, None)

    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

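  # Possible (status, result) pairs returned by _ExecOpCodeUnlocked above:
  #   (OP_STATUS_SUCCESS, result)     - opcode finished successfully
  #   (OP_STATUS_ERROR, encoded err)  - opcode raised an exception
  #   (OP_STATUS_CANCELING, None)     - job is being cancelled
  #   (OP_STATUS_WAITING, None)       - lock acquire timed out, retry later
  #   (OP_STATUS_QUEUED, None)        - queue is shutting down
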
  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_QUEUED):
          # waiting: Couldn't get locks in time
          # queued: Queue is shutting down
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        finalize = False

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Returns the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


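# Worker threads are thus named "Job<id>" while merely assigned to a job and
# "Job<id>/<summary>" while an opcode runs; the exact suffix is whatever
# opcodes.OpCode.TinySummary produces (an abbreviated form of the opcode
# name).

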
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))

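  # CheckAndRegister above returns a (constant, message) pair: CONTINUE when
  # the dependency already finished with an acceptable status, WAIT after
  # registering the job for later notification, CANCEL when the dependency
  # was cancelled, and WRONGSTATUS or ERROR when the calling job must fail.
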
  def _RemoveEmptyWaitersUnlocked(self):
1569
    """Remove all jobs without actual waiters.
1570

1571
    """
1572
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1573
                   if not waiters]:
1574
      del self._waiters[job_id]
1575

    
1576
  def NotifyWaiters(self, job_id):
1577
    """Notifies all jobs waiting for a certain job ID.
1578

1579
    @attention: Do not call until L{CheckAndRegister} returned a status other
1580
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1581
    @type job_id: int
1582
    @param job_id: Job ID
1583

1584
    """
1585
    assert ht.TJobId(job_id)
1586

    
1587
    self._lock.acquire()
1588
    try:
1589
      self._RemoveEmptyWaitersUnlocked()
1590

    
1591
      jobs = self._waiters.pop(job_id, None)
1592
    finally:
1593
      self._lock.release()
1594

    
1595
    if jobs:
1596
      # Re-add jobs to workerpool
1597
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1598
                    len(jobs), job_id)
1599
      self._enqueue_fn(jobs)
1600

    
1601

    
1602
def _RequireOpenQueue(fn):
1603
  """Decorator for "public" functions.
1604

1605
  This function should be used for all 'public' functions. That is,
1606
  functions usually called from other classes. Note that this should
1607
  be applied only to methods (not plain functions), since it expects
1608
  that the decorated function is called with a first argument that has
1609
  a '_queue_filelock' argument.
1610

1611
  @warning: Use this decorator only after locking.ssynchronized
1612

1613
  Example::
1614
    @locking.ssynchronized(_LOCK)
1615
    @_RequireOpenQueue
1616
    def Example(self):
1617
      pass
1618

1619
  """
1620
  def wrapper(self, *args, **kwargs):
1621
    # pylint: disable=W0212
1622
    assert self._queue_filelock is not None, "Queue should be open"
1623
    return fn(self, *args, **kwargs)
1624
  return wrapper
1625

    
1626

    
1627
def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper
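
# Usage sketch (mirroring the Example in _RequireOpenQueue above; SubmitJob
# below uses exactly this decorator stack):
#
#   @locking.ssynchronized(_LOCK)
#   @_RequireOpenQueue
#   @_RequireNonDrainedQueue
#   def SubmitJob(self, ops):
#     ...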


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          to_encode = errors.OpExecError("Unclean master daemon shutdown")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")
  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    # Set queue drained flag
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
                                                       self._drained)
    msg = result[node_name].fail_msg
    if msg:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TNonNegativeInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
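
  # Worked sketch: with _last_serial == 41 and count == 3, the serial file is
  # rewritten to "44\n" and the method returns
  # [jstore.FormatJobID(v) for v in (42, 43, 44)].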
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)
  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist
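
  # Sketch (assuming constants.JOB_FILE_RE matches names of the form
  # "job-<number>", as written by _GetJobPath above): a queue directory
  # holding "job-17", "job-3" and "lock" yields [3, 17] with sort=True.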
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"

    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
    self._CheckRpcResult(result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes_base.DEPEND_ATTR, None)
      if not opcodes_base.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes_base.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJobToDrainedQueue(self, ops):
    """Forcefully create and store a new job.

    Do so, even if the job queue is drained.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)
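
  # Worked sketch (grounded in the loop above): with
  #   deps = [(-1, [constants.JOB_STATUS_SUCCESS]), (7, [])]
  # and resolve_fn(-1) == 42, the result is
  #   (True, [(42, [constants.JOB_STATUS_SUCCESS]), (7, [])])
  # while an IndexError from resolve_fn aborts with (False, <message>).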
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes_base.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        # Only reached if the inner loop wasn't aborted by "break", i.e. all
        # dependencies were resolved successfully
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)
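
  # Result sketch (illustrative IDs): submitting three jobs with preallocated
  # IDs 42, 43 and 44, where job 43 has an unresolvable dependency, yields
  #   results    == [(True, 42), (False, "Unable to resolve ..."), (True, 44)]
  #   added_jobs == [<job 42>, <job 44>]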
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs],
                             task_id=map(_GetIdAttr, jobs))

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      # SafeLoadJobFromDisk was asked for a read-only job; check the job
      # before dereferencing it, as it is None when loading failed
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      (success, msg) = job.ChangePriority(priority)

      if success:
        try:
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fixing the TODO above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the time budget in seconds after which archival stops

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
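
  # Usage sketch: AutoArchiveJobs(age=-1, timeout=60) archives every finalized
  # job within a 60 second budget and returns
  # (number archived, number of job IDs not yet considered).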
  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list, one element per job, each element being a list with
        the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)
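
  # Query sketch (illustrative; "id" and "status" are assumed field names
  # from query.JOB_FIELDS), given a JobQueue instance `queue`:
  #   queue.QueryJobs(["id", "status"], None)          # new-style, all jobs
  #   queue.OldStyleQueryJobs([42], ["id", "status"])  # rows of field values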
  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool

    """
    return self._accepting_jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None