root / lib / jqueue.py @ fa0192b2
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import time
35
import weakref
36
import threading
37
import itertools
38
import operator
39

    
40
try:
41
  # pylint: disable=E0611
42
  from pyinotify import pyinotify
43
except ImportError:
44
  import pyinotify
45

    
46
from ganeti import asyncnotifier
47
from ganeti import constants
48
from ganeti import serializer
49
from ganeti import workerpool
50
from ganeti import locking
51
from ganeti import opcodes
52
from ganeti import errors
53
from ganeti import mcpu
54
from ganeti import utils
55
from ganeti import jstore
56
from ganeti import rpc
57
from ganeti import runtime
58
from ganeti import netutils
59
from ganeti import compat
60
from ganeti import ht
61
from ganeti import query
62
from ganeti import qlang
63
from ganeti import pathutils
64
from ganeti import vcluster
65

    
66

    
67
JOBQUEUE_THREADS = 25
68

    
69
# member lock names to be passed to @ssynchronized decorator
70
_LOCK = "_lock"
71
_QUEUE = "_queue"
72

    
73
#: Retrieves "id" attribute
74
_GetIdAttr = operator.attrgetter("id")
75

    
76

    
77
class CancelJob(Exception):
78
  """Special exception to cancel a job.
79

80
  """
81

    
82

    
83
class QueueShutdown(Exception):
84
  """Special exception to abort a job when the job queue is shutting down.
85

86
  """
87

    
88

    
89
def TimeStampNow():
90
  """Returns the current timestamp.
91

92
  @rtype: tuple
93
  @return: the current time in the (seconds, microseconds) format
94

95
  """
96
  return utils.SplitTime(time.time())
97

    
98

    
99
def _CallJqUpdate(runner, names, file_name, content):
100
  """Updates job queue file after virtualizing filename.
101

102
  """
103
  virt_file_name = vcluster.MakeVirtualPath(file_name)
104
  return runner.call_jobqueue_update(names, virt_file_name, content)
105

    
106

    
107
class _SimpleJobQuery:
108
  """Wrapper for job queries.
109

110
  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
111

112
  """
113
  def __init__(self, fields):
114
    """Initializes this class.
115

116
    """
117
    self._query = query.Query(query.JOB_FIELDS, fields)
118

    
119
  def __call__(self, job):
120
    """Executes a job query using cached field list.
121

122
    """
123
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
124

    
125

    
126
class _QueuedOpCode(object):
127
  """Encapsulates an opcode object.
128

129
  @ivar log: holds the execution log and consists of tuples
130
  of the form C{(log_serial, timestamp, level, message)}
131
  @ivar input: the OpCode we encapsulate
132
  @ivar status: the current status
133
  @ivar result: the result of the LU execution
134
  @ivar start_timestamp: timestamp for the start of the execution
135
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
136
  @ivar end_timestamp: timestamp for the end of the execution
137

138
  """
139
  __slots__ = ["input", "status", "result", "log", "priority",
140
               "start_timestamp", "exec_timestamp", "end_timestamp",
141
               "__weakref__"]
142

    
143
  def __init__(self, op):
144
    """Initializes instances of this class.
145

146
    @type op: L{opcodes.OpCode}
147
    @param op: the opcode we encapsulate
148

149
    """
150
    self.input = op
151
    self.status = constants.OP_STATUS_QUEUED
152
    self.result = None
153
    self.log = []
154
    self.start_timestamp = None
155
    self.exec_timestamp = None
156
    self.end_timestamp = None
157

    
158
    # Get initial priority (it might change during the lifetime of this opcode)
159
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
160

    
161
  @classmethod
162
  def Restore(cls, state):
163
    """Restore the _QueuedOpCode from the serialized form.
164

165
    @type state: dict
166
    @param state: the serialized state
167
    @rtype: _QueuedOpCode
168
    @return: a new _QueuedOpCode instance
169

170
    """
171
    obj = _QueuedOpCode.__new__(cls)
172
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
173
    obj.status = state["status"]
174
    obj.result = state["result"]
175
    obj.log = state["log"]
176
    obj.start_timestamp = state.get("start_timestamp", None)
177
    obj.exec_timestamp = state.get("exec_timestamp", None)
178
    obj.end_timestamp = state.get("end_timestamp", None)
179
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
180
    return obj
181

    
182
  def Serialize(self):
183
    """Serializes this _QueuedOpCode.
184

185
    @rtype: dict
186
    @return: the dictionary holding the serialized state
187

188
    """
189
    return {
190
      "input": self.input.__getstate__(),
191
      "status": self.status,
192
      "result": self.result,
193
      "log": self.log,
194
      "start_timestamp": self.start_timestamp,
195
      "exec_timestamp": self.exec_timestamp,
196
      "end_timestamp": self.end_timestamp,
197
      "priority": self.priority,
198
      }
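  # Serialize() is the inverse of Restore() above; Restore() uses dict.get()
  # with defaults, presumably so state dictionaries written without the
  # timestamp or priority fields can still be loaded.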
199

    
200

    
201
class _QueuedJob(object):
202
  """In-memory job representation.
203

204
  This is what we use to track the user-submitted jobs. Locking must
205
  be taken care of by users of this class.
206

207
  @type queue: L{JobQueue}
208
  @ivar queue: the parent queue
209
  @ivar id: the job ID
210
  @type ops: list
211
  @ivar ops: the list of _QueuedOpCode that constitute the job
212
  @type log_serial: int
213
  @ivar log_serial: holds the index for the next log entry
214
  @ivar received_timestamp: the timestamp for when the job was received
215
  @ivar start_timestamp: the timestamp for start of execution
216
  @ivar end_timestamp: the timestamp for end of execution
217
  @ivar writable: Whether the job is allowed to be modified
218

219
  """
220
  # pylint: disable=W0212
221
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
222
               "received_timestamp", "start_timestamp", "end_timestamp",
223
               "__weakref__", "processor_lock", "writable", "archived"]
224

    
225
  def _AddReasons(self):
226
    """Extend the reason trail
227

228
    Add the reason for all the opcodes of this job to be executed.
229

230
    """
231
    count = 0
232
    for queued_op in self.ops:
233
      op = queued_op.input
234
      reason_src = opcodes.NameToReasonSrc(op.__class__.__name__)
235
      reason_text = "job=%d;index=%d" % (self.id, count)
236
      reason = getattr(op, "reason", [])
237
      reason.append((reason_src, reason_text, utils.EpochNano()))
238
      op.reason = reason
239
      count = count + 1
240

    
241
  def __init__(self, queue, job_id, ops, writable):
242
    """Constructor for the _QueuedJob.
243

244
    @type queue: L{JobQueue}
245
    @param queue: our parent queue
246
    @type job_id: int
247
    @param job_id: our job id
248
    @type ops: list
249
    @param ops: the list of opcodes we hold, which will be encapsulated
250
        in _QueuedOpCodes
251
    @type writable: bool
252
    @param writable: Whether job can be modified
253

254
    """
255
    if not ops:
256
      raise errors.GenericError("A job needs at least one opcode")
257

    
258
    self.queue = queue
259
    self.id = int(job_id)
260
    self.ops = [_QueuedOpCode(op) for op in ops]
261
    self._AddReasons()
262
    self.log_serial = 0
263
    self.received_timestamp = TimeStampNow()
264
    self.start_timestamp = None
265
    self.end_timestamp = None
266
    self.archived = False
267

    
268
    self._InitInMemory(self, writable)
269

    
270
    assert not self.archived, "New jobs can not be marked as archived"
271

    
272
  @staticmethod
273
  def _InitInMemory(obj, writable):
274
    """Initializes in-memory variables.
275

276
    """
277
    obj.writable = writable
278
    obj.ops_iter = None
279
    obj.cur_opctx = None
280

    
281
    # Read-only jobs are not processed and therefore don't need a lock
282
    if writable:
283
      obj.processor_lock = threading.Lock()
284
    else:
285
      obj.processor_lock = None
286

    
287
  def __repr__(self):
288
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
289
              "id=%s" % self.id,
290
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
291

    
292
    return "<%s at %#x>" % (" ".join(status), id(self))
293

    
294
  @classmethod
295
  def Restore(cls, queue, state, writable, archived):
296
    """Restore a _QueuedJob from serialized state:
297

298
    @type queue: L{JobQueue}
299
    @param queue: to which queue the restored job belongs
300
    @type state: dict
301
    @param state: the serialized state
302
    @type writable: bool
303
    @param writable: Whether job can be modified
304
    @type archived: bool
305
    @param archived: Whether job was already archived
306
    @rtype: _QueuedJob
307
    @return: the restored _QueuedJob instance
308

309
    """
310
    obj = _QueuedJob.__new__(cls)
311
    obj.queue = queue
312
    obj.id = int(state["id"])
313
    obj.received_timestamp = state.get("received_timestamp", None)
314
    obj.start_timestamp = state.get("start_timestamp", None)
315
    obj.end_timestamp = state.get("end_timestamp", None)
316
    obj.archived = archived
317

    
318
    obj.ops = []
319
    obj.log_serial = 0
320
    for op_state in state["ops"]:
321
      op = _QueuedOpCode.Restore(op_state)
322
      for log_entry in op.log:
323
        obj.log_serial = max(obj.log_serial, log_entry[0])
324
      obj.ops.append(op)
325

    
326
    cls._InitInMemory(obj, writable)
327

    
328
    return obj
329

    
330
  def Serialize(self):
331
    """Serialize the _JobQueue instance.
332

333
    @rtype: dict
334
    @return: the serialized state
335

336
    """
337
    return {
338
      "id": self.id,
339
      "ops": [op.Serialize() for op in self.ops],
340
      "start_timestamp": self.start_timestamp,
341
      "end_timestamp": self.end_timestamp,
342
      "received_timestamp": self.received_timestamp,
343
      }
344

    
345
  def CalcStatus(self):
346
    """Compute the status of this job.
347

348
    This function iterates over all the _QueuedOpCodes in the job and
349
    based on their status, computes the job status.
350

351
    The algorithm is:
352
      - if we find a canceled opcode, or one that finished with an error,
353
        the job status will be the same
354
      - otherwise, the last opcode with the status one of:
355
          - waitlock
356
          - canceling
357
          - running
358

359
        will determine the job status
360

361
      - otherwise, it means either all opcodes are queued, or success,
362
        and the job status will be the same
363

364
    @return: the job status
365

366
    """
367
    status = constants.JOB_STATUS_QUEUED
368

    
369
    all_success = True
370
    for op in self.ops:
371
      if op.status == constants.OP_STATUS_SUCCESS:
372
        continue
373

    
374
      all_success = False
375

    
376
      if op.status == constants.OP_STATUS_QUEUED:
377
        pass
378
      elif op.status == constants.OP_STATUS_WAITING:
379
        status = constants.JOB_STATUS_WAITING
380
      elif op.status == constants.OP_STATUS_RUNNING:
381
        status = constants.JOB_STATUS_RUNNING
382
      elif op.status == constants.OP_STATUS_CANCELING:
383
        status = constants.JOB_STATUS_CANCELING
384
        break
385
      elif op.status == constants.OP_STATUS_ERROR:
386
        status = constants.JOB_STATUS_ERROR
387
        # The whole job fails if one opcode failed
388
        break
389
      elif op.status == constants.OP_STATUS_CANCELED:
390
        status = constants.OP_STATUS_CANCELED
391
        break
392

    
393
    if all_success:
394
      status = constants.JOB_STATUS_SUCCESS
395

    
396
    return status
397

    
398
  def CalcPriority(self):
399
    """Gets the current priority for this job.
400

401
    Only unfinished opcodes are considered. When all are done, the default
402
    priority is used.
403

404
    @rtype: int
405

406
    """
407
    priorities = [op.priority for op in self.ops
408
                  if op.status not in constants.OPS_FINALIZED]
409

    
410
    if not priorities:
411
      # All opcodes are done, assume default priority
412
      return constants.OP_PRIO_DEFAULT
413

    
414
    return min(priorities)
415

    
416
  def GetLogEntries(self, newer_than):
417
    """Selectively returns the log entries.
418

419
    @type newer_than: None or int
420
    @param newer_than: if this is None, return all log entries,
421
        otherwise return only the log entries with serial higher
422
        than this value
423
    @rtype: list
424
    @return: the list of the log entries selected
425

426
    """
427
    if newer_than is None:
428
      serial = -1
429
    else:
430
      serial = newer_than
431

    
432
    entries = []
433
    for op in self.ops:
434
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
435

    
436
    return entries
437

    
438
  def GetInfo(self, fields):
439
    """Returns information about a job.
440

441
    @type fields: list
442
    @param fields: names of fields to return
443
    @rtype: list
444
    @return: list with one element for each field
445
    @raise errors.OpExecError: when an invalid field
446
        has been passed
447

448
    """
449
    return _SimpleJobQuery(fields)(self)
450

    
451
  def MarkUnfinishedOps(self, status, result):
452
    """Mark unfinished opcodes with a given status and result.
453

454
    This is a utility function for marking all running or waiting to
455
    be run opcodes with a given status. Opcodes which are already
456
    finalized are not changed.
457

458
    @param status: a given opcode status
459
    @param result: the opcode result
460

461
    """
462
    not_marked = True
463
    for op in self.ops:
464
      if op.status in constants.OPS_FINALIZED:
465
        assert not_marked, "Finalized opcodes found after non-finalized ones"
466
        continue
467
      op.status = status
468
      op.result = result
469
      not_marked = False
470

    
471
  def Finalize(self):
472
    """Marks the job as finalized.
473

474
    """
475
    self.end_timestamp = TimeStampNow()
476

    
477
  def Cancel(self):
478
    """Marks job as canceled/-ing if possible.
479

480
    @rtype: tuple; (bool, string)
481
    @return: Boolean describing whether job was successfully canceled or marked
482
      as canceling and a text message
483

484
    """
485
    status = self.CalcStatus()
486

    
487
    if status == constants.JOB_STATUS_QUEUED:
488
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
489
                             "Job canceled by request")
490
      self.Finalize()
491
      return (True, "Job %s canceled" % self.id)
492

    
493
    elif status == constants.JOB_STATUS_WAITING:
494
      # The worker will notice the new status and cancel the job
495
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
496
      return (True, "Job %s will be canceled" % self.id)
497

    
498
    else:
499
      logging.debug("Job %s is no longer waiting in the queue", self.id)
500
      return (False, "Job %s is no longer waiting in the queue" % self.id)
501

    
502
  def ChangePriority(self, priority):
503
    """Changes the job priority.
504

505
    @type priority: int
506
    @param priority: New priority
507
    @rtype: tuple; (bool, string)
508
    @return: Boolean describing whether job's priority was successfully changed
509
      and a text message
510

511
    """
512
    status = self.CalcStatus()
513

    
514
    if status in constants.JOBS_FINALIZED:
515
      return (False, "Job %s is finished" % self.id)
516
    elif status == constants.JOB_STATUS_CANCELING:
517
      return (False, "Job %s is cancelling" % self.id)
518
    else:
519
      assert status in (constants.JOB_STATUS_QUEUED,
520
                        constants.JOB_STATUS_WAITING,
521
                        constants.JOB_STATUS_RUNNING)
522

    
523
      changed = False
524
      for op in self.ops:
525
        if (op.status == constants.OP_STATUS_RUNNING or
526
            op.status in constants.OPS_FINALIZED):
527
          assert not changed, \
528
            ("Found opcode for which priority should not be changed after"
529
             " priority has been changed for previous opcodes")
530
          continue
531

    
532
        assert op.status in (constants.OP_STATUS_QUEUED,
533
                             constants.OP_STATUS_WAITING)
534

    
535
        changed = True
536

    
537
        # Set new priority (doesn't modify opcode input)
538
        op.priority = priority
539

    
540
      if changed:
541
        return (True, ("Priorities of pending opcodes for job %s have been"
542
                       " changed to %s" % (self.id, priority)))
543
      else:
544
        return (False, "Job %s had no pending opcodes" % self.id)
545

    
546

    
547
class _OpExecCallbacks(mcpu.OpExecCbBase):
548
  def __init__(self, queue, job, op):
549
    """Initializes this class.
550

551
    @type queue: L{JobQueue}
552
    @param queue: Job queue
553
    @type job: L{_QueuedJob}
554
    @param job: Job object
555
    @type op: L{_QueuedOpCode}
556
    @param op: OpCode
557

558
    """
559
    assert queue, "Queue is missing"
560
    assert job, "Job is missing"
561
    assert op, "Opcode is missing"
562

    
563
    self._queue = queue
564
    self._job = job
565
    self._op = op
566

    
567
  def _CheckCancel(self):
568
    """Raises an exception to cancel the job if asked to.
569

570
    """
571
    # Cancel here if we were asked to
572
    if self._op.status == constants.OP_STATUS_CANCELING:
573
      logging.debug("Canceling opcode")
574
      raise CancelJob()
575

    
576
    # See if queue is shutting down
577
    if not self._queue.AcceptingJobsUnlocked():
578
      logging.debug("Queue is shutting down")
579
      raise QueueShutdown()
580

    
581
  @locking.ssynchronized(_QUEUE, shared=1)
582
  def NotifyStart(self):
583
    """Mark the opcode as running, not lock-waiting.
584

585
    This is called from the mcpu code as a notifier function, when the LU is
586
    finally about to start the Exec() method. Of course, to have end-user
587
    visible results, the opcode must be initially (before calling into
588
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
589

590
    """
591
    assert self._op in self._job.ops
592
    assert self._op.status in (constants.OP_STATUS_WAITING,
593
                               constants.OP_STATUS_CANCELING)
594

    
595
    # Cancel here if we were asked to
596
    self._CheckCancel()
597

    
598
    logging.debug("Opcode is now running")
599

    
600
    self._op.status = constants.OP_STATUS_RUNNING
601
    self._op.exec_timestamp = TimeStampNow()
602

    
603
    # And finally replicate the job status
604
    self._queue.UpdateJobUnlocked(self._job)
605

    
606
  @locking.ssynchronized(_QUEUE, shared=1)
607
  def _AppendFeedback(self, timestamp, log_type, log_msg):
608
    """Internal feedback append function, with locks
609

610
    """
611
    self._job.log_serial += 1
612
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
613
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
614

    
615
  def Feedback(self, *args):
616
    """Append a log entry.
617

618
    """
619
    assert len(args) < 3
620

    
621
    if len(args) == 1:
622
      log_type = constants.ELOG_MESSAGE
623
      log_msg = args[0]
624
    else:
625
      (log_type, log_msg) = args
626

    
627
    # The time is split to make serialization easier and not lose
628
    # precision.
629
    timestamp = utils.SplitTime(time.time())
630
    self._AppendFeedback(timestamp, log_type, log_msg)
631

    
632
  def CurrentPriority(self):
633
    """Returns current priority for opcode.
634

635
    """
636
    assert self._op.status in (constants.OP_STATUS_WAITING,
637
                               constants.OP_STATUS_CANCELING)
638

    
639
    # Cancel here if we were asked to
640
    self._CheckCancel()
641

    
642
    return self._op.priority
643

    
644
  def SubmitManyJobs(self, jobs):
645
    """Submits jobs for processing.
646

647
    See L{JobQueue.SubmitManyJobs}.
648

649
    """
650
    # Locking is done in job queue
651
    return self._queue.SubmitManyJobs(jobs)
652

    
653

    
654
class _JobChangesChecker(object):
655
  def __init__(self, fields, prev_job_info, prev_log_serial):
656
    """Initializes this class.
657

658
    @type fields: list of strings
659
    @param fields: Fields requested by LUXI client
660
    @type prev_job_info: list or None
661
    @param prev_job_info: previous job info, as passed by the LUXI client
662
    @type prev_log_serial: int
663
    @param prev_log_serial: previous job serial, as passed by the LUXI client
664

665
    """
666
    self._squery = _SimpleJobQuery(fields)
667
    self._prev_job_info = prev_job_info
668
    self._prev_log_serial = prev_log_serial
669

    
670
  def __call__(self, job):
671
    """Checks whether job has changed.
672

673
    @type job: L{_QueuedJob}
674
    @param job: Job object
675

676
    """
677
    assert not job.writable, "Expected read-only job"
678

    
679
    status = job.CalcStatus()
680
    job_info = self._squery(job)
681
    log_entries = job.GetLogEntries(self._prev_log_serial)
682

    
683
    # Serializing and deserializing data can cause type changes (e.g. from
684
    # tuple to list) or precision loss. We're doing it here so that we get
685
    # the same modifications as the data received from the client. Without
686
    # this, the comparison afterwards might fail without the data being
687
    # significantly different.
688
    # TODO: we just deserialized from disk, investigate how to make sure that
689
    # the job info and log entries are compatible to avoid this further step.
690
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
691
    # efficient, though floats will be tricky
692
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
693
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
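    # Illustrative example: the round trip turns tuples into lists, e.g.
    # serializer.LoadJson(serializer.DumpJson((1, 2))) == [1, 2], matching
    # what a LUXI client would have received.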
694

    
695
    # Don't even try to wait if the job is no longer running, there will be
696
    # no changes.
697
    if (status not in (constants.JOB_STATUS_QUEUED,
698
                       constants.JOB_STATUS_RUNNING,
699
                       constants.JOB_STATUS_WAITING) or
700
        job_info != self._prev_job_info or
701
        (log_entries and self._prev_log_serial != log_entries[0][0])):
702
      logging.debug("Job %s changed", job.id)
703
      return (job_info, log_entries)
704

    
705
    return None
706

    
707

    
708
class _JobFileChangesWaiter(object):
709
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
710
    """Initializes this class.
711

712
    @type filename: string
713
    @param filename: Path to job file
714
    @raises errors.InotifyError: if the notifier cannot be set up
715

716
    """
717
    self._wm = _inotify_wm_cls()
718
    self._inotify_handler = \
719
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
720
    self._notifier = \
721
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
722
    try:
723
      self._inotify_handler.enable()
724
    except Exception:
725
      # pyinotify doesn't close file descriptors automatically
726
      self._notifier.stop()
727
      raise
728

    
729
  def _OnInotify(self, notifier_enabled):
730
    """Callback for inotify.
731

732
    """
733
    if not notifier_enabled:
734
      self._inotify_handler.enable()
735

    
736
  def Wait(self, timeout):
737
    """Waits for the job file to change.
738

739
    @type timeout: float
740
    @param timeout: Timeout in seconds
741
    @return: Whether there have been events
742

743
    """
744
    assert timeout >= 0
745
    have_events = self._notifier.check_events(timeout * 1000)
746
    if have_events:
747
      self._notifier.read_events()
748
    self._notifier.process_events()
749
    return have_events
750

    
751
  def Close(self):
752
    """Closes underlying notifier and its file descriptor.
753

754
    """
755
    self._notifier.stop()
756

    
757

    
758
class _JobChangesWaiter(object):
759
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
760
    """Initializes this class.
761

762
    @type filename: string
763
    @param filename: Path to job file
764

765
    """
766
    self._filewaiter = None
767
    self._filename = filename
768
    self._waiter_cls = _waiter_cls
769

    
770
  def Wait(self, timeout):
771
    """Waits for a job to change.
772

773
    @type timeout: float
774
    @param timeout: Timeout in seconds
775
    @return: Whether there have been events
776

777
    """
778
    if self._filewaiter:
779
      return self._filewaiter.Wait(timeout)
780

    
781
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
782
    # If this point is reached, return immediately and let caller check the job
783
    # file again in case there were changes since the last check. This avoids a
784
    # race condition.
785
    self._filewaiter = self._waiter_cls(self._filename)
786

    
787
    return True
788

    
789
  def Close(self):
790
    """Closes underlying waiter.
791

792
    """
793
    if self._filewaiter:
794
      self._filewaiter.Close()
795

    
796

    
797
class _WaitForJobChangesHelper(object):
798
  """Helper class using inotify to wait for changes in a job file.
799

800
  This class takes a previous job status and serial, and alerts the client when
801
  the current job status has changed.
802

803
  """
804
  @staticmethod
805
  def _CheckForChanges(counter, job_load_fn, check_fn):
806
    if counter.next() > 0:
807
      # If this isn't the first check the job is given some more time to change
808
      # again. This gives better performance for jobs generating many
809
      # changes/messages.
810
      time.sleep(0.1)
811

    
812
    job = job_load_fn()
813
    if not job:
814
      raise errors.JobLost()
815

    
816
    result = check_fn(job)
817
    if result is None:
818
      raise utils.RetryAgain()
819

    
820
    return result
821

    
822
  def __call__(self, filename, job_load_fn,
823
               fields, prev_job_info, prev_log_serial, timeout,
824
               _waiter_cls=_JobChangesWaiter):
825
    """Waits for changes on a job.
826

827
    @type filename: string
828
    @param filename: File on which to wait for changes
829
    @type job_load_fn: callable
830
    @param job_load_fn: Function to load job
831
    @type fields: list of strings
832
    @param fields: Which fields to check for changes
833
    @type prev_job_info: list or None
834
    @param prev_job_info: Last job information returned
835
    @type prev_log_serial: int
836
    @param prev_log_serial: Last job message serial number
837
    @type timeout: float
838
    @param timeout: maximum time to wait in seconds
839

840
    """
841
    counter = itertools.count()
842
    try:
843
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
844
      waiter = _waiter_cls(filename)
845
      try:
846
        return utils.Retry(compat.partial(self._CheckForChanges,
847
                                          counter, job_load_fn, check_fn),
848
                           utils.RETRY_REMAINING_TIME, timeout,
849
                           wait_fn=waiter.Wait)
850
      finally:
851
        waiter.Close()
852
    except errors.JobLost:
853
      return None
854
    except utils.RetryTimeout:
855
      return constants.JOB_NOTCHANGED
856

    
857

    
858
def _EncodeOpError(err):
859
  """Encodes an error which occurred while processing an opcode.
860

861
  """
862
  if isinstance(err, errors.GenericError):
863
    to_encode = err
864
  else:
865
    to_encode = errors.OpExecError(str(err))
866

    
867
  return errors.EncodeException(to_encode)
868

    
869

    
870
class _TimeoutStrategyWrapper:
871
  def __init__(self, fn):
872
    """Initializes this class.
873

874
    """
875
    self._fn = fn
876
    self._next = None
877

    
878
  def _Advance(self):
879
    """Gets the next timeout if necessary.
880

881
    """
882
    if self._next is None:
883
      self._next = self._fn()
884

    
885
  def Peek(self):
886
    """Returns the next timeout.
887

888
    """
889
    self._Advance()
890
    return self._next
891

    
892
  def Next(self):
893
    """Returns the current timeout and advances the internal state.
894

895
    """
896
    self._Advance()
897
    result = self._next
898
    self._next = None
899
    return result
900

    
901

    
902
class _OpExecContext:
903
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
904
    """Initializes this class.
905

906
    """
907
    self.op = op
908
    self.index = index
909
    self.log_prefix = log_prefix
910
    self.summary = op.input.Summary()
911

    
912
    # Create local copy to modify
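    # Each dependency in op.input.depends is a (job id, [allowed final
    # status, ...]) pair, e.g. (1234, [constants.JOB_STATUS_SUCCESS]); see
    # _JobDependencyManager.CheckAndRegister for how these are evaluated.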
913
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
914
      self.jobdeps = op.input.depends[:]
915
    else:
916
      self.jobdeps = None
917

    
918
    self._timeout_strategy_factory = timeout_strategy_factory
919
    self._ResetTimeoutStrategy()
920

    
921
  def _ResetTimeoutStrategy(self):
922
    """Creates a new timeout strategy.
923

924
    """
925
    self._timeout_strategy = \
926
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
927

    
928
  def CheckPriorityIncrease(self):
929
    """Checks whether priority can and should be increased.
930

931
    Called when locks couldn't be acquired.
932

933
    """
934
    op = self.op
935

    
936
    # Exhausted all retries and next round should not use blocking acquire
937
    # for locks?
938
    if (self._timeout_strategy.Peek() is None and
939
        op.priority > constants.OP_PRIO_HIGHEST):
940
      logging.debug("Increasing priority")
941
      op.priority -= 1
942
      self._ResetTimeoutStrategy()
943
      return True
944

    
945
    return False
946

    
947
  def GetNextLockTimeout(self):
948
    """Returns the next lock acquire timeout.
949

950
    """
951
    return self._timeout_strategy.Next()
952

    
953

    
954
class _JobProcessor(object):
955
  (DEFER,
956
   WAITDEP,
957
   FINISHED) = range(1, 4)
958

    
959
  def __init__(self, queue, opexec_fn, job,
960
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
961
    """Initializes this class.
962

963
    """
964
    self.queue = queue
965
    self.opexec_fn = opexec_fn
966
    self.job = job
967
    self._timeout_strategy_factory = _timeout_strategy_factory
968

    
969
  @staticmethod
970
  def _FindNextOpcode(job, timeout_strategy_factory):
971
    """Locates the next opcode to run.
972

973
    @type job: L{_QueuedJob}
974
    @param job: Job object
975
    @param timeout_strategy_factory: Callable to create new timeout strategy
976

977
    """
978
    # Create some sort of a cache to speed up locating next opcode for future
979
    # lookups
980
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
981
    # pending and one for processed ops.
982
    if job.ops_iter is None:
983
      job.ops_iter = enumerate(job.ops)
984

    
985
    # Find next opcode to run
986
    while True:
987
      try:
988
        (idx, op) = job.ops_iter.next()
989
      except StopIteration:
990
        raise errors.ProgrammerError("Called for a finished job")
991

    
992
      if op.status == constants.OP_STATUS_RUNNING:
993
        # Found an opcode already marked as running
994
        raise errors.ProgrammerError("Called for job marked as running")
995

    
996
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
997
                             timeout_strategy_factory)
998

    
999
      if op.status not in constants.OPS_FINALIZED:
1000
        return opctx
1001

    
1002
      # This is a job that was partially completed before master daemon
1003
      # shutdown, so it can be expected that some opcodes are already
1004
      # completed successfully (if any did error out, then the whole job
1005
      # should have been aborted and not resubmitted for processing).
1006
      logging.info("%s: opcode %s already processed, skipping",
1007
                   opctx.log_prefix, opctx.summary)
1008

    
1009
  @staticmethod
1010
  def _MarkWaitlock(job, op):
1011
    """Marks an opcode as waiting for locks.
1012

1013
    The job's start timestamp is also set if necessary.
1014

1015
    @type job: L{_QueuedJob}
1016
    @param job: Job object
1017
    @type op: L{_QueuedOpCode}
1018
    @param op: Opcode object
1019

1020
    """
1021
    assert op in job.ops
1022
    assert op.status in (constants.OP_STATUS_QUEUED,
1023
                         constants.OP_STATUS_WAITING)
1024

    
1025
    update = False
1026

    
1027
    op.result = None
1028

    
1029
    if op.status == constants.OP_STATUS_QUEUED:
1030
      op.status = constants.OP_STATUS_WAITING
1031
      update = True
1032

    
1033
    if op.start_timestamp is None:
1034
      op.start_timestamp = TimeStampNow()
1035
      update = True
1036

    
1037
    if job.start_timestamp is None:
1038
      job.start_timestamp = op.start_timestamp
1039
      update = True
1040

    
1041
    assert op.status == constants.OP_STATUS_WAITING
1042

    
1043
    return update
1044

    
1045
  @staticmethod
1046
  def _CheckDependencies(queue, job, opctx):
1047
    """Checks if an opcode has dependencies and if so, processes them.
1048

1049
    @type queue: L{JobQueue}
1050
    @param queue: Queue object
1051
    @type job: L{_QueuedJob}
1052
    @param job: Job object
1053
    @type opctx: L{_OpExecContext}
1054
    @param opctx: Opcode execution context
1055
    @rtype: bool
1056
    @return: Whether opcode will be re-scheduled by dependency tracker
1057

1058
    """
1059
    op = opctx.op
1060

    
1061
    result = False
1062

    
1063
    while opctx.jobdeps:
1064
      (dep_job_id, dep_status) = opctx.jobdeps[0]
1065

    
1066
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1067
                                                          dep_status)
1068
      assert ht.TNonEmptyString(depmsg), "No dependency message"
1069

    
1070
      logging.info("%s: %s", opctx.log_prefix, depmsg)
1071

    
1072
      if depresult == _JobDependencyManager.CONTINUE:
1073
        # Remove dependency and continue
1074
        opctx.jobdeps.pop(0)
1075

    
1076
      elif depresult == _JobDependencyManager.WAIT:
1077
        # Need to wait for notification, dependency tracker will re-add job
1078
        # to workerpool
1079
        result = True
1080
        break
1081

    
1082
      elif depresult == _JobDependencyManager.CANCEL:
1083
        # Job was cancelled, cancel this job as well
1084
        job.Cancel()
1085
        assert op.status == constants.OP_STATUS_CANCELING
1086
        break
1087

    
1088
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1089
                         _JobDependencyManager.ERROR):
1090
        # Job failed or there was an error, this job must fail
1091
        op.status = constants.OP_STATUS_ERROR
1092
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1093
        break
1094

    
1095
      else:
1096
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1097
                                     depresult)
1098

    
1099
    return result
1100

    
1101
  def _ExecOpCodeUnlocked(self, opctx):
1102
    """Processes one opcode and returns the result.
1103

1104
    """
1105
    op = opctx.op
1106

    
1107
    assert op.status in (constants.OP_STATUS_WAITING,
1108
                         constants.OP_STATUS_CANCELING)
1109

    
1110
    # The very last check whether the job was canceled before trying to execute
1111
    if op.status == constants.OP_STATUS_CANCELING:
1112
      return (constants.OP_STATUS_CANCELING, None)
1113

    
1114
    timeout = opctx.GetNextLockTimeout()
1115

    
1116
    try:
1117
      # Make sure not to hold queue lock while calling ExecOpCode
1118
      result = self.opexec_fn(op.input,
1119
                              _OpExecCallbacks(self.queue, self.job, op),
1120
                              timeout=timeout)
1121
    except mcpu.LockAcquireTimeout:
1122
      assert timeout is not None, "Received timeout for blocking acquire"
1123
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1124

    
1125
      assert op.status in (constants.OP_STATUS_WAITING,
1126
                           constants.OP_STATUS_CANCELING)
1127

    
1128
      # Was job cancelled while we were waiting for the lock?
1129
      if op.status == constants.OP_STATUS_CANCELING:
1130
        return (constants.OP_STATUS_CANCELING, None)
1131

    
1132
      # Queue is shutting down, return to queued
1133
      if not self.queue.AcceptingJobsUnlocked():
1134
        return (constants.OP_STATUS_QUEUED, None)
1135

    
1136
      # Stay in waitlock while trying to re-acquire lock
1137
      return (constants.OP_STATUS_WAITING, None)
1138
    except CancelJob:
1139
      logging.exception("%s: Canceling job", opctx.log_prefix)
1140
      assert op.status == constants.OP_STATUS_CANCELING
1141
      return (constants.OP_STATUS_CANCELING, None)
1142

    
1143
    except QueueShutdown:
1144
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1145

    
1146
      assert op.status == constants.OP_STATUS_WAITING
1147

    
1148
      # Job hadn't been started yet, so it should return to the queue
1149
      return (constants.OP_STATUS_QUEUED, None)
1150

    
1151
    except Exception, err: # pylint: disable=W0703
1152
      logging.exception("%s: Caught exception in %s",
1153
                        opctx.log_prefix, opctx.summary)
1154
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1155
    else:
1156
      logging.debug("%s: %s successful",
1157
                    opctx.log_prefix, opctx.summary)
1158
      return (constants.OP_STATUS_SUCCESS, result)
1159

    
1160
  def __call__(self, _nextop_fn=None):
1161
    """Continues execution of a job.
1162

1163
    @param _nextop_fn: Callback function for tests
1164
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1165
      be deferred and C{WAITDEP} if the dependency manager
1166
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1167

1168
    """
1169
    queue = self.queue
1170
    job = self.job
1171

    
1172
    logging.debug("Processing job %s", job.id)
1173

    
1174
    queue.acquire(shared=1)
1175
    try:
1176
      opcount = len(job.ops)
1177

    
1178
      assert job.writable, "Expected writable job"
1179

    
1180
      # Don't do anything for finalized jobs
1181
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1182
        return self.FINISHED
1183

    
1184
      # Is a previous opcode still pending?
1185
      if job.cur_opctx:
1186
        opctx = job.cur_opctx
1187
        job.cur_opctx = None
1188
      else:
1189
        if __debug__ and _nextop_fn:
1190
          _nextop_fn()
1191
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1192

    
1193
      op = opctx.op
1194

    
1195
      # Consistency check
1196
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1197
                                     constants.OP_STATUS_CANCELING)
1198
                        for i in job.ops[opctx.index + 1:])
1199

    
1200
      assert op.status in (constants.OP_STATUS_QUEUED,
1201
                           constants.OP_STATUS_WAITING,
1202
                           constants.OP_STATUS_CANCELING)
1203

    
1204
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1205
              op.priority >= constants.OP_PRIO_HIGHEST)
1206

    
1207
      waitjob = None
1208

    
1209
      if op.status != constants.OP_STATUS_CANCELING:
1210
        assert op.status in (constants.OP_STATUS_QUEUED,
1211
                             constants.OP_STATUS_WAITING)
1212

    
1213
        # Prepare to start opcode
1214
        if self._MarkWaitlock(job, op):
1215
          # Write to disk
1216
          queue.UpdateJobUnlocked(job)
1217

    
1218
        assert op.status == constants.OP_STATUS_WAITING
1219
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1220
        assert job.start_timestamp and op.start_timestamp
1221
        assert waitjob is None
1222

    
1223
        # Check if waiting for a job is necessary
1224
        waitjob = self._CheckDependencies(queue, job, opctx)
1225

    
1226
        assert op.status in (constants.OP_STATUS_WAITING,
1227
                             constants.OP_STATUS_CANCELING,
1228
                             constants.OP_STATUS_ERROR)
1229

    
1230
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1231
                                         constants.OP_STATUS_ERROR)):
1232
          logging.info("%s: opcode %s waiting for locks",
1233
                       opctx.log_prefix, opctx.summary)
1234

    
1235
          assert not opctx.jobdeps, "Not all dependencies were removed"
1236

    
1237
          queue.release()
1238
          try:
1239
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1240
          finally:
1241
            queue.acquire(shared=1)
1242

    
1243
          op.status = op_status
1244
          op.result = op_result
1245

    
1246
          assert not waitjob
1247

    
1248
        if op.status in (constants.OP_STATUS_WAITING,
1249
                         constants.OP_STATUS_QUEUED):
1250
          # waiting: Couldn't get locks in time
1251
          # queued: Queue is shutting down
1252
          assert not op.end_timestamp
1253
        else:
1254
          # Finalize opcode
1255
          op.end_timestamp = TimeStampNow()
1256

    
1257
          if op.status == constants.OP_STATUS_CANCELING:
1258
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1259
                                  for i in job.ops[opctx.index:])
1260
          else:
1261
            assert op.status in constants.OPS_FINALIZED
1262

    
1263
      if op.status == constants.OP_STATUS_QUEUED:
1264
        # Queue is shutting down
1265
        assert not waitjob
1266

    
1267
        finalize = False
1268

    
1269
        # Reset context
1270
        job.cur_opctx = None
1271

    
1272
        # In no case must the status be finalized here
1273
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1274

    
1275
      elif op.status == constants.OP_STATUS_WAITING or waitjob:
1276
        finalize = False
1277

    
1278
        if not waitjob and opctx.CheckPriorityIncrease():
1279
          # Priority was changed, need to update on-disk file
1280
          queue.UpdateJobUnlocked(job)
1281

    
1282
        # Keep around for another round
1283
        job.cur_opctx = opctx
1284

    
1285
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1286
                op.priority >= constants.OP_PRIO_HIGHEST)
1287

    
1288
        # In no case must the status be finalized here
1289
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1290

    
1291
      else:
1292
        # Ensure all opcodes so far have been successful
1293
        assert (opctx.index == 0 or
1294
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1295
                           for i in job.ops[:opctx.index]))
1296

    
1297
        # Reset context
1298
        job.cur_opctx = None
1299

    
1300
        if op.status == constants.OP_STATUS_SUCCESS:
1301
          finalize = False
1302

    
1303
        elif op.status == constants.OP_STATUS_ERROR:
1304
          # Ensure failed opcode has an exception as its result
1305
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1306

    
1307
          to_encode = errors.OpExecError("Preceding opcode failed")
1308
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1309
                                _EncodeOpError(to_encode))
1310
          finalize = True
1311

    
1312
          # Consistency check
1313
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1314
                            errors.GetEncodedError(i.result)
1315
                            for i in job.ops[opctx.index:])
1316

    
1317
        elif op.status == constants.OP_STATUS_CANCELING:
1318
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1319
                                "Job canceled by request")
1320
          finalize = True
1321

    
1322
        else:
1323
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1324

    
1325
        if opctx.index == (opcount - 1):
1326
          # Finalize on last opcode
1327
          finalize = True
1328

    
1329
        if finalize:
1330
          # All opcodes have been run, finalize job
1331
          job.Finalize()
1332

    
1333
        # Write to disk. If the job status is final, this is the final write
1334
        # allowed. Once the file has been written, it can be archived anytime.
1335
        queue.UpdateJobUnlocked(job)
1336

    
1337
        assert not waitjob
1338

    
1339
        if finalize:
1340
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1341
          return self.FINISHED
1342

    
1343
      assert not waitjob or queue.depmgr.JobWaiting(job)
1344

    
1345
      if waitjob:
1346
        return self.WAITDEP
1347
      else:
1348
        return self.DEFER
1349
    finally:
1350
      assert job.writable, "Job became read-only while being processed"
1351
      queue.release()
1352

    
1353

    
1354
def _EvaluateJobProcessorResult(depmgr, job, result):
1355
  """Looks at a result from L{_JobProcessor} for a job.
1356

1357
  To be used in a L{_JobQueueWorker}.
1358

1359
  """
1360
  if result == _JobProcessor.FINISHED:
1361
    # Notify waiting jobs
1362
    depmgr.NotifyWaiters(job.id)
1363

    
1364
  elif result == _JobProcessor.DEFER:
1365
    # Schedule again
1366
    raise workerpool.DeferTask(priority=job.CalcPriority())
1367

    
1368
  elif result == _JobProcessor.WAITDEP:
1369
    # No-op, dependency manager will re-schedule
1370
    pass
1371

    
1372
  else:
1373
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1374
                                 (result, ))
1375

    
1376

    
1377
class _JobQueueWorker(workerpool.BaseWorker):
1378
  """The actual job workers.
1379

1380
  """
1381
  def RunTask(self, job): # pylint: disable=W0221
1382
    """Job executor.
1383

1384
    @type job: L{_QueuedJob}
1385
    @param job: the job to be processed
1386

1387
    """
1388
    assert job.writable, "Expected writable job"
1389

    
1390
    # Ensure only one worker is active on a single job. If a job registers for
1391
    # a dependency job, and the other job notifies before the first worker is
1392
    # done, the job can end up in the tasklist more than once.
1393
    job.processor_lock.acquire()
1394
    try:
1395
      return self._RunTaskInner(job)
1396
    finally:
1397
      job.processor_lock.release()
1398

    
1399
  def _RunTaskInner(self, job):
1400
    """Executes a job.
1401

1402
    Must be called with per-job lock acquired.
1403

1404
    """
1405
    queue = job.queue
1406
    assert queue == self.pool.queue
1407

    
1408
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1409
    setname_fn(None)
1410

    
1411
    proc = mcpu.Processor(queue.context, job.id)
1412

    
1413
    # Create wrapper for setting thread name
1414
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1415
                                    proc.ExecOpCode)
1416

    
1417
    _EvaluateJobProcessorResult(queue.depmgr, job,
1418
                                _JobProcessor(queue, wrap_execop_fn, job)())
1419

    
1420
  @staticmethod
1421
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1422
    """Updates the worker thread name to include a short summary of the opcode.
1423

1424
    @param setname_fn: Callable setting worker thread name
1425
    @param execop_fn: Callable for executing opcode (usually
1426
                      L{mcpu.Processor.ExecOpCode})
1427

1428
    """
1429
    setname_fn(op)
1430
    try:
1431
      return execop_fn(op, *args, **kwargs)
1432
    finally:
1433
      setname_fn(None)
1434

    
1435
  @staticmethod
1436
  def _GetWorkerName(job, op):
1437
    """Sets the worker thread name.
1438

1439
    @type job: L{_QueuedJob}
1440
    @type op: L{opcodes.OpCode}
1441

1442
    """
1443
    parts = ["Job%s" % job.id]
1444

    
1445
    if op:
1446
      parts.append(op.TinySummary())
1447

    
1448
    return "/".join(parts)
1449

    
1450

    
1451
class _JobQueueWorkerPool(workerpool.WorkerPool):
1452
  """Simple class implementing a job-processing workerpool.
1453

1454
  """
1455
  def __init__(self, queue):
1456
    super(_JobQueueWorkerPool, self).__init__("Jq",
1457
                                              JOBQUEUE_THREADS,
1458
                                              _JobQueueWorker)
1459
    self.queue = queue
1460

    
1461

    
1462
class _JobDependencyManager:
1463
  """Keeps track of job dependencies.
1464

1465
  """
1466
  (WAIT,
1467
   ERROR,
1468
   CANCEL,
1469
   CONTINUE,
1470
   WRONGSTATUS) = range(1, 6)
1471

    
1472
  def __init__(self, getstatus_fn, enqueue_fn):
1473
    """Initializes this class.
1474

1475
    """
1476
    self._getstatus_fn = getstatus_fn
1477
    self._enqueue_fn = enqueue_fn
1478

    
1479
    self._waiters = {}
1480
    self._lock = locking.SharedLock("JobDepMgr")
1481

    
1482
  @locking.ssynchronized(_LOCK, shared=1)
1483
  def GetLockInfo(self, requested): # pylint: disable=W0613
1484
    """Retrieves information about waiting jobs.
1485

1486
    @type requested: set
1487
    @param requested: Requested information, see C{query.LQ_*}
1488

1489
    """
1490
    # No need to sort here, that's being done by the lock manager and query
1491
    # library. There are no priorities for notifying jobs, hence all show up as
1492
    # one item under "pending".
1493
    return [("job/%s" % job_id, None, None,
1494
             [("job", [job.id for job in waiters])])
1495
            for job_id, waiters in self._waiters.items()
1496
            if waiters]
1497

    
1498
  @locking.ssynchronized(_LOCK, shared=1)
1499
  def JobWaiting(self, job):
1500
    """Checks if a job is waiting.
1501

1502
    """
1503
    return compat.any(job in jobs
1504
                      for jobs in self._waiters.values())
1505

    
1506
  @locking.ssynchronized(_LOCK)
1507
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1508
    """Checks if a dependency job has the requested status.
1509

1510
    If the other job is not yet in a finalized status, the calling job will be
1511
    notified (re-added to the workerpool) at a later point.
1512

1513
    @type job: L{_QueuedJob}
1514
    @param job: Job object
1515
    @type dep_job_id: int
1516
    @param dep_job_id: ID of dependency job
1517
    @type dep_status: list
1518
    @param dep_status: Required status
1519

1520
    """
1521
    assert ht.TJobId(job.id)
1522
    assert ht.TJobId(dep_job_id)
1523
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1524

    
1525
    if job.id == dep_job_id:
1526
      return (self.ERROR, "Job can't depend on itself")
1527

    
1528
    # Get status of dependency job
1529
    try:
1530
      status = self._getstatus_fn(dep_job_id)
1531
    except errors.JobLost, err:
1532
      return (self.ERROR, "Dependency error: %s" % err)
1533

    
1534
    assert status in constants.JOB_STATUS_ALL
1535

    
1536
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1537

    
1538
    if status not in constants.JOBS_FINALIZED:
1539
      # Register for notification and wait for job to finish
1540
      job_id_waiters.add(job)
1541
      return (self.WAIT,
1542
              "Need to wait for job %s, wanted status '%s'" %
1543
              (dep_job_id, dep_status))
1544

    
1545
    # Remove from waiters list
1546
    if job in job_id_waiters:
1547
      job_id_waiters.remove(job)
1548

    
1549
    if (status == constants.JOB_STATUS_CANCELED and
1550
        constants.JOB_STATUS_CANCELED not in dep_status):
1551
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1552

    
1553
    elif not dep_status or status in dep_status:
1554
      return (self.CONTINUE,
1555
              "Dependency job %s finished with status '%s'" %
1556
              (dep_job_id, status))
1557

    
1558
    else:
1559
      return (self.WRONGSTATUS,
1560
              "Dependency job %s finished with status '%s',"
1561
              " not one of '%s' as required" %
1562
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1563

    
1564
  def _RemoveEmptyWaitersUnlocked(self):
1565
    """Remove all jobs without actual waiters.
1566

1567
    """
1568
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1569
                   if not waiters]:
1570
      del self._waiters[job_id]
1571

    
1572
  def NotifyWaiters(self, job_id):
1573
    """Notifies all jobs waiting for a certain job ID.
1574

1575
    @attention: Do not call until L{CheckAndRegister} returned a status other
1576
      than C{WAIT} for C{job_id}, or behavior is undefined
1577
    @type job_id: int
1578
    @param job_id: Job ID
1579

1580
    """
1581
    assert ht.TJobId(job_id)
1582

    
1583
    self._lock.acquire()
1584
    try:
1585
      self._RemoveEmptyWaitersUnlocked()
1586

    
1587
      jobs = self._waiters.pop(job_id, None)
1588
    finally:
1589
      self._lock.release()
1590

    
1591
    if jobs:
1592
      # Re-add jobs to workerpool
1593
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1594
                    len(jobs), job_id)
1595
      self._enqueue_fn(jobs)
1596

    
1597

    
1598
def _RequireOpenQueue(fn):
1599
  """Decorator for "public" functions.
1600

1601
  This function should be used for all 'public' functions. That is,
1602
  functions usually called from other classes. Note that this should
1603
  be applied only to methods (not plain functions), since it expects
1604
  that the decorated function is called with a first argument that has
1605
  a '_queue_filelock' attribute.
1606

1607
  @warning: Use this decorator only after locking.ssynchronized
1608

1609
  Example::
1610
    @locking.ssynchronized(_LOCK)
1611
    @_RequireOpenQueue
1612
    def Example(self):
1613
      pass
1614

1615
  """
1616
  def wrapper(self, *args, **kwargs):
1617
    # pylint: disable=W0212
1618
    assert self._queue_filelock is not None, "Queue should be open"
1619
    return fn(self, *args, **kwargs)
1620
  return wrapper
1621

    
1622

    
1623
def _RequireNonDrainedQueue(fn):
1624
  """Decorator checking for a non-drained queue.
1625

1626
  To be used with functions submitting new jobs.
1627

1628
  """
1629
  def wrapper(self, *args, **kwargs):
1630
    """Wrapper function.
1631

1632
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1633

1634
    """
1635
    # Ok when sharing the big job queue lock, as the drain file is created when
1636
    # the lock is exclusive.
1637
    # Needs access to protected member, pylint: disable=W0212
1638
    if self._drained:
1639
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1640

    
1641
    if not self._accepting_jobs:
1642
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1643

    
1644
    return fn(self, *args, **kwargs)
1645
  return wrapper
1646

    
1647

    
1648
class JobQueue(object):
1649
  """Queue used to manage the jobs.
1650

1651
  """
1652
  def __init__(self, context):
1653
    """Constructor for JobQueue.
1654

1655
    The constructor will initialize the job queue object and then
1656
    start loading the current jobs from disk, either for starting them
1657
    (if they were queued) or for aborting them (if they were already
1658
    running).
1659

1660
    @type context: GanetiContext
1661
    @param context: the context object for access to the configuration
1662
        data and other ganeti objects
1663

1664
    """
1665
    self.context = context
1666
    self._memcache = weakref.WeakValueDictionary()
1667
    self._my_hostname = netutils.Hostname.GetSysName()
1668

    
1669
    # The Big JobQueue lock. If a code block or method acquires it in shared
1670
    # mode, it must be safe to run concurrently with all the code acquiring it
1671
    # in shared mode, including itself. In order not to acquire it at all,
1672
    # concurrency must be guaranteed with all code acquiring it in shared mode
1673
    # and all code acquiring it exclusively.
1674
    self._lock = locking.SharedLock("JobQueue")
1675

    
1676
    self.acquire = self._lock.acquire
1677
    self.release = self._lock.release
1678

    
1679
    # Accept jobs by default
1680
    self._accepting_jobs = True
1681

    
1682
    # Initialize the queue, and acquire the filelock.
1683
    # This ensures no other process is working on the job queue.
1684
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1685

    
1686
    # Read serial file
1687
    self._last_serial = jstore.ReadSerial()
1688
    assert self._last_serial is not None, ("Serial file was modified between"
1689
                                           " check in jstore and here")
1690

    
1691
    # Get initial list of nodes
1692
    self._nodes = dict((n.name, n.primary_ip)
1693
                       for n in self.context.cfg.GetAllNodesInfo().values()
1694
                       if n.master_candidate)
1695

    
1696
    # Remove master node
1697
    self._nodes.pop(self._my_hostname, None)
1698

    
1699
    # TODO: Check consistency across nodes
1700

    
1701
    self._queue_size = None
1702
    self._UpdateQueueSizeUnlocked()
1703
    assert ht.TInt(self._queue_size)
1704
    self._drained = jstore.CheckDrainFlag()
1705

    
1706
    # Job dependencies
1707
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1708
                                        self._EnqueueJobs)
1709
    self.context.glm.AddToLockMonitor(self.depmgr)
1710

    
1711
    # Setup worker pool
1712
    self._wpool = _JobQueueWorkerPool(self)
1713
    try:
1714
      self._InspectQueue()
1715
    except:
1716
      self._wpool.TerminateWorkers()
1717
      raise
1718

    
1719
  @locking.ssynchronized(_LOCK)
1720
  @_RequireOpenQueue
1721
  def _InspectQueue(self):
1722
    """Loads the whole job queue and resumes unfinished jobs.
1723

1724
    This function needs the lock here because WorkerPool.AddTask() may start a
1725
    job while we're still doing our work.
1726

1727
    """
1728
    logging.info("Inspecting job queue")
1729

    
1730
    restartjobs = []
1731

    
1732
    all_job_ids = self._GetJobIDsUnlocked()
1733
    jobs_count = len(all_job_ids)
1734
    lastinfo = time.time()
1735
    for idx, job_id in enumerate(all_job_ids):
1736
      # Give an update every 1000 jobs or 10 seconds
1737
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1738
          idx == (jobs_count - 1)):
1739
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1740
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1741
        lastinfo = time.time()
1742

    
1743
      job = self._LoadJobUnlocked(job_id)
1744

    
1745
      # a failure in loading the job can cause 'None' to be returned
1746
      if job is None:
1747
        continue
1748

    
1749
      status = job.CalcStatus()
1750

    
1751
      if status == constants.JOB_STATUS_QUEUED:
1752
        restartjobs.append(job)
1753

    
1754
      elif status in (constants.JOB_STATUS_RUNNING,
1755
                      constants.JOB_STATUS_WAITING,
1756
                      constants.JOB_STATUS_CANCELING):
1757
        logging.warning("Unfinished job %s found: %s", job.id, job)
1758

    
1759
        if status == constants.JOB_STATUS_WAITING:
1760
          # Restart job
1761
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1762
          restartjobs.append(job)
1763
        else:
1764
          to_encode = errors.OpExecError("Unclean master daemon shutdown")
1765
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1766
                                _EncodeOpError(to_encode))
1767
          job.Finalize()
1768

    
1769
        self.UpdateJobUnlocked(job)
1770

    
1771
    if restartjobs:
1772
      logging.info("Restarting %s jobs", len(restartjobs))
1773
      self._EnqueueJobsUnlocked(restartjobs)
1774

    
1775
    logging.info("Job queue inspection finished")
1776

    
1777
  def _GetRpc(self, address_list):
1778
    """Gets RPC runner with context.
1779

1780
    """
1781
    return rpc.JobQueueRunner(self.context, address_list)
1782

    
1783
  @locking.ssynchronized(_LOCK)
1784
  @_RequireOpenQueue
1785
  def AddNode(self, node):
1786
    """Register a new node with the queue.
1787

1788
    @type node: L{objects.Node}
1789
    @param node: the node object to be added
1790

1791
    """
1792
    node_name = node.name
1793
    assert node_name != self._my_hostname
1794

    
1795
    # Clean queue directory on added node
1796
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
1797
    msg = result.fail_msg
1798
    if msg:
1799
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1800
                      node_name, msg)
1801

    
1802
    if not node.master_candidate:
1803
      # remove if existing, ignoring errors
1804
      self._nodes.pop(node_name, None)
1805
      # and skip the replication of the job ids
1806
      return
1807

    
1808
    # Upload the whole queue excluding archived jobs
1809
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1810

    
1811
    # Upload current serial file
1812
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1813

    
1814
    # Static address list
1815
    addrs = [node.primary_ip]
1816

    
1817
    for file_name in files:
1818
      # Read file content
1819
      content = utils.ReadFile(file_name)
1820

    
1821
      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1822
                             file_name, content)
1823
      msg = result[node_name].fail_msg
1824
      if msg:
1825
        logging.error("Failed to upload file %s to node %s: %s",
1826
                      file_name, node_name, msg)
1827

    
1828
    # Set queue drained flag
1829
    result = \
1830
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
1831
                                                       self._drained)
1832
    msg = result[node_name].fail_msg
1833
    if msg:
1834
      logging.error("Failed to set queue drained flag on node %s: %s",
1835
                    node_name, msg)
1836

    
1837
    self._nodes[node_name] = node.primary_ip
1838

    
1839
  @locking.ssynchronized(_LOCK)
1840
  @_RequireOpenQueue
1841
  def RemoveNode(self, node_name):
1842
    """Callback called when removing nodes from the cluster.
1843

1844
    @type node_name: str
1845
    @param node_name: the name of the node to remove
1846

1847
    """
1848
    self._nodes.pop(node_name, None)
1849

    
1850
  @staticmethod
1851
  def _CheckRpcResult(result, nodes, failmsg):
1852
    """Verifies the status of an RPC call.
1853

1854
    Since we aim to keep consistency should this node (the current
1855
    master) fail, we will log errors if our RPC calls fail, and especially
1856
    log the case when more than half of the nodes fail.
1857

1858
    @param result: the data as returned from the rpc call
1859
    @type nodes: list
1860
    @param nodes: the list of nodes we made the call to
1861
    @type failmsg: str
1862
    @param failmsg: the identifier to be used for logging
1863

1864
    """
1865
    failed = []
1866
    success = []
1867

    
1868
    for node in nodes:
1869
      msg = result[node].fail_msg
1870
      if msg:
1871
        failed.append(node)
1872
        logging.error("RPC call %s (%s) failed on node %s: %s",
1873
                      result[node].call, failmsg, node, msg)
1874
      else:
1875
        success.append(node)
1876

    
1877
    # +1 for the master node
1878
    if (len(success) + 1) < len(failed):
1879
      # TODO: Handle failing nodes
1880
      logging.error("More than half of the nodes failed")
1881

    
1882
  def _GetNodeIp(self):
1883
    """Helper for returning the node name/ip list.
1884

1885
    @rtype: (list, list)
1886
    @return: a tuple of two lists, the first one with the node
1887
        names and the second one with the node addresses
1888

1889
    """
1890
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1891
    name_list = self._nodes.keys()
1892
    addr_list = [self._nodes[name] for name in name_list]
1893
    return name_list, addr_list
1894

    
1895
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1896
    """Writes a file locally and then replicates it to all nodes.
1897

1898
    This function will replace the contents of a file on the local
1899
    node and then replicate it to all the other nodes we have.
1900

1901
    @type file_name: str
1902
    @param file_name: the path of the file to be replicated
1903
    @type data: str
1904
    @param data: the new contents of the file
1905
    @type replicate: boolean
1906
    @param replicate: whether to spread the changes to the remote nodes
1907

1908
    """
1909
    getents = runtime.GetEnts()
1910
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1911
                    gid=getents.daemons_gid,
1912
                    mode=constants.JOB_QUEUE_FILES_PERMS)
1913

    
1914
    if replicate:
1915
      names, addrs = self._GetNodeIp()
1916
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1917
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1918

    
1919
  def _RenameFilesUnlocked(self, rename):
1920
    """Renames a file locally and then replicate the change.
1921

1922
    This function will rename a file in the local queue directory
1923
    and then replicate this rename to all the other nodes we have.
1924

1925
    @type rename: list of (old, new)
1926
    @param rename: List containing tuples mapping old to new names
1927

1928
    """
1929
    # Rename them locally
1930
    for old, new in rename:
1931
      utils.RenameFile(old, new, mkdir=True)
1932

    
1933
    # ... and on all nodes
1934
    names, addrs = self._GetNodeIp()
1935
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1936
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1937

    
1938
  def _NewSerialsUnlocked(self, count):
1939
    """Generates a new job identifier.
1940

1941
    Job identifiers are unique during the lifetime of a cluster.
1942

1943
    @type count: integer
1944
    @param count: how many serials to return
1945
    @rtype: list of int
1946
    @return: a list of job identifiers.
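
    Example (hypothetical values): if C{self._last_serial} is 10 and C{count}
    is 3, the serial file is rewritten to contain 13 and the job IDs
    formatted from serials 11, 12 and 13 are returned.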
1947

1948
    """
1949
    assert ht.TNonNegativeInt(count)
1950

    
1951
    # New number
1952
    serial = self._last_serial + count
1953

    
1954
    # Write to file
1955
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1956
                             "%s\n" % serial, True)
1957

    
1958
    result = [jstore.FormatJobID(v)
1959
              for v in range(self._last_serial + 1, serial + 1)]
1960

    
1961
    # Keep it only if we were able to write the file
1962
    self._last_serial = serial
1963

    
1964
    assert len(result) == count
1965

    
1966
    return result
1967

    
1968
  @staticmethod
1969
  def _GetJobPath(job_id):
1970
    """Returns the job file for a given job id.
1971

1972
    @type job_id: int
1973
    @param job_id: the job identifier
1974
    @rtype: str
1975
    @return: the path to the job file
1976

1977
    """
1978
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1979

    
1980
  @staticmethod
1981
  def _GetArchivedJobPath(job_id):
1982
    """Returns the archived job file for a give job id.
1983

1984
    @type job_id: int
1985
    @param job_id: the job identifier
1986
    @rtype: str
1987
    @return: the path to the archived job file
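
    Example (illustrative): for job C{1234} the result has the form
    C{<JOB_QUEUE_ARCHIVE_DIR>/<subdir>/job-1234}, with the subdirectory
    chosen by L{jstore.GetArchiveDirectory}.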
1988

1989
    """
1990
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1991
                          jstore.GetArchiveDirectory(job_id),
1992
                          "job-%s" % job_id)
1993

    
1994
  @staticmethod
1995
  def _DetermineJobDirectories(archived):
1996
    """Build list of directories containing job files.
1997

1998
    @type archived: bool
1999
    @param archived: Whether to include directories for archived jobs
2000
    @rtype: list
2001

2002
    """
2003
    result = [pathutils.QUEUE_DIR]
2004

    
2005
    if archived:
2006
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
2007
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
2008
                        utils.ListVisibleFiles(archive_path)))
2009

    
2010
    return result
2011

    
2012
  @classmethod
2013
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
2014
    """Return all known job IDs.
2015

2016
    The method only looks at disk because it's a requirement that all
2017
    jobs are present on disk (so in the _memcache we don't have any
2018
    extra IDs).
2019

2020
    @type sort: boolean
2021
    @param sort: perform sorting on the returned job ids
2022
    @rtype: list
2023
    @return: the list of job IDs
2024

2025
    """
2026
    jlist = []
2027

    
2028
    for path in cls._DetermineJobDirectories(archived):
2029
      for filename in utils.ListVisibleFiles(path):
2030
        m = constants.JOB_FILE_RE.match(filename)
2031
        if m:
2032
          jlist.append(int(m.group(1)))
2033

    
2034
    if sort:
2035
      jlist.sort()
2036
    return jlist
2037

    
2038
  def _LoadJobUnlocked(self, job_id):
2039
    """Loads a job from the disk or memory.
2040

2041
    Given a job id, this will return the cached job object if
2042
    existing, or try to load the job from the disk. If loading from
2043
    disk, it will also add the job to the cache.
2044

2045
    @type job_id: int
2046
    @param job_id: the job id
2047
    @rtype: L{_QueuedJob} or None
2048
    @return: either None or the job object
2049

2050
    """
2051
    assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"
2052

    
2053
    job = self._memcache.get(job_id, None)
2054
    if job:
2055
      logging.debug("Found job %s in memcache", job_id)
2056
      assert job.writable, "Found read-only job in memcache"
2057
      return job
2058

    
2059
    try:
2060
      job = self._LoadJobFromDisk(job_id, False)
2061
      if job is None:
2062
        return job
2063
    except errors.JobFileCorrupted:
2064
      old_path = self._GetJobPath(job_id)
2065
      new_path = self._GetArchivedJobPath(job_id)
2066
      if old_path == new_path:
2067
        # job already archived (future case)
2068
        logging.exception("Can't parse job %s", job_id)
2069
      else:
2070
        # non-archived case
2071
        logging.exception("Can't parse job %s, will archive.", job_id)
2072
        self._RenameFilesUnlocked([(old_path, new_path)])
2073
      return None
2074

    
2075
    assert job.writable, "Job just loaded is not writable"
2076

    
2077
    self._memcache[job_id] = job
2078
    logging.debug("Added job %s to the cache", job_id)
2079
    return job
2080

    
2081
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2082
    """Load the given job file from disk.
2083

2084
    Given a job file, read, load and restore it in a _QueuedJob format.
2085

2086
    @type job_id: int
2087
    @param job_id: job identifier
2088
    @type try_archived: bool
2089
    @param try_archived: Whether to try loading an archived job
2090
    @rtype: L{_QueuedJob} or None
2091
    @return: either None or the job object
2092

2093
    """
2094
    path_functions = [(self._GetJobPath, False)]
2095

    
2096
    if try_archived:
2097
      path_functions.append((self._GetArchivedJobPath, True))
2098

    
2099
    raw_data = None
2100
    archived = None
2101

    
2102
    for (fn, archived) in path_functions:
2103
      filepath = fn(job_id)
2104
      logging.debug("Loading job from %s", filepath)
2105
      try:
2106
        raw_data = utils.ReadFile(filepath)
2107
      except EnvironmentError, err:
2108
        if err.errno != errno.ENOENT:
2109
          raise
2110
      else:
2111
        break
2112

    
2113
    if not raw_data:
2114
      return None
2115

    
2116
    if writable is None:
2117
      writable = not archived
2118

    
2119
    try:
2120
      data = serializer.LoadJson(raw_data)
2121
      job = _QueuedJob.Restore(self, data, writable, archived)
2122
    except Exception, err: # pylint: disable=W0703
2123
      raise errors.JobFileCorrupted(err)
2124

    
2125
    return job
2126

    
2127
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2128
    """Load the given job file from disk.
2129

2130
    Given a job file, read, load and restore it in a _QueuedJob format.
2131
    In case of error reading the job, it gets returned as None, and the
2132
    exception is logged.
2133

2134
    @type job_id: int
2135
    @param job_id: job identifier
2136
    @type try_archived: bool
2137
    @param try_archived: Whether to try loading an archived job
2138
    @rtype: L{_QueuedJob} or None
2139
    @return: either None or the job object
2140

2141
    """
2142
    try:
2143
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2144
    except (errors.JobFileCorrupted, EnvironmentError):
2145
      logging.exception("Can't load/parse job %s", job_id)
2146
      return None
2147

    
2148
  def _UpdateQueueSizeUnlocked(self):
2149
    """Update the queue size.
2150

2151
    """
2152
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2153

    
2154
  @locking.ssynchronized(_LOCK)
2155
  @_RequireOpenQueue
2156
  def SetDrainFlag(self, drain_flag):
2157
    """Sets the drain flag for the queue.
2158

2159
    @type drain_flag: boolean
2160
    @param drain_flag: Whether to set or unset the drain flag
2161

2162
    """
2163
    # Change flag locally
2164
    jstore.SetDrainFlag(drain_flag)
2165

    
2166
    self._drained = drain_flag
2167

    
2168
    # ... and on all nodes
2169
    (names, addrs) = self._GetNodeIp()
2170
    result = \
2171
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
2172
    self._CheckRpcResult(result, self._nodes,
2173
                         "Setting queue drain flag to %s" % drain_flag)
2174

    
2175
    return True
2176

    
2177
  @_RequireOpenQueue
2178
  def _SubmitJobUnlocked(self, job_id, ops):
2179
    """Create and store a new job.
2180

2181
    This enters the job into our job queue and also puts it on the new
2182
    queue, in order for it to be picked up by the queue processors.
2183

2184
    @type job_id: job ID
2185
    @param job_id: the job ID for the new job
2186
    @type ops: list
2187
    @param ops: The list of OpCodes that will become the new job.
2188
    @rtype: L{_QueuedJob}
2189
    @return: the job object to be queued
2190
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
2191
    @raise errors.GenericError: If an opcode is not valid
2192

2193
    """
2194
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2195
      raise errors.JobQueueFull()
2196

    
2197
    job = _QueuedJob(self, job_id, ops, True)
2198

    
2199
    for idx, op in enumerate(job.ops):
2200
      # Check priority
2201
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2202
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2203
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2204
                                  " are %s" % (idx, op.priority, allowed))
2205

    
2206
      # Check job dependencies
2207
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2208
      if not opcodes.TNoRelativeJobDependencies(dependencies):
2209
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
2210
                                  " match %s: %s" %
2211
                                  (idx, opcodes.TNoRelativeJobDependencies,
2212
                                   dependencies))
2213

    
2214
    # Write to disk
2215
    self.UpdateJobUnlocked(job)
2216

    
2217
    self._queue_size += 1
2218

    
2219
    logging.debug("Adding new job %s to the cache", job_id)
2220
    self._memcache[job_id] = job
2221

    
2222
    return job
2223

    
2224
  @locking.ssynchronized(_LOCK)
2225
  @_RequireOpenQueue
2226
  @_RequireNonDrainedQueue
2227
  def SubmitJob(self, ops):
2228
    """Create and store a new job.
2229

2230
    @see: L{_SubmitJobUnlocked}
2231

2232
    """
2233
    (job_id, ) = self._NewSerialsUnlocked(1)
2234
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2235
    return job_id
2236

    
2237
  @locking.ssynchronized(_LOCK)
2238
  @_RequireOpenQueue
2239
  @_RequireNonDrainedQueue
2240
  def SubmitManyJobs(self, jobs):
2241
    """Create and store multiple jobs.
2242

2243
    @see: L{_SubmitJobUnlocked}
2244

2245
    """
2246
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
2247

    
2248
    (results, added_jobs) = \
2249
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2250

    
2251
    self._EnqueueJobsUnlocked(added_jobs)
2252

    
2253
    return results
2254

    
2255
  @staticmethod
2256
  def _FormatSubmitError(msg, ops):
2257
    """Formats errors which occurred while submitting a job.
2258

2259
    """
2260
    return ("%s; opcodes %s" %
2261
            (msg, utils.CommaJoin(op.Summary() for op in ops)))
2262

    
2263
  @staticmethod
2264
  def _ResolveJobDependencies(resolve_fn, deps):
2265
    """Resolves relative job IDs in dependencies.
2266

2267
    @type resolve_fn: callable
2268
    @param resolve_fn: Function to resolve a relative job ID
2269
    @type deps: list
2270
    @param deps: Dependencies
2271
    @rtype: tuple; (boolean, string or list)
2272
    @return: If successful (first tuple item), the returned list contains
2273
      resolved job IDs along with the requested status; if not successful,
2274
      the second element is an error message
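
    Example (illustrative, with hypothetical job IDs): for C{deps} of
    C{[(-1, []), (27, [constants.JOB_STATUS_SUCCESS])]} and a C{resolve_fn}
    that maps C{-1} to job C{42}, the result is
    C{(True, [(42, []), (27, [constants.JOB_STATUS_SUCCESS])])}.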
2275

2276
    """
2277
    result = []
2278

    
2279
    for (dep_job_id, dep_status) in deps:
2280
      if ht.TRelativeJobId(dep_job_id):
2281
        assert ht.TInt(dep_job_id) and dep_job_id < 0
2282
        try:
2283
          job_id = resolve_fn(dep_job_id)
2284
        except IndexError:
2285
          # Abort
2286
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2287
      else:
2288
        job_id = dep_job_id
2289

    
2290
      result.append((job_id, dep_status))
2291

    
2292
    return (True, result)
2293

    
2294
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2295
    """Create and store multiple jobs.
2296

2297
    @see: L{_SubmitJobUnlocked}
2298

2299
    """
2300
    results = []
2301
    added_jobs = []
2302

    
2303
    def resolve_fn(job_idx, reljobid):
2304
      assert reljobid < 0
2305
      return (previous_job_ids + job_ids[:job_idx])[reljobid]
2306

    
2307
    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2308
      for op in ops:
2309
        if getattr(op, opcodes.DEPEND_ATTR, None):
2310
          (status, data) = \
2311
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2312
                                         op.depends)
2313
          if not status:
2314
            # Abort resolving dependencies
2315
            assert ht.TNonEmptyString(data), "No error message"
2316
            break
2317
          # Use resolved dependencies
2318
          op.depends = data
2319
      else:
2320
        try:
2321
          job = self._SubmitJobUnlocked(job_id, ops)
2322
        except errors.GenericError, err:
2323
          status = False
2324
          data = self._FormatSubmitError(str(err), ops)
2325
        else:
2326
          status = True
2327
          data = job_id
2328
          added_jobs.append(job)
2329

    
2330
      results.append((status, data))
2331

    
2332
    return (results, added_jobs)
2333

    
2334
  @locking.ssynchronized(_LOCK)
2335
  def _EnqueueJobs(self, jobs):
2336
    """Helper function to add jobs to worker pool's queue.
2337

2338
    @type jobs: list
2339
    @param jobs: List of all jobs
2340

2341
    """
2342
    return self._EnqueueJobsUnlocked(jobs)
2343

    
2344
  def _EnqueueJobsUnlocked(self, jobs):
2345
    """Helper function to add jobs to worker pool's queue.
2346

2347
    @type jobs: list
2348
    @param jobs: List of all jobs
2349

2350
    """
2351
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2352
    self._wpool.AddManyTasks([(job, ) for job in jobs],
2353
                             priority=[job.CalcPriority() for job in jobs],
2354
                             task_id=map(_GetIdAttr, jobs))
2355

    
2356
  def _GetJobStatusForDependencies(self, job_id):
2357
    """Gets the status of a job for dependencies.
2358

2359
    @type job_id: int
2360
    @param job_id: Job ID
2361
    @raise errors.JobLost: If job can't be found
2362

2363
    """
2364
    # Not using in-memory cache as doing so would require an exclusive lock
2365

    
2366
    # Try to load from disk
2367
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2368

    
2369
    # pylint: disable=E1101
    assert job is None or not job.writable, "Got writable job"
2370

    
2371
    if job:
2372
      return job.CalcStatus()
2373

    
2374
    raise errors.JobLost("Job %s not found" % job_id)
2375

    
2376
  @_RequireOpenQueue
2377
  def UpdateJobUnlocked(self, job, replicate=True):
2378
    """Update a job's on disk storage.
2379

2380
    After a job has been modified, this function needs to be called in
2381
    order to write the changes to disk and replicate them to the other
2382
    nodes.
2383

2384
    @type job: L{_QueuedJob}
2385
    @param job: the changed job
2386
    @type replicate: boolean
2387
    @param replicate: whether to replicate the change to remote nodes
2388

2389
    """
2390
    if __debug__:
2391
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2392
      assert (finalized ^ (job.end_timestamp is None))
2393
      assert job.writable, "Can't update read-only job"
2394
      assert not job.archived, "Can't update archived job"
2395

    
2396
    filename = self._GetJobPath(job.id)
2397
    data = serializer.DumpJson(job.Serialize())
2398
    logging.debug("Writing job %s to %s", job.id, filename)
2399
    self._UpdateJobQueueFile(filename, data, replicate)
2400

    
2401
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2402
                        timeout):
2403
    """Waits for changes in a job.
2404

2405
    @type job_id: int
2406
    @param job_id: Job identifier
2407
    @type fields: list of strings
2408
    @param fields: Which fields to check for changes
2409
    @type prev_job_info: list or None
2410
    @param prev_job_info: Last job information returned
2411
    @type prev_log_serial: int
2412
    @param prev_log_serial: Last job message serial number
2413
    @type timeout: float
2414
    @param timeout: maximum time to wait in seconds
2415
    @rtype: tuple (job info, log entries)
2416
    @return: a tuple of the job information as required via
2417
        the fields parameter, and the log entries as a list
2418

2419
        if the job has not changed and the timeout has expired,
2420
        we instead return a special value,
2421
        L{constants.JOB_NOTCHANGED}, which should be interpreted
2422
        as such by the clients
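
    Example (an illustrative caller sketch; C{queue} stands for a
    L{JobQueue} instance and the field name, log serial and timeout values
    are arbitrary)::
      result = queue.WaitForJobChanges(job_id, ["status"], None, 0, 10.0)
      if result == constants.JOB_NOTCHANGED:
        pass  # timed out without any change
      else:
        (job_info, log_entries) = result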
2423

2424
    """
2425
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2426
                             writable=False)
2427

    
2428
    helper = _WaitForJobChangesHelper()
2429

    
2430
    return helper(self._GetJobPath(job_id), load_fn,
2431
                  fields, prev_job_info, prev_log_serial, timeout)
2432

    
2433
  @locking.ssynchronized(_LOCK)
2434
  @_RequireOpenQueue
2435
  def CancelJob(self, job_id):
2436
    """Cancels a job.
2437

2438
    This will only succeed if the job has not started yet.
2439

2440
    @type job_id: int
2441
    @param job_id: job ID of job to be cancelled.
2442

2443
    """
2444
    logging.info("Cancelling job %s", job_id)
2445

    
2446
    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2447

    
2448
  @locking.ssynchronized(_LOCK)
2449
  @_RequireOpenQueue
2450
  def ChangeJobPriority(self, job_id, priority):
2451
    """Changes a job's priority.
2452

2453
    @type job_id: int
2454
    @param job_id: ID of the job whose priority should be changed
2455
    @type priority: int
2456
    @param priority: New priority
2457

2458
    """
2459
    logging.info("Changing priority of job %s to %s", job_id, priority)
2460

    
2461
    if priority not in constants.OP_PRIO_SUBMIT_VALID:
2462
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2463
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
2464
                                (priority, allowed))
2465

    
2466
    def fn(job):
2467
      (success, msg) = job.ChangePriority(priority)
2468

    
2469
      if success:
2470
        try:
2471
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2472
        except workerpool.NoSuchTask:
2473
          logging.debug("Job %s is not in workerpool at this time", job.id)
2474

    
2475
      return (success, msg)
2476

    
2477
    return self._ModifyJobUnlocked(job_id, fn)
2478

    
2479
  def _ModifyJobUnlocked(self, job_id, mod_fn):
2480
    """Modifies a job.
2481

2482
    @type job_id: int
2483
    @param job_id: Job ID
2484
    @type mod_fn: callable
2485
    @param mod_fn: Modifying function, receiving job object as parameter,
2486
      returning tuple of (status boolean, message string)
2487

2488
    """
2489
    job = self._LoadJobUnlocked(job_id)
2490
    if not job:
2491
      logging.debug("Job %s not found", job_id)
2492
      return (False, "Job %s not found" % job_id)
2493

    
2494
    assert job.writable, "Can't modify read-only job"
2495
    assert not job.archived, "Can't modify archived job"
2496

    
2497
    (success, msg) = mod_fn(job)
2498

    
2499
    if success:
2500
      # If the job was finalized (e.g. cancelled), this is the final write
2501
      # allowed. The job can be archived anytime.
2502
      self.UpdateJobUnlocked(job)
2503

    
2504
    return (success, msg)
2505

    
2506
  @_RequireOpenQueue
2507
  def _ArchiveJobsUnlocked(self, jobs):
2508
    """Archives jobs.
2509

2510
    @type jobs: list of L{_QueuedJob}
2511
    @param jobs: Job objects
2512
    @rtype: int
2513
    @return: Number of archived jobs
2514

2515
    """
2516
    archive_jobs = []
2517
    rename_files = []
2518
    for job in jobs:
2519
      assert job.writable, "Can't archive read-only job"
2520
      assert not job.archived, "Can't archive an already archived job"
2521

    
2522
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
2523
        logging.debug("Job %s is not yet done", job.id)
2524
        continue
2525

    
2526
      archive_jobs.append(job)
2527

    
2528
      old = self._GetJobPath(job.id)
2529
      new = self._GetArchivedJobPath(job.id)
2530
      rename_files.append((old, new))
2531

    
2532
    # TODO: What if 1..n files fail to rename?
2533
    self._RenameFilesUnlocked(rename_files)
2534

    
2535
    logging.debug("Successfully archived job(s) %s",
2536
                  utils.CommaJoin(job.id for job in archive_jobs))
2537

    
2538
    # Since we haven't quite checked, above, if we succeeded or failed renaming
2539
    # the files, we update the cached queue size from the filesystem. When we
2540
    # get around to fix the TODO: above, we can use the number of actually
2541
    # archived jobs to fix this.
2542
    self._UpdateQueueSizeUnlocked()
2543
    return len(archive_jobs)
2544

    
2545
  @locking.ssynchronized(_LOCK)
2546
  @_RequireOpenQueue
2547
  def ArchiveJob(self, job_id):
2548
    """Archives a job.
2549

2550
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
2551

2552
    @type job_id: int
2553
    @param job_id: Job ID of job to be archived.
2554
    @rtype: bool
2555
    @return: Whether job was archived
2556

2557
    """
2558
    logging.info("Archiving job %s", job_id)
2559

    
2560
    job = self._LoadJobUnlocked(job_id)
2561
    if not job:
2562
      logging.debug("Job %s not found", job_id)
2563
      return False
2564

    
2565
    return self._ArchiveJobsUnlocked([job]) == 1
2566

    
2567
  @locking.ssynchronized(_LOCK)
2568
  @_RequireOpenQueue
2569
  def AutoArchiveJobs(self, age, timeout):
2570
    """Archives all jobs based on age.
2571

2572
    The method will archive all jobs which are older than the age
2573
    parameter. For jobs that don't have an end timestamp, the start
2574
    timestamp will be considered. The special '-1' age will cause
2575
    archival of all jobs (that are not running or queued).
2576

2577
    @type age: int
2578
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum duration of this archival run, in seconds
2579

2580
    """
2581
    logging.info("Archiving jobs with age more than %s seconds", age)
2582

    
2583
    now = time.time()
2584
    end_time = now + timeout
2585
    archived_count = 0
2586
    last_touched = 0
2587

    
2588
    all_job_ids = self._GetJobIDsUnlocked()
2589
    pending = []
2590
    for idx, job_id in enumerate(all_job_ids):
2591
      last_touched = idx + 1
2592

    
2593
      # Not optimal because jobs could be pending
2594
      # TODO: Measure average duration for job archival and take number of
2595
      # pending jobs into account.
2596
      if time.time() > end_time:
2597
        break
2598

    
2599
      # Returns None if the job failed to load
2600
      job = self._LoadJobUnlocked(job_id)
2601
      if job:
2602
        if job.end_timestamp is None:
2603
          if job.start_timestamp is None:
2604
            job_age = job.received_timestamp
2605
          else:
2606
            job_age = job.start_timestamp
2607
        else:
2608
          job_age = job.end_timestamp
2609

    
2610
        if age == -1 or now - job_age[0] > age:
2611
          pending.append(job)
2612

    
2613
          # Archive 10 jobs at a time
2614
          if len(pending) >= 10:
2615
            archived_count += self._ArchiveJobsUnlocked(pending)
2616
            pending = []
2617

    
2618
    if pending:
2619
      archived_count += self._ArchiveJobsUnlocked(pending)
2620

    
2621
    return (archived_count, len(all_job_ids) - last_touched)
2622

    
2623
  def _Query(self, fields, qfilter):
2624
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2625
                       namefield="id")
2626

    
2627
    # Archived jobs are only looked at if the "archived" field is referenced
2628
    # either as a requested field or in the filter. By default archived jobs
2629
    # are ignored.
2630
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2631

    
2632
    job_ids = qobj.RequestedNames()
2633

    
2634
    list_all = (job_ids is None)
2635

    
2636
    if list_all:
2637
      # Since files are added to/removed from the queue atomically, there's no
2638
      # risk of getting the job ids in an inconsistent state.
2639
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2640

    
2641
    jobs = []
2642

    
2643
    for job_id in job_ids:
2644
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2645
      if job is not None or not list_all:
2646
        jobs.append((job_id, job))
2647

    
2648
    return (qobj, jobs, list_all)
2649

    
2650
  def QueryJobs(self, fields, qfilter):
2651
    """Returns a list of jobs in queue.
2652

2653
    @type fields: sequence
2654
    @param fields: List of wanted fields
2655
    @type qfilter: None or query2 filter (list)
2656
    @param qfilter: Query filter
2657

2658
    """
2659
    (qobj, ctx, _) = self._Query(fields, qfilter)
2660

    
2661
    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2662

    
2663
  def OldStyleQueryJobs(self, job_ids, fields):
2664
    """Returns a list of jobs in queue.
2665

2666
    @type job_ids: list
2667
    @param job_ids: sequence of job identifiers or None for all
2668
    @type fields: list
2669
    @param fields: names of fields to return
2670
    @rtype: list
2671
    @return: list one element per job, each element being list with
2672
        the requested fields
2673

2674
    """
2675
    # backwards compat:
2676
    job_ids = [int(jid) for jid in job_ids]
2677
    qfilter = qlang.MakeSimpleFilter("id", job_ids)
2678

    
2679
    (qobj, ctx, _) = self._Query(fields, qfilter)
2680

    
2681
    return qobj.OldStyleQuery(ctx, sort_by_name=False)
2682

    
2683
  @locking.ssynchronized(_LOCK)
2684
  def PrepareShutdown(self):
2685
    """Prepare to stop the job queue.
2686

2687
    Disables execution of jobs in the workerpool and returns whether there are
2688
    any jobs currently running. If the latter is the case, the job queue is not
2689
    yet ready for shutdown. Once this function returns C{False} L{Shutdown} can
2690
    be called without interfering with any job. Queued and unfinished jobs will
2691
    be resumed next time.
2692

2693
    Once this function has been called no new job submissions will be accepted
2694
    (see L{_RequireNonDrainedQueue}).
2695

2696
    @rtype: bool
2697
    @return: Whether there are any running jobs
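
    Example (an illustrative caller sketch; the polling interval is an
    arbitrary choice)::
      while queue.PrepareShutdown():
        time.sleep(1)   # jobs still running, poll again
      queue.Shutdown()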
2698

2699
    """
2700
    if self._accepting_jobs:
2701
      self._accepting_jobs = False
2702

    
2703
      # Tell worker pool to stop processing pending tasks
2704
      self._wpool.SetActive(False)
2705

    
2706
    return self._wpool.HasRunningTasks()
2707

    
2708
  def AcceptingJobsUnlocked(self):
2709
    """Returns whether jobs are accepted.
2710

2711
    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2712
    queue is shutting down.
2713

2714
    @rtype: bool
2715

2716
    """
2717
    return self._accepting_jobs
2718

    
2719
  @locking.ssynchronized(_LOCK)
2720
  @_RequireOpenQueue
2721
  def Shutdown(self):
2722
    """Stops the job queue.
2723

2724
    This shuts down all the worker threads and closes the queue.
2725

2726
    """
2727
    self._wpool.TerminateWorkers()
2728

    
2729
    self._queue_filelock.Close()
2730
    self._queue_filelock = None