#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools
import operator

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster


JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

#: Retrieves "id" attribute
_GetIdAttr = operator.attrgetter("id")


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


class QueueShutdown(Exception):
  """Special exception to abort a job when the job queue is shutting down.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
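
# For illustration only: utils.SplitTime() turns a float timestamp into the
# (seconds, microseconds) pair used throughout this module, so a hypothetical
# time.time() value of 1330954602.25 would come back as (1330954602, 250000).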


def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)


class _SimpleJobQuery:
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable", "archived"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }
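
  # A serialized job is a plain dict roughly of this shape (values below are
  # made up for illustration):
  #   {"id": 1234,
  #    "ops": [...],                            # one entry per _QueuedOpCode
  #    "received_timestamp": (1330954602, 893062),
  #    "start_timestamp": None, "end_timestamp": None}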

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a canceled opcode, or one that finished with an error,
        the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.OP_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
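
  # Illustrative examples of the mapping implemented above (opcode statuses on
  # the left, resulting job status on the right; not an exhaustive list):
  #   [SUCCESS, SUCCESS, SUCCESS]   -> JOB_STATUS_SUCCESS
  #   [SUCCESS, RUNNING, QUEUED]    -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR, QUEUED]      -> JOB_STATUS_ERROR
  #   [SUCCESS, CANCELING, QUEUED]  -> JOB_STATUS_CANCELING
  #   [QUEUED, QUEUED]              -> JOB_STATUS_QUEUED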

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
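
  # Each returned entry is a (log_serial, timestamp, level, message) tuple as
  # appended by _OpExecCallbacks._AppendFeedback; e.g. a hypothetical call
  # GetLogEntries(newer_than=3) only returns entries whose serial is 4 or
  # higher, while GetLogEntries(None) returns them all.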

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

  def ChangePriority(self, priority):
    """Changes the job priority.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
      and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is cancelling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # See if queue is shutting down
    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
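
  # Feedback() accepts both call forms shown below (message text is
  # hypothetical, "cbs" stands for an _OpExecCallbacks instance); with a
  # single argument the log type defaults to constants.ELOG_MESSAGE:
  #   cbs.Feedback("copying disk 0")
  #   cbs.Feedback(constants.ELOG_MESSAGE, "copying disk 0")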

  def CurrentPriority(self):
    """Returns current priority for opcode.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    return self._op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
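
  # Summarizing the possible outcomes of __call__: a (job_info, log_entries)
  # tuple when something changed, None if the job disappeared or inotify
  # failed, and constants.JOB_NOTCHANGED if the timeout expired first.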


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
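
# For example, a plain ValueError raised inside an opcode would be wrapped in
# errors.OpExecError before encoding, while errors.GenericError subclasses are
# encoded as they are; either way the value produced by
# errors.EncodeException() is what ends up stored as the opcode result.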


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
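
  # Peek() looks at the upcoming timeout without consuming it, while Next()
  # consumes it; e.g. with a wrapped function yielding 0.1 and then 0.3, a
  # hypothetical sequence Peek(), Next(), Next() returns 0.1, 0.1, 0.3.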


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()

class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Queue is shutting down, return to queued
      if not self.queue.AcceptingJobsUnlocked():
        return (constants.OP_STATUS_QUEUED, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)

    except QueueShutdown:
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)

      assert op.status == constants.OP_STATUS_WAITING

      # Job hadn't been started yet, so it should return to the queue
      return (constants.OP_STATUS_QUEUED, None)

    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_QUEUED):
          # waiting: Couldn't get locks in time
          # queued: Queue is shutting down
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        finalize = False

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Sets the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))
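
  # The first element of the returned tuple drives the caller (see
  # _JobProcessor._CheckDependencies); e.g. a hypothetical
  # CheckAndRegister(job, 1234, ["success"]) call returns (WAIT, ...) while
  # job 1234 is still running and (CONTINUE, ...) once it finished
  # successfully.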

  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
    @type job_id: int
    @param job_id: Job ID

    """
    assert ht.TJobId(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()

      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
1646
    # mode safe it must guarantee concurrency with all the code acquiring it in
1647
    # shared mode, including itself. In order not to acquire it at all
1648
    # concurrency must be guaranteed with all code acquiring it in shared mode
1649
    # and all code acquiring it exclusively.
1650
    self._lock = locking.SharedLock("JobQueue")
1651

    
1652
    self.acquire = self._lock.acquire
1653
    self.release = self._lock.release
1654

    
1655
    # Accept jobs by default
1656
    self._accepting_jobs = True
1657

    
1658
    # Initialize the queue, and acquire the filelock.
1659
    # This ensures no other process is working on the job queue.
1660
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1661

    
1662
    # Read serial file
1663
    self._last_serial = jstore.ReadSerial()
1664
    assert self._last_serial is not None, ("Serial file was modified between"
1665
                                           " check in jstore and here")
1666

    
1667
    # Get initial list of nodes
1668
    self._nodes = dict((n.name, n.primary_ip)
1669
                       for n in self.context.cfg.GetAllNodesInfo().values()
1670
                       if n.master_candidate)
1671

    
1672
    # Remove master node
1673
    self._nodes.pop(self._my_hostname, None)
1674

    
1675
    # TODO: Check consistency across nodes
1676

    
1677
    self._queue_size = None
1678
    self._UpdateQueueSizeUnlocked()
1679
    assert ht.TInt(self._queue_size)
1680
    self._drained = jstore.CheckDrainFlag()
1681

    
1682
    # Job dependencies
1683
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1684
                                        self._EnqueueJobs)
1685
    self.context.glm.AddToLockMonitor(self.depmgr)
1686

    
1687
    # Setup worker pool
1688
    self._wpool = _JobQueueWorkerPool(self)
1689
    try:
1690
      self._InspectQueue()
1691
    except:
1692
      self._wpool.TerminateWorkers()
1693
      raise
1694

    
1695
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    # Set queue drained flag
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
                                                       self._drained)
    msg = result[node_name].fail_msg
    if msg:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate the renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TNonNegativeInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
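
  # Illustrative example (values assumed, not taken from a real queue): with
  # self._last_serial == 42 and count == 3, the serial file is bumped to 45
  # and the job IDs for 43, 44 and 45 (as formatted by jstore.FormatJobID)
  # are returned.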

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, None is returned and the exception
    is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
    self._CheckRpcResult(result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id
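
  # Illustrative usage sketch (not part of the original module): a caller
  # holding a JobQueue instance submits a job as a list of opcode objects,
  # e.g.
  #
  #   job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=10.0)])
  #
  # and can then follow the job with WaitForJobChanges() or QueryJobs().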

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)
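
  # Illustrative example (values assumed): with previously submitted job IDs
  # [10, 11] and deps == [(-1, [constants.JOB_STATUS_SUCCESS])], the relative
  # ID -1 resolves to 11, so the method returns
  # (True, [(11, [constants.JOB_STATUS_SUCCESS])]).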

  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        # This for/else clause only runs when the inner loop did not break,
        # i.e. the dependencies of all opcodes were resolved successfully
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs],
                             task_id=map(_GetIdAttr, jobs))

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      # The job was requested read-only above
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      (success, msg) = job.ChangePriority(priority)

      if success:
        try:
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time to spend archiving, in seconds
    @rtype: tuple
    @return: a tuple of (number of archived jobs, number of jobs not
        yet inspected)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
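
  # Illustrative example (parameter values assumed): calling
  # AutoArchiveJobs(7 * 24 * 3600, 30) archives finalized jobs older than a
  # week, spending at most roughly 30 seconds, while AutoArchiveJobs(-1, 30)
  # archives all finalized jobs regardless of age.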

  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)
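
  # Illustrative usage sketch (not part of the original module): the
  # query2-style interface above takes a filter built with the qlang helpers,
  # e.g.
  #
  #   queue.QueryJobs(["id", "status"],
  #                   qlang.MakeSimpleFilter("id", [42, 43]))
  #
  # which corresponds to OldStyleQueryJobs([42, 43], ["id", "status"]).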

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool

    """
    return self._accepting_jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None