root / lib / jqueue.py @ 704b51ff
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools
import operator

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import luxi
from ganeti import opcodes
from ganeti import opcodes_base
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster


JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

#: Retrieves "id" attribute
_GetIdAttr = operator.attrgetter("id")


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


class QueueShutdown(Exception):
  """Special exception to abort a job when the job queue is shutting down.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
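  # A worked example, assuming the documented (seconds, microseconds) split:
  # at epoch time 1325376000.25, utils.SplitTime() would return
  # (1325376000, 250000).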
  return utils.SplitTime(time.time())


def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)


class _SimpleJobQuery:
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution
  @ivar priority: the current priority of the opcode

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restores a _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
192
      "input": self.input.__getstate__(),
193
      "status": self.status,
194
      "result": self.result,
195
      "log": self.log,
196
      "start_timestamp": self.start_timestamp,
197
      "exec_timestamp": self.exec_timestamp,
198
      "end_timestamp": self.end_timestamp,
199
      "priority": self.priority,
200
      }
201

    
202

    
203
class _QueuedJob(object):
204
  """In-memory job representation.
205

206
  This is what we use to track the user-submitted jobs. Locking must
207
  be taken care of by users of this class.
208

209
  @type queue: L{JobQueue}
210
  @ivar queue: the parent queue
211
  @ivar id: the job ID
212
  @type ops: list
213
  @ivar ops: the list of _QueuedOpCode that constitute the job
214
  @type log_serial: int
215
  @ivar log_serial: holds the index for the next log entry
216
  @ivar received_timestamp: the timestamp for when the job was received
217
  @ivar start_timestmap: the timestamp for start of execution
218
  @ivar end_timestamp: the timestamp for end of execution
219
  @ivar writable: Whether the job is allowed to be modified
220

221
  """
222
  # pylint: disable=W0212
223
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
224
               "received_timestamp", "start_timestamp", "end_timestamp",
225
               "__weakref__", "processor_lock", "writable", "archived"]
226

    
227
  def _AddReasons(self):
228
    """Extend the reason trail
229

230
    Add the reason for all the opcodes of this job to be executed.
231

232
    """
233
    count = 0
234
    for queued_op in self.ops:
235
      op = queued_op.input
236
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__)
237
      reason_text = "job=%d;index=%d" % (self.id, count)
238
      reason = getattr(op, "reason", [])
239
      reason.append((reason_src, reason_text, utils.EpochNano()))
240
      op.reason = reason
241
      count = count + 1
242

    
243
  def __init__(self, queue, job_id, ops, writable):
244
    """Constructor for the _QueuedJob.
245

246
    @type queue: L{JobQueue}
247
    @param queue: our parent queue
248
    @type job_id: job_id
249
    @param job_id: our job id
250
    @type ops: list
251
    @param ops: the list of opcodes we hold, which will be encapsulated
252
        in _QueuedOpCodes
253
    @type writable: bool
254
    @param writable: Whether job can be modified
255

256
    """
257
    if not ops:
258
      raise errors.GenericError("A job needs at least one opcode")
259

    
260
    self.queue = queue
261
    self.id = int(job_id)
262
    self.ops = [_QueuedOpCode(op) for op in ops]
263
    self._AddReasons()
264
    self.log_serial = 0
265
    self.received_timestamp = TimeStampNow()
266
    self.start_timestamp = None
267
    self.end_timestamp = None
268
    self.archived = False
269

    
270
    self._InitInMemory(self, writable)
271

    
272
    assert not self.archived, "New jobs can not be marked as archived"
273

    
274
  @staticmethod
275
  def _InitInMemory(obj, writable):
276
    """Initializes in-memory variables.
277

278
    """
279
    obj.writable = writable
280
    obj.ops_iter = None
281
    obj.cur_opctx = None
282

    
283
    # Read-only jobs are not processed and therefore don't need a lock
284
    if writable:
285
      obj.processor_lock = threading.Lock()
286
    else:
287
      obj.processor_lock = None
288

    
289
  def __repr__(self):
290
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
291
              "id=%s" % self.id,
292
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
293

    
294
    return "<%s at %#x>" % (" ".join(status), id(self))
295

    
296
  @classmethod
297
  def Restore(cls, queue, state, writable, archived):
298
    """Restore a _QueuedJob from serialized state:
299

300
    @type queue: L{JobQueue}
301
    @param queue: to which queue the restored job belongs
302
    @type state: dict
303
    @param state: the serialized state
304
    @type writable: bool
305
    @param writable: Whether job can be modified
306
    @type archived: bool
307
    @param archived: Whether job was already archived
308
    @rtype: _JobQueue
309
    @return: the restored _JobQueue instance
310

311
    """
312
    obj = _QueuedJob.__new__(cls)
313
    obj.queue = queue
314
    obj.id = int(state["id"])
315
    obj.received_timestamp = state.get("received_timestamp", None)
316
    obj.start_timestamp = state.get("start_timestamp", None)
317
    obj.end_timestamp = state.get("end_timestamp", None)
318
    obj.archived = archived
319

    
320
    obj.ops = []
321
    obj.log_serial = 0
322
    for op_state in state["ops"]:
323
      op = _QueuedOpCode.Restore(op_state)
324
      for log_entry in op.log:
325
        obj.log_serial = max(obj.log_serial, log_entry[0])
326
      obj.ops.append(op)
327

    
328
    cls._InitInMemory(obj, writable)
329

    
330
    return obj
331

    
332
  def Serialize(self):
333
    """Serialize the _JobQueue instance.
334

335
    @rtype: dict
336
    @return: the serialized state
337

338
    """
339
    return {
340
      "id": self.id,
341
      "ops": [op.Serialize() for op in self.ops],
342
      "start_timestamp": self.start_timestamp,
343
      "end_timestamp": self.end_timestamp,
344
      "received_timestamp": self.received_timestamp,
345
      }
346

    
347
  def CalcStatus(self):
348
    """Compute the status of this job.
349

350
    This function iterates over all the _QueuedOpCodes in the job and
351
    based on their status, computes the job status.
352

353
    The algorithm is:
354
      - if we find a cancelled, or finished with error, the job
355
        status will be the same
356
      - otherwise, the last opcode with the status one of:
357
          - waitlock
358
          - canceling
359
          - running
360

361
        will determine the job status
362

363
      - otherwise, it means either all opcodes are queued, or success,
364
        and the job status will be the same
365

366
    @return: the job status
367

368
    """
369
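    # Example: opcode statuses (success, running, queued) make the job
    # "running", while (success, error, queued) short-circuits the loop and
    # makes the whole job "error".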
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
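    # Priorities are nice-style integers: a numerically lower value means a
    # higher priority (constants.OP_PRIO_HIGHEST is the smallest value), so
    # min() below selects the most urgent pending opcode.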
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

  def ChangePriority(self, priority):
    """Changes the job priority.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
      and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is canceling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # See if queue is shutting down
    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

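    # Two call forms are accepted: Feedback(msg), which defaults the log type
    # to constants.ELOG_MESSAGE, and Feedback(log_type, msg).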
    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

  def CurrentPriority(self):
    """Returns current priority for opcode.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    return self._op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = _inotify_wm_cls()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename
    self._waiter_cls = _waiter_cls

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = self._waiter_cls(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout,
               _waiter_cls=_JobChangesWaiter):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _waiter_cls(filename)
      try:
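        # utils.Retry() keeps calling _CheckForChanges, which raises
        # RetryAgain while nothing has changed, and sleeps between attempts
        # via waiter.Wait(); it stops on the first change or when the timeout
        # expires (RetryTimeout, mapped to JOB_NOTCHANGED below).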
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except errors.JobLost:
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)


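# Thin memoizing wrapper around a timeout-generating callable: Peek() looks
# at the next timeout without consuming it, Next() consumes it. By convention
# a timeout of None means the next lock acquire should be a blocking one.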
class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

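    # Each entry in the opcode's "depends" attribute is a (job id, [wanted
    # finalized statuses]) pair, consumed one by one in
    # _JobProcessor._CheckDependencies.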
    # Create local copy to modify
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
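      # Lower value means higher priority, so decrementing moves the opcode
      # one step closer to OP_PRIO_HIGHEST for the next lock attempt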
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


class _JobProcessor(object):
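  # Result codes for __call__(): DEFER means "re-schedule in the worker
  # pool", WAITDEP means "the dependency manager will re-add the job" and
  # FINISHED means the job is fully processed.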
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Queue is shutting down, return to queued
      if not self.queue.AcceptingJobsUnlocked():
        return (constants.OP_STATUS_QUEUED, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)

    except QueueShutdown:
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)

      assert op.status == constants.OP_STATUS_WAITING

      # Job hadn't been started yet, so it should return to the queue
      return (constants.OP_STATUS_QUEUED, None)

    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_QUEUED):
          # waiting: Couldn't get locks in time
          # queued: Queue is shutting down
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        finalize = False

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Returns the worker thread name for a job and opcode.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
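  # Results returned by CheckAndRegister(): WAIT (dependency not yet
  # finalized, caller must wait), ERROR (lookup failed or a job depends on
  # itself), CANCEL (dependency was cancelled), CONTINUE (dependency reached
  # one of the wanted statuses) and WRONGSTATUS (dependency finalized with an
  # unwanted status).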
  (WAIT,
1464
   ERROR,
1465
   CANCEL,
1466
   CONTINUE,
1467
   WRONGSTATUS) = range(1, 6)
1468

    
1469
  def __init__(self, getstatus_fn, enqueue_fn):
1470
    """Initializes this class.
1471

1472
    """
1473
    self._getstatus_fn = getstatus_fn
1474
    self._enqueue_fn = enqueue_fn
1475

    
1476
    self._waiters = {}
1477
    self._lock = locking.SharedLock("JobDepMgr")
1478

    
1479
  @locking.ssynchronized(_LOCK, shared=1)
1480
  def GetLockInfo(self, requested): # pylint: disable=W0613
1481
    """Retrieves information about waiting jobs.
1482

1483
    @type requested: set
1484
    @param requested: Requested information, see C{query.LQ_*}
1485

1486
    """
1487
    # No need to sort here, that's being done by the lock manager and query
1488
    # library. There are no priorities for notifying jobs, hence all show up as
1489
    # one item under "pending".
1490
    return [("job/%s" % job_id, None, None,
1491
             [("job", [job.id for job in waiters])])
1492
            for job_id, waiters in self._waiters.items()
1493
            if waiters]
1494

    
1495
  @locking.ssynchronized(_LOCK, shared=1)
1496
  def JobWaiting(self, job):
1497
    """Checks if a job is waiting.
1498

1499
    """
1500
    return compat.any(job in jobs
1501
                      for jobs in self._waiters.values())
1502

    
1503
  @locking.ssynchronized(_LOCK)
1504
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1505
    """Checks if a dependency job has the requested status.
1506

1507
    If the other job is not yet in a finalized status, the calling job will be
1508
    notified (re-added to the workerpool) at a later point.
1509

1510
    @type job: L{_QueuedJob}
1511
    @param job: Job object
1512
    @type dep_job_id: int
1513
    @param dep_job_id: ID of dependency job
1514
    @type dep_status: list
1515
    @param dep_status: Required status
1516

1517
    """
1518
    assert ht.TJobId(job.id)
1519
    assert ht.TJobId(dep_job_id)
1520
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1521

    
1522
    if job.id == dep_job_id:
1523
      return (self.ERROR, "Job can't depend on itself")
1524

    
1525
    # Get status of dependency job
1526
    try:
1527
      status = self._getstatus_fn(dep_job_id)
1528
    except errors.JobLost, err:
1529
      return (self.ERROR, "Dependency error: %s" % err)
1530

    
1531
    assert status in constants.JOB_STATUS_ALL
1532

    
1533
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1534

    
1535
    if status not in constants.JOBS_FINALIZED:
1536
      # Register for notification and wait for job to finish
1537
      job_id_waiters.add(job)
1538
      return (self.WAIT,
1539
              "Need to wait for job %s, wanted status '%s'" %
1540
              (dep_job_id, dep_status))
1541

    
1542
    # Remove from waiters list
1543
    if job in job_id_waiters:
1544
      job_id_waiters.remove(job)
1545

    
1546
    if (status == constants.JOB_STATUS_CANCELED and
1547
        constants.JOB_STATUS_CANCELED not in dep_status):
1548
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1549

    
1550
    elif not dep_status or status in dep_status:
1551
      return (self.CONTINUE,
1552
              "Dependency job %s finished with status '%s'" %
1553
              (dep_job_id, status))
1554

    
1555
    else:
1556
      return (self.WRONGSTATUS,
1557
              "Dependency job %s finished with status '%s',"
1558
              " not one of '%s' as required" %
1559
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1560

    
1561
  def _RemoveEmptyWaitersUnlocked(self):
1562
    """Remove all jobs without actual waiters.
1563

1564
    """
1565
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1566
                   if not waiters]:
1567
      del self._waiters[job_id]
1568

    
1569
  def NotifyWaiters(self, job_id):
1570
    """Notifies all jobs waiting for a certain job ID.
1571

1572
    @attention: Do not call until L{CheckAndRegister} returned a status other
1573
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1574
    @type job_id: int
1575
    @param job_id: Job ID
1576

1577
    """
1578
    assert ht.TJobId(job_id)
1579

    
1580
    self._lock.acquire()
1581
    try:
1582
      self._RemoveEmptyWaitersUnlocked()
1583

    
1584
      jobs = self._waiters.pop(job_id, None)
1585
    finally:
1586
      self._lock.release()
1587

    
1588
    if jobs:
1589
      # Re-add jobs to workerpool
1590
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1591
                    len(jobs), job_id)
1592
      self._enqueue_fn(jobs)
1593

    
1594

    
1595
def _RequireOpenQueue(fn):
1596
  """Decorator for "public" functions.
1597

1598
  This function should be used for all 'public' functions. That is,
1599
  functions usually called from other classes. Note that this should
1600
  be applied only to methods (not plain functions), since it expects
1601
  that the decorated function is called with a first argument that has
1602
  a '_queue_filelock' argument.
1603

1604
  @warning: Use this decorator only after locking.ssynchronized
1605

1606
  Example::
1607
    @locking.ssynchronized(_LOCK)
1608
    @_RequireOpenQueue
1609
    def Example(self):
1610
      pass
1611

1612
  """
1613
  def wrapper(self, *args, **kwargs):
1614
    # pylint: disable=W0212
1615
    assert self._queue_filelock is not None, "Queue should be open"
1616
    return fn(self, *args, **kwargs)
1617
  return wrapper
1618

    
1619

    
1620
def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs; see the usage sketch
  after this function.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueError: if the job queue is shutting down

    """
    # Ok when sharing the big job queue lock, as the drain file is created
    # when the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper
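
# A minimal usage sketch for the two decorators above (illustrative only;
# "SubmitExample" is a hypothetical method name):
#
#   @locking.ssynchronized(_LOCK)
#   @_RequireOpenQueue
#   @_RequireNonDrainedQueue
#   def SubmitExample(self, ops):
#     ...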


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared
    # mode and with all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  def _PickupJobUnlocked(self, job_id):
    """Load a job from the job queue.

    Pick up a job that already is in the job queue and start/resume it.

    """
    job = self._LoadJobUnlocked(job_id)

    if job is None:
      logging.warning("Job %s could not be read", job_id)
      return

    status = job.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self._EnqueueJobsUnlocked([job])
      logging.info("Restarting job %s", job.id)

    elif status in (constants.JOB_STATUS_RUNNING,
                    constants.JOB_STATUS_WAITING,
                    constants.JOB_STATUS_CANCELING):
      logging.warning("Unfinished job %s found: %s", job.id, job)

      if status == constants.JOB_STATUS_WAITING:
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
        self._EnqueueJobsUnlocked([job])
        logging.info("Restarting job %s", job.id)
      else:
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              "Unclean master daemon shutdown")
        job.Finalize()

    self.UpdateJobUnlocked(job)

  @locking.ssynchronized(_LOCK)
  def PickupJob(self, job_id):
    """Thread-safe wrapper around L{_PickupJobUnlocked}.

    """
    self._PickupJobUnlocked(job_id)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      self._PickupJobUnlocked(job_id)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    # Set queue drained flag
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
                                                       self._drained)
    msg = result[node_name].fail_msg
    if msg:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate the renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist
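
  # For example (illustrative): a file named "job-123" in one of those
  # directories matches constants.JOB_FILE_RE and contributes the ID 123.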

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @type writable: bool or None
    @param writable: Whether the returned job should be writable; if C{None},
      it defaults to the job not being archived
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    If the job cannot be read, None is returned and the exception is
    logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
    self._CheckRpcResult(result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True

  @classmethod
  def SubmitJob(cls, ops):
    """Create and store a new job.

    """
    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitJob(ops)

  @classmethod
  def SubmitJobToDrainedQueue(cls, ops):
    """Forcefully create and store a new job.

    Do so, even if the job queue is drained.

    """
    return luxi.Client(address=pathutils.QUERY_SOCKET)\
        .SubmitJobToDrainedQueue(ops)

  @classmethod
  def SubmitManyJobs(cls, jobs):
    """Create and store multiple jobs.

    """
    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)
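
  # Submission sketch (illustrative; the opcode is only an example):
  #
  #   job_id = JobQueue.SubmitJob([opcodes.OpClusterVerify()])
  #
  # SubmitManyJobs takes a list of such opcode lists, one per job.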

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)
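
  # Illustrative example: if resolve_fn maps the relative ID -1 to the most
  # recently submitted job (say, 1000), then dependencies
  # [(-1, [constants.JOB_STATUS_SUCCESS])] resolve to
  # [(1000, [constants.JOB_STATUS_SUCCESS])].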

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs],
                             task_id=map(_GetIdAttr, jobs))

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk; check for None before touching attributes, so a
    # missing job raises JobLost instead of AttributeError
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        If the job has not changed and the timeout has expired,
        the special value L{constants.JOB_NOTCHANGED} is returned
        instead, and should be interpreted as such by the clients.

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
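
  # Client-side polling sketch (illustrative): callers loop, feeding back the
  # last job information and log serial they have seen.
  #
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 60.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue  # timeout expired, nothing new
  #     (prev_info, log_entries) = result
  #     ...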

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      (success, msg) = job.ChangePriority(priority)

      if success:
        try:
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)
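
  # For example (illustrative): ChangeJobPriority(1234, constants.OP_PRIO_HIGH)
  # raises a queued job's priority; values outside
  # constants.OP_PRIO_SUBMIT_VALID are rejected above.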

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive an already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time to spend archiving, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
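
  # For instance (illustrative): AutoArchiveJobs(-1, 30) archives every
  # finalized job it can process within roughly 30 seconds and returns a
  # tuple of (number of jobs archived, number of jobs not considered).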

  def _Query(self, fields, qfilter):
    """Helper for implementing job queries.

    @return: tuple of (query object, list of (job id, job) pairs,
      whether the result covers all jobs)

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list with
        the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)
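
  # Illustrative note: for job_ids [1, 2], qlang.MakeSimpleFilter builds an
  # OR-of-equalities filter along the lines of
  # ["|", ["=", "id", 1], ["=", "id", 2]].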

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is
    not yet ready for shutdown. Once this function returns C{False},
    L{Shutdown} can be called without interfering with any job. Queued and
    unfinished jobs will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()
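
  # Orderly shutdown sketch (illustrative): a caller polls until no jobs are
  # running, then closes the queue.
  #
  #   while queue.PrepareShutdown():
  #     time.sleep(1)  # jobs are still running
  #   queue.Shutdown()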

  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool

    """
    return self._accepting_jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None