Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 93f1e606

History | View | Annotate | Download (75.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import time
35
import weakref
36
import threading
37
import itertools
38
import operator
39

    
40
try:
41
  # pylint: disable=E0611
42
  from pyinotify import pyinotify
43
except ImportError:
44
  import pyinotify
45

    
46
from ganeti import asyncnotifier
47
from ganeti import constants
48
from ganeti import serializer
49
from ganeti import workerpool
50
from ganeti import locking
51
from ganeti import luxi
52
from ganeti import opcodes
53
from ganeti import opcodes_base
54
from ganeti import errors
55
from ganeti import mcpu
56
from ganeti import utils
57
from ganeti import jstore
58
import ganeti.rpc.node as rpc
59
from ganeti import runtime
60
from ganeti import netutils
61
from ganeti import compat
62
from ganeti import ht
63
from ganeti import query
64
from ganeti import qlang
65
from ganeti import pathutils
66
from ganeti import vcluster
67

    
68

    
69
JOBQUEUE_THREADS = 25
70

    
71
# member lock names to be passed to @ssynchronized decorator
72
_LOCK = "_lock"
73
_QUEUE = "_queue"
74

    
75
#: Retrieves "id" attribute
76
_GetIdAttr = operator.attrgetter("id")
77

    
78

    
79
class CancelJob(Exception):
80
  """Special exception to cancel a job.
81

82
  """
83

    
84

    
85
class QueueShutdown(Exception):
86
  """Special exception to abort a job when the job queue is shutting down.
87

88
  """
89

    
90

    
91
def TimeStampNow():
92
  """Returns the current timestamp.
93

94
  @rtype: tuple
95
  @return: the current time in the (seconds, microseconds) format
96

97
  """
98
  return utils.SplitTime(time.time())
99

    
100

    
101
def _CallJqUpdate(runner, names, file_name, content):
102
  """Updates job queue file after virtualizing filename.
103

104
  """
105
  virt_file_name = vcluster.MakeVirtualPath(file_name)
106
  return runner.call_jobqueue_update(names, virt_file_name, content)
107

    
108

    
109
class _SimpleJobQuery:
110
  """Wrapper for job queries.
111

112
  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
113

114
  """
115
  def __init__(self, fields):
116
    """Initializes this class.
117

118
    """
119
    self._query = query.Query(query.JOB_FIELDS, fields)
120

    
121
  def __call__(self, job):
122
    """Executes a job query using cached field list.
123

124
    """
125
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
126

    
127

    
128
class _QueuedOpCode(object):
129
  """Encapsulates an opcode object.
130

131
  @ivar log: holds the execution log and consists of tuples
132
  of the form C{(log_serial, timestamp, level, message)}
133
  @ivar input: the OpCode we encapsulate
134
  @ivar status: the current status
135
  @ivar result: the result of the LU execution
136
  @ivar start_timestamp: timestamp for the start of the execution
137
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
138
  @ivar stop_timestamp: timestamp for the end of the execution
139

140
  """
141
  __slots__ = ["input", "status", "result", "log", "priority",
142
               "start_timestamp", "exec_timestamp", "end_timestamp",
143
               "__weakref__"]
144

    
145
  def __init__(self, op):
146
    """Initializes instances of this class.
147

148
    @type op: L{opcodes.OpCode}
149
    @param op: the opcode we encapsulate
150

151
    """
152
    self.input = op
153
    self.status = constants.OP_STATUS_QUEUED
154
    self.result = None
155
    self.log = []
156
    self.start_timestamp = None
157
    self.exec_timestamp = None
158
    self.end_timestamp = None
159

    
160
    # Get initial priority (it might change during the lifetime of this opcode)
161
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
162

    
163
  @classmethod
164
  def Restore(cls, state):
165
    """Restore the _QueuedOpCode from the serialized form.
166

167
    @type state: dict
168
    @param state: the serialized state
169
    @rtype: _QueuedOpCode
170
    @return: a new _QueuedOpCode instance
171

172
    """
173
    obj = _QueuedOpCode.__new__(cls)
174
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
175
    obj.status = state["status"]
176
    obj.result = state["result"]
177
    obj.log = state["log"]
178
    obj.start_timestamp = state.get("start_timestamp", None)
179
    obj.exec_timestamp = state.get("exec_timestamp", None)
180
    obj.end_timestamp = state.get("end_timestamp", None)
181
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
182
    return obj
183

    
184
  def Serialize(self):
185
    """Serializes this _QueuedOpCode.
186

187
    @rtype: dict
188
    @return: the dictionary holding the serialized state
189

190
    """
191
    return {
192
      "input": self.input.__getstate__(),
193
      "status": self.status,
194
      "result": self.result,
195
      "log": self.log,
196
      "start_timestamp": self.start_timestamp,
197
      "exec_timestamp": self.exec_timestamp,
198
      "end_timestamp": self.end_timestamp,
199
      "priority": self.priority,
200
      }
201

    
202

    
203
class _QueuedJob(object):
204
  """In-memory job representation.
205

206
  This is what we use to track the user-submitted jobs. Locking must
207
  be taken care of by users of this class.
208

209
  @type queue: L{JobQueue}
210
  @ivar queue: the parent queue
211
  @ivar id: the job ID
212
  @type ops: list
213
  @ivar ops: the list of _QueuedOpCode that constitute the job
214
  @type log_serial: int
215
  @ivar log_serial: holds the index for the next log entry
216
  @ivar received_timestamp: the timestamp for when the job was received
217
  @ivar start_timestmap: the timestamp for start of execution
218
  @ivar end_timestamp: the timestamp for end of execution
219
  @ivar writable: Whether the job is allowed to be modified
220

221
  """
222
  # pylint: disable=W0212
223
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
224
               "received_timestamp", "start_timestamp", "end_timestamp",
225
               "__weakref__", "processor_lock", "writable", "archived"]
226

    
227
  def _AddReasons(self):
228
    """Extend the reason trail
229

230
    Add the reason for all the opcodes of this job to be executed.
231

232
    """
233
    count = 0
234
    for queued_op in self.ops:
235
      op = queued_op.input
236
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__)
237
      reason_text = "job=%d;index=%d" % (self.id, count)
238
      reason = getattr(op, "reason", [])
239
      reason.append((reason_src, reason_text, utils.EpochNano()))
240
      op.reason = reason
241
      count = count + 1
242

    
243
  def __init__(self, queue, job_id, ops, writable):
244
    """Constructor for the _QueuedJob.
245

246
    @type queue: L{JobQueue}
247
    @param queue: our parent queue
248
    @type job_id: job_id
249
    @param job_id: our job id
250
    @type ops: list
251
    @param ops: the list of opcodes we hold, which will be encapsulated
252
        in _QueuedOpCodes
253
    @type writable: bool
254
    @param writable: Whether job can be modified
255

256
    """
257
    if not ops:
258
      raise errors.GenericError("A job needs at least one opcode")
259

    
260
    self.queue = queue
261
    self.id = int(job_id)
262
    self.ops = [_QueuedOpCode(op) for op in ops]
263
    self._AddReasons()
264
    self.log_serial = 0
265
    self.received_timestamp = TimeStampNow()
266
    self.start_timestamp = None
267
    self.end_timestamp = None
268
    self.archived = False
269

    
270
    self._InitInMemory(self, writable)
271

    
272
    assert not self.archived, "New jobs can not be marked as archived"
273

    
274
  @staticmethod
275
  def _InitInMemory(obj, writable):
276
    """Initializes in-memory variables.
277

278
    """
279
    obj.writable = writable
280
    obj.ops_iter = None
281
    obj.cur_opctx = None
282

    
283
    # Read-only jobs are not processed and therefore don't need a lock
284
    if writable:
285
      obj.processor_lock = threading.Lock()
286
    else:
287
      obj.processor_lock = None
288

    
289
  def __repr__(self):
290
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
291
              "id=%s" % self.id,
292
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
293

    
294
    return "<%s at %#x>" % (" ".join(status), id(self))
295

    
296
  @classmethod
297
  def Restore(cls, queue, state, writable, archived):
298
    """Restore a _QueuedJob from serialized state:
299

300
    @type queue: L{JobQueue}
301
    @param queue: to which queue the restored job belongs
302
    @type state: dict
303
    @param state: the serialized state
304
    @type writable: bool
305
    @param writable: Whether job can be modified
306
    @type archived: bool
307
    @param archived: Whether job was already archived
308
    @rtype: _JobQueue
309
    @return: the restored _JobQueue instance
310

311
    """
312
    obj = _QueuedJob.__new__(cls)
313
    obj.queue = queue
314
    obj.id = int(state["id"])
315
    obj.received_timestamp = state.get("received_timestamp", None)
316
    obj.start_timestamp = state.get("start_timestamp", None)
317
    obj.end_timestamp = state.get("end_timestamp", None)
318
    obj.archived = archived
319

    
320
    obj.ops = []
321
    obj.log_serial = 0
322
    for op_state in state["ops"]:
323
      op = _QueuedOpCode.Restore(op_state)
324
      for log_entry in op.log:
325
        obj.log_serial = max(obj.log_serial, log_entry[0])
326
      obj.ops.append(op)
327

    
328
    cls._InitInMemory(obj, writable)
329

    
330
    return obj
331

    
332
  def Serialize(self):
333
    """Serialize the _JobQueue instance.
334

335
    @rtype: dict
336
    @return: the serialized state
337

338
    """
339
    return {
340
      "id": self.id,
341
      "ops": [op.Serialize() for op in self.ops],
342
      "start_timestamp": self.start_timestamp,
343
      "end_timestamp": self.end_timestamp,
344
      "received_timestamp": self.received_timestamp,
345
      }
346

    
347
  def CalcStatus(self):
348
    """Compute the status of this job.
349

350
    This function iterates over all the _QueuedOpCodes in the job and
351
    based on their status, computes the job status.
352

353
    The algorithm is:
354
      - if we find a cancelled, or finished with error, the job
355
        status will be the same
356
      - otherwise, the last opcode with the status one of:
357
          - waitlock
358
          - canceling
359
          - running
360

361
        will determine the job status
362

363
      - otherwise, it means either all opcodes are queued, or success,
364
        and the job status will be the same
365

366
    @return: the job status
367

368
    """
369
    status = constants.JOB_STATUS_QUEUED
370

    
371
    all_success = True
372
    for op in self.ops:
373
      if op.status == constants.OP_STATUS_SUCCESS:
374
        continue
375

    
376
      all_success = False
377

    
378
      if op.status == constants.OP_STATUS_QUEUED:
379
        pass
380
      elif op.status == constants.OP_STATUS_WAITING:
381
        status = constants.JOB_STATUS_WAITING
382
      elif op.status == constants.OP_STATUS_RUNNING:
383
        status = constants.JOB_STATUS_RUNNING
384
      elif op.status == constants.OP_STATUS_CANCELING:
385
        status = constants.JOB_STATUS_CANCELING
386
        break
387
      elif op.status == constants.OP_STATUS_ERROR:
388
        status = constants.JOB_STATUS_ERROR
389
        # The whole job fails if one opcode failed
390
        break
391
      elif op.status == constants.OP_STATUS_CANCELED:
392
        status = constants.OP_STATUS_CANCELED
393
        break
394

    
395
    if all_success:
396
      status = constants.JOB_STATUS_SUCCESS
397

    
398
    return status
399

    
400
  def CalcPriority(self):
401
    """Gets the current priority for this job.
402

403
    Only unfinished opcodes are considered. When all are done, the default
404
    priority is used.
405

406
    @rtype: int
407

408
    """
409
    priorities = [op.priority for op in self.ops
410
                  if op.status not in constants.OPS_FINALIZED]
411

    
412
    if not priorities:
413
      # All opcodes are done, assume default priority
414
      return constants.OP_PRIO_DEFAULT
415

    
416
    return min(priorities)
417

    
418
  def GetLogEntries(self, newer_than):
419
    """Selectively returns the log entries.
420

421
    @type newer_than: None or int
422
    @param newer_than: if this is None, return all log entries,
423
        otherwise return only the log entries with serial higher
424
        than this value
425
    @rtype: list
426
    @return: the list of the log entries selected
427

428
    """
429
    if newer_than is None:
430
      serial = -1
431
    else:
432
      serial = newer_than
433

    
434
    entries = []
435
    for op in self.ops:
436
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
437

    
438
    return entries
439

    
440
  def GetInfo(self, fields):
441
    """Returns information about a job.
442

443
    @type fields: list
444
    @param fields: names of fields to return
445
    @rtype: list
446
    @return: list with one element for each field
447
    @raise errors.OpExecError: when an invalid field
448
        has been passed
449

450
    """
451
    return _SimpleJobQuery(fields)(self)
452

    
453
  def MarkUnfinishedOps(self, status, result):
454
    """Mark unfinished opcodes with a given status and result.
455

456
    This is an utility function for marking all running or waiting to
457
    be run opcodes with a given status. Opcodes which are already
458
    finalised are not changed.
459

460
    @param status: a given opcode status
461
    @param result: the opcode result
462

463
    """
464
    not_marked = True
465
    for op in self.ops:
466
      if op.status in constants.OPS_FINALIZED:
467
        assert not_marked, "Finalized opcodes found after non-finalized ones"
468
        continue
469
      op.status = status
470
      op.result = result
471
      not_marked = False
472

    
473
  def Finalize(self):
474
    """Marks the job as finalized.
475

476
    """
477
    self.end_timestamp = TimeStampNow()
478

    
479
  def Cancel(self):
480
    """Marks job as canceled/-ing if possible.
481

482
    @rtype: tuple; (bool, string)
483
    @return: Boolean describing whether job was successfully canceled or marked
484
      as canceling and a text message
485

486
    """
487
    status = self.CalcStatus()
488

    
489
    if status == constants.JOB_STATUS_QUEUED:
490
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
491
                             "Job canceled by request")
492
      self.Finalize()
493
      return (True, "Job %s canceled" % self.id)
494

    
495
    elif status == constants.JOB_STATUS_WAITING:
496
      # The worker will notice the new status and cancel the job
497
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
498
      return (True, "Job %s will be canceled" % self.id)
499

    
500
    else:
501
      logging.debug("Job %s is no longer waiting in the queue", self.id)
502
      return (False, "Job %s is no longer waiting in the queue" % self.id)
503

    
504
  def ChangePriority(self, priority):
505
    """Changes the job priority.
506

507
    @type priority: int
508
    @param priority: New priority
509
    @rtype: tuple; (bool, string)
510
    @return: Boolean describing whether job's priority was successfully changed
511
      and a text message
512

513
    """
514
    status = self.CalcStatus()
515

    
516
    if status in constants.JOBS_FINALIZED:
517
      return (False, "Job %s is finished" % self.id)
518
    elif status == constants.JOB_STATUS_CANCELING:
519
      return (False, "Job %s is cancelling" % self.id)
520
    else:
521
      assert status in (constants.JOB_STATUS_QUEUED,
522
                        constants.JOB_STATUS_WAITING,
523
                        constants.JOB_STATUS_RUNNING)
524

    
525
      changed = False
526
      for op in self.ops:
527
        if (op.status == constants.OP_STATUS_RUNNING or
528
            op.status in constants.OPS_FINALIZED):
529
          assert not changed, \
530
            ("Found opcode for which priority should not be changed after"
531
             " priority has been changed for previous opcodes")
532
          continue
533

    
534
        assert op.status in (constants.OP_STATUS_QUEUED,
535
                             constants.OP_STATUS_WAITING)
536

    
537
        changed = True
538

    
539
        # Set new priority (doesn't modify opcode input)
540
        op.priority = priority
541

    
542
      if changed:
543
        return (True, ("Priorities of pending opcodes for job %s have been"
544
                       " changed to %s" % (self.id, priority)))
545
      else:
546
        return (False, "Job %s had no pending opcodes" % self.id)
547

    
548

    
549
class _OpExecCallbacks(mcpu.OpExecCbBase):
550
  def __init__(self, queue, job, op):
551
    """Initializes this class.
552

553
    @type queue: L{JobQueue}
554
    @param queue: Job queue
555
    @type job: L{_QueuedJob}
556
    @param job: Job object
557
    @type op: L{_QueuedOpCode}
558
    @param op: OpCode
559

560
    """
561
    assert queue, "Queue is missing"
562
    assert job, "Job is missing"
563
    assert op, "Opcode is missing"
564

    
565
    self._queue = queue
566
    self._job = job
567
    self._op = op
568

    
569
  def _CheckCancel(self):
570
    """Raises an exception to cancel the job if asked to.
571

572
    """
573
    # Cancel here if we were asked to
574
    if self._op.status == constants.OP_STATUS_CANCELING:
575
      logging.debug("Canceling opcode")
576
      raise CancelJob()
577

    
578
    # See if queue is shutting down
579
    if not self._queue.AcceptingJobsUnlocked():
580
      logging.debug("Queue is shutting down")
581
      raise QueueShutdown()
582

    
583
  @locking.ssynchronized(_QUEUE, shared=1)
584
  def NotifyStart(self):
585
    """Mark the opcode as running, not lock-waiting.
586

587
    This is called from the mcpu code as a notifier function, when the LU is
588
    finally about to start the Exec() method. Of course, to have end-user
589
    visible results, the opcode must be initially (before calling into
590
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
591

592
    """
593
    assert self._op in self._job.ops
594
    assert self._op.status in (constants.OP_STATUS_WAITING,
595
                               constants.OP_STATUS_CANCELING)
596

    
597
    # Cancel here if we were asked to
598
    self._CheckCancel()
599

    
600
    logging.debug("Opcode is now running")
601

    
602
    self._op.status = constants.OP_STATUS_RUNNING
603
    self._op.exec_timestamp = TimeStampNow()
604

    
605
    # And finally replicate the job status
606
    self._queue.UpdateJobUnlocked(self._job)
607

    
608
  @locking.ssynchronized(_QUEUE, shared=1)
609
  def _AppendFeedback(self, timestamp, log_type, log_msg):
610
    """Internal feedback append function, with locks
611

612
    """
613
    self._job.log_serial += 1
614
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
615
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
616

    
617
  def Feedback(self, *args):
618
    """Append a log entry.
619

620
    """
621
    assert len(args) < 3
622

    
623
    if len(args) == 1:
624
      log_type = constants.ELOG_MESSAGE
625
      log_msg = args[0]
626
    else:
627
      (log_type, log_msg) = args
628

    
629
    # The time is split to make serialization easier and not lose
630
    # precision.
631
    timestamp = utils.SplitTime(time.time())
632
    self._AppendFeedback(timestamp, log_type, log_msg)
633

    
634
  def CurrentPriority(self):
635
    """Returns current priority for opcode.
636

637
    """
638
    assert self._op.status in (constants.OP_STATUS_WAITING,
639
                               constants.OP_STATUS_CANCELING)
640

    
641
    # Cancel here if we were asked to
642
    self._CheckCancel()
643

    
644
    return self._op.priority
645

    
646
  def SubmitManyJobs(self, jobs):
647
    """Submits jobs for processing.
648

649
    See L{JobQueue.SubmitManyJobs}.
650

651
    """
652
    # Locking is done in job queue
653
    return self._queue.SubmitManyJobs(jobs)
654

    
655

    
656
class _JobChangesChecker(object):
657
  def __init__(self, fields, prev_job_info, prev_log_serial):
658
    """Initializes this class.
659

660
    @type fields: list of strings
661
    @param fields: Fields requested by LUXI client
662
    @type prev_job_info: string
663
    @param prev_job_info: previous job info, as passed by the LUXI client
664
    @type prev_log_serial: string
665
    @param prev_log_serial: previous job serial, as passed by the LUXI client
666

667
    """
668
    self._squery = _SimpleJobQuery(fields)
669
    self._prev_job_info = prev_job_info
670
    self._prev_log_serial = prev_log_serial
671

    
672
  def __call__(self, job):
673
    """Checks whether job has changed.
674

675
    @type job: L{_QueuedJob}
676
    @param job: Job object
677

678
    """
679
    assert not job.writable, "Expected read-only job"
680

    
681
    status = job.CalcStatus()
682
    job_info = self._squery(job)
683
    log_entries = job.GetLogEntries(self._prev_log_serial)
684

    
685
    # Serializing and deserializing data can cause type changes (e.g. from
686
    # tuple to list) or precision loss. We're doing it here so that we get
687
    # the same modifications as the data received from the client. Without
688
    # this, the comparison afterwards might fail without the data being
689
    # significantly different.
690
    # TODO: we just deserialized from disk, investigate how to make sure that
691
    # the job info and log entries are compatible to avoid this further step.
692
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
693
    # efficient, though floats will be tricky
694
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
695
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
696

    
697
    # Don't even try to wait if the job is no longer running, there will be
698
    # no changes.
699
    if (status not in (constants.JOB_STATUS_QUEUED,
700
                       constants.JOB_STATUS_RUNNING,
701
                       constants.JOB_STATUS_WAITING) or
702
        job_info != self._prev_job_info or
703
        (log_entries and self._prev_log_serial != log_entries[0][0])):
704
      logging.debug("Job %s changed", job.id)
705
      return (job_info, log_entries)
706

    
707
    return None
708

    
709

    
710
class _JobFileChangesWaiter(object):
711
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
712
    """Initializes this class.
713

714
    @type filename: string
715
    @param filename: Path to job file
716
    @raises errors.InotifyError: if the notifier cannot be setup
717

718
    """
719
    self._wm = _inotify_wm_cls()
720
    self._inotify_handler = \
721
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
722
    self._notifier = \
723
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
724
    try:
725
      self._inotify_handler.enable()
726
    except Exception:
727
      # pyinotify doesn't close file descriptors automatically
728
      self._notifier.stop()
729
      raise
730

    
731
  def _OnInotify(self, notifier_enabled):
732
    """Callback for inotify.
733

734
    """
735
    if not notifier_enabled:
736
      self._inotify_handler.enable()
737

    
738
  def Wait(self, timeout):
739
    """Waits for the job file to change.
740

741
    @type timeout: float
742
    @param timeout: Timeout in seconds
743
    @return: Whether there have been events
744

745
    """
746
    assert timeout >= 0
747
    have_events = self._notifier.check_events(timeout * 1000)
748
    if have_events:
749
      self._notifier.read_events()
750
    self._notifier.process_events()
751
    return have_events
752

    
753
  def Close(self):
754
    """Closes underlying notifier and its file descriptor.
755

756
    """
757
    self._notifier.stop()
758

    
759

    
760
class _JobChangesWaiter(object):
761
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
762
    """Initializes this class.
763

764
    @type filename: string
765
    @param filename: Path to job file
766

767
    """
768
    self._filewaiter = None
769
    self._filename = filename
770
    self._waiter_cls = _waiter_cls
771

    
772
  def Wait(self, timeout):
773
    """Waits for a job to change.
774

775
    @type timeout: float
776
    @param timeout: Timeout in seconds
777
    @return: Whether there have been events
778

779
    """
780
    if self._filewaiter:
781
      return self._filewaiter.Wait(timeout)
782

    
783
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
784
    # If this point is reached, return immediately and let caller check the job
785
    # file again in case there were changes since the last check. This avoids a
786
    # race condition.
787
    self._filewaiter = self._waiter_cls(self._filename)
788

    
789
    return True
790

    
791
  def Close(self):
792
    """Closes underlying waiter.
793

794
    """
795
    if self._filewaiter:
796
      self._filewaiter.Close()
797

    
798

    
799
class _WaitForJobChangesHelper(object):
800
  """Helper class using inotify to wait for changes in a job file.
801

802
  This class takes a previous job status and serial, and alerts the client when
803
  the current job status has changed.
804

805
  """
806
  @staticmethod
807
  def _CheckForChanges(counter, job_load_fn, check_fn):
808
    if counter.next() > 0:
809
      # If this isn't the first check the job is given some more time to change
810
      # again. This gives better performance for jobs generating many
811
      # changes/messages.
812
      time.sleep(0.1)
813

    
814
    job = job_load_fn()
815
    if not job:
816
      raise errors.JobLost()
817

    
818
    result = check_fn(job)
819
    if result is None:
820
      raise utils.RetryAgain()
821

    
822
    return result
823

    
824
  def __call__(self, filename, job_load_fn,
825
               fields, prev_job_info, prev_log_serial, timeout,
826
               _waiter_cls=_JobChangesWaiter):
827
    """Waits for changes on a job.
828

829
    @type filename: string
830
    @param filename: File on which to wait for changes
831
    @type job_load_fn: callable
832
    @param job_load_fn: Function to load job
833
    @type fields: list of strings
834
    @param fields: Which fields to check for changes
835
    @type prev_job_info: list or None
836
    @param prev_job_info: Last job information returned
837
    @type prev_log_serial: int
838
    @param prev_log_serial: Last job message serial number
839
    @type timeout: float
840
    @param timeout: maximum time to wait in seconds
841

842
    """
843
    counter = itertools.count()
844
    try:
845
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
846
      waiter = _waiter_cls(filename)
847
      try:
848
        return utils.Retry(compat.partial(self._CheckForChanges,
849
                                          counter, job_load_fn, check_fn),
850
                           utils.RETRY_REMAINING_TIME, timeout,
851
                           wait_fn=waiter.Wait)
852
      finally:
853
        waiter.Close()
854
    except errors.JobLost:
855
      return None
856
    except utils.RetryTimeout:
857
      return constants.JOB_NOTCHANGED
858

    
859

    
860
def _EncodeOpError(err):
861
  """Encodes an error which occurred while processing an opcode.
862

863
  """
864
  if isinstance(err, errors.GenericError):
865
    to_encode = err
866
  else:
867
    to_encode = errors.OpExecError(str(err))
868

    
869
  return errors.EncodeException(to_encode)
870

    
871

    
872
class _TimeoutStrategyWrapper:
873
  def __init__(self, fn):
874
    """Initializes this class.
875

876
    """
877
    self._fn = fn
878
    self._next = None
879

    
880
  def _Advance(self):
881
    """Gets the next timeout if necessary.
882

883
    """
884
    if self._next is None:
885
      self._next = self._fn()
886

    
887
  def Peek(self):
888
    """Returns the next timeout.
889

890
    """
891
    self._Advance()
892
    return self._next
893

    
894
  def Next(self):
895
    """Returns the current timeout and advances the internal state.
896

897
    """
898
    self._Advance()
899
    result = self._next
900
    self._next = None
901
    return result
902

    
903

    
904
class _OpExecContext:
905
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
906
    """Initializes this class.
907

908
    """
909
    self.op = op
910
    self.index = index
911
    self.log_prefix = log_prefix
912
    self.summary = op.input.Summary()
913

    
914
    # Create local copy to modify
915
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
916
      self.jobdeps = op.input.depends[:]
917
    else:
918
      self.jobdeps = None
919

    
920
    self._timeout_strategy_factory = timeout_strategy_factory
921
    self._ResetTimeoutStrategy()
922

    
923
  def _ResetTimeoutStrategy(self):
924
    """Creates a new timeout strategy.
925

926
    """
927
    self._timeout_strategy = \
928
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
929

    
930
  def CheckPriorityIncrease(self):
931
    """Checks whether priority can and should be increased.
932

933
    Called when locks couldn't be acquired.
934

935
    """
936
    op = self.op
937

    
938
    # Exhausted all retries and next round should not use blocking acquire
939
    # for locks?
940
    if (self._timeout_strategy.Peek() is None and
941
        op.priority > constants.OP_PRIO_HIGHEST):
942
      logging.debug("Increasing priority")
943
      op.priority -= 1
944
      self._ResetTimeoutStrategy()
945
      return True
946

    
947
    return False
948

    
949
  def GetNextLockTimeout(self):
950
    """Returns the next lock acquire timeout.
951

952
    """
953
    return self._timeout_strategy.Next()
954

    
955

    
956
class _JobProcessor(object):
957
  (DEFER,
958
   WAITDEP,
959
   FINISHED) = range(1, 4)
960

    
961
  def __init__(self, queue, opexec_fn, job,
962
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
963
    """Initializes this class.
964

965
    """
966
    self.queue = queue
967
    self.opexec_fn = opexec_fn
968
    self.job = job
969
    self._timeout_strategy_factory = _timeout_strategy_factory
970

    
971
  @staticmethod
972
  def _FindNextOpcode(job, timeout_strategy_factory):
973
    """Locates the next opcode to run.
974

975
    @type job: L{_QueuedJob}
976
    @param job: Job object
977
    @param timeout_strategy_factory: Callable to create new timeout strategy
978

979
    """
980
    # Create some sort of a cache to speed up locating next opcode for future
981
    # lookups
982
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
983
    # pending and one for processed ops.
984
    if job.ops_iter is None:
985
      job.ops_iter = enumerate(job.ops)
986

    
987
    # Find next opcode to run
988
    while True:
989
      try:
990
        (idx, op) = job.ops_iter.next()
991
      except StopIteration:
992
        raise errors.ProgrammerError("Called for a finished job")
993

    
994
      if op.status == constants.OP_STATUS_RUNNING:
995
        # Found an opcode already marked as running
996
        raise errors.ProgrammerError("Called for job marked as running")
997

    
998
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
999
                             timeout_strategy_factory)
1000

    
1001
      if op.status not in constants.OPS_FINALIZED:
1002
        return opctx
1003

    
1004
      # This is a job that was partially completed before master daemon
1005
      # shutdown, so it can be expected that some opcodes are already
1006
      # completed successfully (if any did error out, then the whole job
1007
      # should have been aborted and not resubmitted for processing).
1008
      logging.info("%s: opcode %s already processed, skipping",
1009
                   opctx.log_prefix, opctx.summary)
1010

    
1011
  @staticmethod
1012
  def _MarkWaitlock(job, op):
1013
    """Marks an opcode as waiting for locks.
1014

1015
    The job's start timestamp is also set if necessary.
1016

1017
    @type job: L{_QueuedJob}
1018
    @param job: Job object
1019
    @type op: L{_QueuedOpCode}
1020
    @param op: Opcode object
1021

1022
    """
1023
    assert op in job.ops
1024
    assert op.status in (constants.OP_STATUS_QUEUED,
1025
                         constants.OP_STATUS_WAITING)
1026

    
1027
    update = False
1028

    
1029
    op.result = None
1030

    
1031
    if op.status == constants.OP_STATUS_QUEUED:
1032
      op.status = constants.OP_STATUS_WAITING
1033
      update = True
1034

    
1035
    if op.start_timestamp is None:
1036
      op.start_timestamp = TimeStampNow()
1037
      update = True
1038

    
1039
    if job.start_timestamp is None:
1040
      job.start_timestamp = op.start_timestamp
1041
      update = True
1042

    
1043
    assert op.status == constants.OP_STATUS_WAITING
1044

    
1045
    return update
1046

    
1047
  @staticmethod
1048
  def _CheckDependencies(queue, job, opctx):
1049
    """Checks if an opcode has dependencies and if so, processes them.
1050

1051
    @type queue: L{JobQueue}
1052
    @param queue: Queue object
1053
    @type job: L{_QueuedJob}
1054
    @param job: Job object
1055
    @type opctx: L{_OpExecContext}
1056
    @param opctx: Opcode execution context
1057
    @rtype: bool
1058
    @return: Whether opcode will be re-scheduled by dependency tracker
1059

1060
    """
1061
    op = opctx.op
1062

    
1063
    result = False
1064

    
1065
    while opctx.jobdeps:
1066
      (dep_job_id, dep_status) = opctx.jobdeps[0]
1067

    
1068
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1069
                                                          dep_status)
1070
      assert ht.TNonEmptyString(depmsg), "No dependency message"
1071

    
1072
      logging.info("%s: %s", opctx.log_prefix, depmsg)
1073

    
1074
      if depresult == _JobDependencyManager.CONTINUE:
1075
        # Remove dependency and continue
1076
        opctx.jobdeps.pop(0)
1077

    
1078
      elif depresult == _JobDependencyManager.WAIT:
1079
        # Need to wait for notification, dependency tracker will re-add job
1080
        # to workerpool
1081
        result = True
1082
        break
1083

    
1084
      elif depresult == _JobDependencyManager.CANCEL:
1085
        # Job was cancelled, cancel this job as well
1086
        job.Cancel()
1087
        assert op.status == constants.OP_STATUS_CANCELING
1088
        break
1089

    
1090
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1091
                         _JobDependencyManager.ERROR):
1092
        # Job failed or there was an error, this job must fail
1093
        op.status = constants.OP_STATUS_ERROR
1094
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1095
        break
1096

    
1097
      else:
1098
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1099
                                     depresult)
1100

    
1101
    return result
1102

    
1103
  def _ExecOpCodeUnlocked(self, opctx):
1104
    """Processes one opcode and returns the result.
1105

1106
    """
1107
    op = opctx.op
1108

    
1109
    assert op.status == constants.OP_STATUS_WAITING
1110

    
1111
    timeout = opctx.GetNextLockTimeout()
1112

    
1113
    try:
1114
      # Make sure not to hold queue lock while calling ExecOpCode
1115
      result = self.opexec_fn(op.input,
1116
                              _OpExecCallbacks(self.queue, self.job, op),
1117
                              timeout=timeout)
1118
    except mcpu.LockAcquireTimeout:
1119
      assert timeout is not None, "Received timeout for blocking acquire"
1120
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1121

    
1122
      assert op.status in (constants.OP_STATUS_WAITING,
1123
                           constants.OP_STATUS_CANCELING)
1124

    
1125
      # Was job cancelled while we were waiting for the lock?
1126
      if op.status == constants.OP_STATUS_CANCELING:
1127
        return (constants.OP_STATUS_CANCELING, None)
1128

    
1129
      # Queue is shutting down, return to queued
1130
      if not self.queue.AcceptingJobsUnlocked():
1131
        return (constants.OP_STATUS_QUEUED, None)
1132

    
1133
      # Stay in waitlock while trying to re-acquire lock
1134
      return (constants.OP_STATUS_WAITING, None)
1135
    except CancelJob:
1136
      logging.exception("%s: Canceling job", opctx.log_prefix)
1137
      assert op.status == constants.OP_STATUS_CANCELING
1138
      return (constants.OP_STATUS_CANCELING, None)
1139

    
1140
    except QueueShutdown:
1141
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1142

    
1143
      assert op.status == constants.OP_STATUS_WAITING
1144

    
1145
      # Job hadn't been started yet, so it should return to the queue
1146
      return (constants.OP_STATUS_QUEUED, None)
1147

    
1148
    except Exception, err: # pylint: disable=W0703
1149
      logging.exception("%s: Caught exception in %s",
1150
                        opctx.log_prefix, opctx.summary)
1151
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1152
    else:
1153
      logging.debug("%s: %s successful",
1154
                    opctx.log_prefix, opctx.summary)
1155
      return (constants.OP_STATUS_SUCCESS, result)
1156

    
1157
  def __call__(self, _nextop_fn=None):
1158
    """Continues execution of a job.
1159

1160
    @param _nextop_fn: Callback function for tests
1161
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1162
      be deferred and C{WAITDEP} if the dependency manager
1163
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1164

1165
    """
1166
    queue = self.queue
1167
    job = self.job
1168

    
1169
    logging.debug("Processing job %s", job.id)
1170

    
1171
    queue.acquire(shared=1)
1172
    try:
1173
      opcount = len(job.ops)
1174

    
1175
      assert job.writable, "Expected writable job"
1176

    
1177
      # Don't do anything for finalized jobs
1178
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1179
        return self.FINISHED
1180

    
1181
      # Is a previous opcode still pending?
1182
      if job.cur_opctx:
1183
        opctx = job.cur_opctx
1184
        job.cur_opctx = None
1185
      else:
1186
        if __debug__ and _nextop_fn:
1187
          _nextop_fn()
1188
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1189

    
1190
      op = opctx.op
1191

    
1192
      # Consistency check
1193
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1194
                                     constants.OP_STATUS_CANCELING)
1195
                        for i in job.ops[opctx.index + 1:])
1196

    
1197
      assert op.status in (constants.OP_STATUS_QUEUED,
1198
                           constants.OP_STATUS_WAITING,
1199
                           constants.OP_STATUS_CANCELING)
1200

    
1201
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1202
              op.priority >= constants.OP_PRIO_HIGHEST)
1203

    
1204
      waitjob = None
1205

    
1206
      if op.status != constants.OP_STATUS_CANCELING:
1207
        assert op.status in (constants.OP_STATUS_QUEUED,
1208
                             constants.OP_STATUS_WAITING)
1209

    
1210
        # Prepare to start opcode
1211
        if self._MarkWaitlock(job, op):
1212
          # Write to disk
1213
          queue.UpdateJobUnlocked(job)
1214

    
1215
        assert op.status == constants.OP_STATUS_WAITING
1216
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1217
        assert job.start_timestamp and op.start_timestamp
1218
        assert waitjob is None
1219

    
1220
        # Check if waiting for a job is necessary
1221
        waitjob = self._CheckDependencies(queue, job, opctx)
1222

    
1223
        assert op.status in (constants.OP_STATUS_WAITING,
1224
                             constants.OP_STATUS_CANCELING,
1225
                             constants.OP_STATUS_ERROR)
1226

    
1227
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1228
                                         constants.OP_STATUS_ERROR)):
1229
          logging.info("%s: opcode %s waiting for locks",
1230
                       opctx.log_prefix, opctx.summary)
1231

    
1232
          assert not opctx.jobdeps, "Not all dependencies were removed"
1233

    
1234
          queue.release()
1235
          try:
1236
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1237
          finally:
1238
            queue.acquire(shared=1)
1239

    
1240
          op.status = op_status
1241
          op.result = op_result
1242

    
1243
          assert not waitjob
1244

    
1245
        if op.status in (constants.OP_STATUS_WAITING,
1246
                         constants.OP_STATUS_QUEUED):
1247
          # waiting: Couldn't get locks in time
1248
          # queued: Queue is shutting down
1249
          assert not op.end_timestamp
1250
        else:
1251
          # Finalize opcode
1252
          op.end_timestamp = TimeStampNow()
1253

    
1254
          if op.status == constants.OP_STATUS_CANCELING:
1255
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1256
                                  for i in job.ops[opctx.index:])
1257
          else:
1258
            assert op.status in constants.OPS_FINALIZED
1259

    
1260
      if op.status == constants.OP_STATUS_QUEUED:
1261
        # Queue is shutting down
1262
        assert not waitjob
1263

    
1264
        finalize = False
1265

    
1266
        # Reset context
1267
        job.cur_opctx = None
1268

    
1269
        # In no case must the status be finalized here
1270
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1271

    
1272
      elif op.status == constants.OP_STATUS_WAITING or waitjob:
1273
        finalize = False
1274

    
1275
        if not waitjob and opctx.CheckPriorityIncrease():
1276
          # Priority was changed, need to update on-disk file
1277
          queue.UpdateJobUnlocked(job)
1278

    
1279
        # Keep around for another round
1280
        job.cur_opctx = opctx
1281

    
1282
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1283
                op.priority >= constants.OP_PRIO_HIGHEST)
1284

    
1285
        # In no case must the status be finalized here
1286
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1287

    
1288
      else:
1289
        # Ensure all opcodes so far have been successful
1290
        assert (opctx.index == 0 or
1291
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1292
                           for i in job.ops[:opctx.index]))
1293

    
1294
        # Reset context
1295
        job.cur_opctx = None
1296

    
1297
        if op.status == constants.OP_STATUS_SUCCESS:
1298
          finalize = False
1299

    
1300
        elif op.status == constants.OP_STATUS_ERROR:
1301
          # Ensure failed opcode has an exception as its result
1302
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1303

    
1304
          to_encode = errors.OpExecError("Preceding opcode failed")
1305
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1306
                                _EncodeOpError(to_encode))
1307
          finalize = True
1308

    
1309
          # Consistency check
1310
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1311
                            errors.GetEncodedError(i.result)
1312
                            for i in job.ops[opctx.index:])
1313

    
1314
        elif op.status == constants.OP_STATUS_CANCELING:
1315
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1316
                                "Job canceled by request")
1317
          finalize = True
1318

    
1319
        else:
1320
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1321

    
1322
        if opctx.index == (opcount - 1):
1323
          # Finalize on last opcode
1324
          finalize = True
1325

    
1326
        if finalize:
1327
          # All opcodes have been run, finalize job
1328
          job.Finalize()
1329

    
1330
        # Write to disk. If the job status is final, this is the final write
1331
        # allowed. Once the file has been written, it can be archived anytime.
1332
        queue.UpdateJobUnlocked(job)
1333

    
1334
        assert not waitjob
1335

    
1336
        if finalize:
1337
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1338
          return self.FINISHED
1339

    
1340
      assert not waitjob or queue.depmgr.JobWaiting(job)
1341

    
1342
      if waitjob:
1343
        return self.WAITDEP
1344
      else:
1345
        return self.DEFER
1346
    finally:
1347
      assert job.writable, "Job became read-only while being processed"
1348
      queue.release()
1349

    
1350

    
1351
def _EvaluateJobProcessorResult(depmgr, job, result):
1352
  """Looks at a result from L{_JobProcessor} for a job.
1353

1354
  To be used in a L{_JobQueueWorker}.
1355

1356
  """
1357
  if result == _JobProcessor.FINISHED:
1358
    # Notify waiting jobs
1359
    depmgr.NotifyWaiters(job.id)
1360

    
1361
  elif result == _JobProcessor.DEFER:
1362
    # Schedule again
1363
    raise workerpool.DeferTask(priority=job.CalcPriority())
1364

    
1365
  elif result == _JobProcessor.WAITDEP:
1366
    # No-op, dependency manager will re-schedule
1367
    pass
1368

    
1369
  else:
1370
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1371
                                 (result, ))
1372

    
1373

    
1374
class _JobQueueWorker(workerpool.BaseWorker):
1375
  """The actual job workers.
1376

1377
  """
1378
  def RunTask(self, job): # pylint: disable=W0221
1379
    """Job executor.
1380

1381
    @type job: L{_QueuedJob}
1382
    @param job: the job to be processed
1383

1384
    """
1385
    assert job.writable, "Expected writable job"
1386

    
1387
    # Ensure only one worker is active on a single job. If a job registers for
1388
    # a dependency job, and the other job notifies before the first worker is
1389
    # done, the job can end up in the tasklist more than once.
1390
    job.processor_lock.acquire()
1391
    try:
1392
      return self._RunTaskInner(job)
1393
    finally:
1394
      job.processor_lock.release()
1395

    
1396
  def _RunTaskInner(self, job):
1397
    """Executes a job.
1398

1399
    Must be called with per-job lock acquired.
1400

1401
    """
1402
    queue = job.queue
1403
    assert queue == self.pool.queue
1404

    
1405
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1406
    setname_fn(None)
1407

    
1408
    proc = mcpu.Processor(queue.context, job.id)
1409

    
1410
    # Create wrapper for setting thread name
1411
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1412
                                    proc.ExecOpCode)
1413

    
1414
    _EvaluateJobProcessorResult(queue.depmgr, job,
1415
                                _JobProcessor(queue, wrap_execop_fn, job)())
1416

    
1417
  @staticmethod
1418
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1419
    """Updates the worker thread name to include a short summary of the opcode.
1420

1421
    @param setname_fn: Callable setting worker thread name
1422
    @param execop_fn: Callable for executing opcode (usually
1423
                      L{mcpu.Processor.ExecOpCode})
1424

1425
    """
1426
    setname_fn(op)
1427
    try:
1428
      return execop_fn(op, *args, **kwargs)
1429
    finally:
1430
      setname_fn(None)
1431

    
1432
  @staticmethod
1433
  def _GetWorkerName(job, op):
1434
    """Sets the worker thread name.
1435

1436
    @type job: L{_QueuedJob}
1437
    @type op: L{opcodes.OpCode}
1438

1439
    """
1440
    parts = ["Job%s" % job.id]
1441

    
1442
    if op:
1443
      parts.append(op.TinySummary())
1444

    
1445
    return "/".join(parts)
1446

    
1447

    
1448
class _JobQueueWorkerPool(workerpool.WorkerPool):
1449
  """Simple class implementing a job-processing workerpool.
1450

1451
  """
1452
  def __init__(self, queue):
1453
    super(_JobQueueWorkerPool, self).__init__("Jq",
1454
                                              JOBQUEUE_THREADS,
1455
                                              _JobQueueWorker)
1456
    self.queue = queue
1457

    
1458

    
1459
class _JobDependencyManager:
1460
  """Keeps track of job dependencies.
1461

1462
  """
1463
  (WAIT,
1464
   ERROR,
1465
   CANCEL,
1466
   CONTINUE,
1467
   WRONGSTATUS) = range(1, 6)
1468

    
1469
  def __init__(self, getstatus_fn, enqueue_fn):
1470
    """Initializes this class.
1471

1472
    """
1473
    self._getstatus_fn = getstatus_fn
1474
    self._enqueue_fn = enqueue_fn
1475

    
1476
    self._waiters = {}
1477
    self._lock = locking.SharedLock("JobDepMgr")
1478

    
1479
  @locking.ssynchronized(_LOCK, shared=1)
1480
  def GetLockInfo(self, requested): # pylint: disable=W0613
1481
    """Retrieves information about waiting jobs.
1482

1483
    @type requested: set
1484
    @param requested: Requested information, see C{query.LQ_*}
1485

1486
    """
1487
    # No need to sort here, that's being done by the lock manager and query
1488
    # library. There are no priorities for notifying jobs, hence all show up as
1489
    # one item under "pending".
1490
    return [("job/%s" % job_id, None, None,
1491
             [("job", [job.id for job in waiters])])
1492
            for job_id, waiters in self._waiters.items()
1493
            if waiters]
1494

    
1495
  @locking.ssynchronized(_LOCK, shared=1)
1496
  def JobWaiting(self, job):
1497
    """Checks if a job is waiting.
1498

1499
    """
1500
    return compat.any(job in jobs
1501
                      for jobs in self._waiters.values())
1502

    
1503
  @locking.ssynchronized(_LOCK)
1504
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1505
    """Checks if a dependency job has the requested status.
1506

1507
    If the other job is not yet in a finalized status, the calling job will be
1508
    notified (re-added to the workerpool) at a later point.
1509

1510
    @type job: L{_QueuedJob}
1511
    @param job: Job object
1512
    @type dep_job_id: int
1513
    @param dep_job_id: ID of dependency job
1514
    @type dep_status: list
1515
    @param dep_status: Required status
1516

1517
    """
1518
    assert ht.TJobId(job.id)
1519
    assert ht.TJobId(dep_job_id)
1520
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1521

    
1522
    if job.id == dep_job_id:
1523
      return (self.ERROR, "Job can't depend on itself")
1524

    
1525
    # Get status of dependency job
1526
    try:
1527
      status = self._getstatus_fn(dep_job_id)
1528
    except errors.JobLost, err:
1529
      return (self.ERROR, "Dependency error: %s" % err)
1530

    
1531
    assert status in constants.JOB_STATUS_ALL
1532

    
1533
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1534

    
1535
    if status not in constants.JOBS_FINALIZED:
1536
      # Register for notification and wait for job to finish
1537
      job_id_waiters.add(job)
1538
      return (self.WAIT,
1539
              "Need to wait for job %s, wanted status '%s'" %
1540
              (dep_job_id, dep_status))
1541

    
1542
    # Remove from waiters list
1543
    if job in job_id_waiters:
1544
      job_id_waiters.remove(job)
1545

    
1546
    if (status == constants.JOB_STATUS_CANCELED and
1547
        constants.JOB_STATUS_CANCELED not in dep_status):
1548
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1549

    
1550
    elif not dep_status or status in dep_status:
1551
      return (self.CONTINUE,
1552
              "Dependency job %s finished with status '%s'" %
1553
              (dep_job_id, status))
1554

    
1555
    else:
1556
      return (self.WRONGSTATUS,
1557
              "Dependency job %s finished with status '%s',"
1558
              " not one of '%s' as required" %
1559
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1560

    
1561
  def _RemoveEmptyWaitersUnlocked(self):
1562
    """Remove all jobs without actual waiters.
1563

1564
    """
1565
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1566
                   if not waiters]:
1567
      del self._waiters[job_id]
1568

    
1569
  def NotifyWaiters(self, job_id):
1570
    """Notifies all jobs waiting for a certain job ID.
1571

1572
    @attention: Do not call until L{CheckAndRegister} returned a status other
1573
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1574
    @type job_id: int
1575
    @param job_id: Job ID
1576

1577
    """
1578
    assert ht.TJobId(job_id)
1579

    
1580
    self._lock.acquire()
1581
    try:
1582
      self._RemoveEmptyWaitersUnlocked()
1583

    
1584
      jobs = self._waiters.pop(job_id, None)
1585
    finally:
1586
      self._lock.release()
1587

    
1588
    if jobs:
1589
      # Re-add jobs to workerpool
1590
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1591
                    len(jobs), job_id)
1592
      self._enqueue_fn(jobs)
1593

    
1594

    
1595
def _RequireOpenQueue(fn):
1596
  """Decorator for "public" functions.
1597

1598
  This function should be used for all 'public' functions. That is,
1599
  functions usually called from other classes. Note that this should
1600
  be applied only to methods (not plain functions), since it expects
1601
  that the decorated function is called with a first argument that has
1602
  a '_queue_filelock' argument.
1603

1604
  @warning: Use this decorator only after locking.ssynchronized
1605

1606
  Example::
1607
    @locking.ssynchronized(_LOCK)
1608
    @_RequireOpenQueue
1609
    def Example(self):
1610
      pass
1611

1612
  """
1613
  def wrapper(self, *args, **kwargs):
1614
    # pylint: disable=W0212
1615
    assert self._queue_filelock is not None, "Queue should be open"
1616
    return fn(self, *args, **kwargs)
1617
  return wrapper
1618

    
1619

    
1620
def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper


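# Illustrative sketch (not part of the original module): both decorators above
# are meant to be stacked below locking.ssynchronized, e.g. on a hypothetical
# submission method:
#
#   @locking.ssynchronized(_LOCK)
#   @_RequireOpenQueue
#   @_RequireNonDrainedQueue
#   def ExampleSubmit(self, ops):
#     pass  # queue is open, accepting jobs and not drained at this point
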
class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

  def _PickupJobUnlocked(self, job_id):
    """Load a job from the job queue.

    Pick up a job that already is in the job queue and start/resume it.

    """
    job = self._LoadJobUnlocked(job_id)

    if job is None:
      logging.warning("Job %s could not be read", job_id)
      return

    status = job.CalcStatus()
    if status == constants.JOB_STATUS_QUEUED:
      self._EnqueueJobsUnlocked([job])
      logging.info("Restarting job %s", job.id)

    elif status in (constants.JOB_STATUS_RUNNING,
                    constants.JOB_STATUS_WAITING,
                    constants.JOB_STATUS_CANCELING):
      logging.warning("Unfinished job %s found: %s", job.id, job)

      if status == constants.JOB_STATUS_WAITING:
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
        self._EnqueueJobsUnlocked([job])
        logging.info("Restarting job %s", job.id)
      else:
        to_encode = errors.OpExecError("Unclean master daemon shutdown")
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              _EncodeOpError(to_encode))
        job.Finalize()

    self.UpdateJobUnlocked(job)

  @locking.ssynchronized(_LOCK)
  def PickupJob(self, job_id):
    self._PickupJobUnlocked(job_id)

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    # Set queue drained flag
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
                                                       self._drained)
    msg = result[node_name].fail_msg
    if msg:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

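  # Note (illustrative, not in the original source): a worked example of the
  # majority check in _CheckRpcResult above. With five non-master nodes where
  # four calls fail and one succeeds, (len(success) + 1) < len(failed) is
  # (1 + 1) < 4, so the "more than half" error is logged; with only two
  # failures it is (3 + 1) < 2, which is false, and nothing extra is logged.
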
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

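  # Illustrative sketch (not part of the original source): the resulting
  # layout, assuming jstore.GetArchiveDirectory() groups archived jobs into
  # numbered subdirectories:
  #
  #   _GetJobPath(421)          -> <QUEUE_DIR>/job-421
  #   _GetArchivedJobPath(421)  -> <JOB_QUEUE_ARCHIVE_DIR>/<group>/job-421
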
  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"

    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
    self._CheckRpcResult(result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True

  @classmethod
  def SubmitJob(cls, ops):
    """Create and store a new job.

    """
    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitJob(ops)

  @classmethod
  def SubmitJobToDrainedQueue(cls, ops):
    """Forcefully create and store a new job.

    Do so, even if the job queue is drained.

    """
    return luxi.Client(address=pathutils.QUERY_SOCKET)\
        .SubmitJobToDrainedQueue(ops)

  @classmethod
  def SubmitManyJobs(cls, jobs):
    """Create and store multiple jobs.

    """
    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

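  # Illustrative sketch (not part of the original source): resolving relative
  # dependencies. Negative job IDs are resolved against jobs submitted earlier
  # in the same submission; "resolve_fn" maps them to absolute IDs and may
  # raise IndexError.
  #
  #   deps = [(-1, [constants.JOB_STATUS_SUCCESS])]
  #   (ok, resolved) = JobQueue._ResolveJobDependencies(lambda n: 17230, deps)
  #   # ok is True, resolved is [(17230, [constants.JOB_STATUS_SUCCESS])]
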
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs],
                             task_id=map(_GetIdAttr, jobs))

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk; check for None before touching attributes so a
    # missing job raises JobLost instead of AttributeError
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

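  # Illustrative sketch (not part of the original source): a client-side
  # polling loop over WaitForJobChanges above; "queue" is a JobQueue instance
  # and the field list is an example only.
  #
  #   prev_info, prev_serial = None, -1
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 10.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue  # timed out without changes, poll again
  #     (prev_info, log_entries) = result
  #     # ... consume log_entries and advance prev_serial accordingly
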
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      (success, msg) = job.ChangePriority(priority)

      if success:
        try:
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive an already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered; for jobs that have not started yet,
    the received timestamp is used. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @param timeout: the maximum amount of time (in seconds) to spend
        archiving jobs
    @rtype: tuple
    @return: a tuple of the number of jobs archived and the number of
        jobs not yet examined

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

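  # Note (illustrative, not in the original source): job timestamps are stored
  # as pairs whose first element is the time in whole seconds, which is why
  # AutoArchiveJobs above compares "now - job_age[0]". For example, a job that
  # ended at second 1000000000 with now = 1000007200 and age = 3600 satisfies
  # 7200 > 3600 and is queued for archival.
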
  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

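  # Illustrative sketch (not part of the original source): a query2-style
  # filter as accepted by QueryJobs above, assuming the list-based filter
  # syntax from the qlang module:
  #
  #   fields = ["id", "status"]
  #   qfilter = ["|", ["=", "id", 17230], ["=", "id", 17231]]
  #   result = queue.QueryJobs(fields, qfilter)
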
  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool

    """
    return self._accepting_jobs

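  # Illustrative sketch (not part of the original source): the shutdown
  # sequence suggested by PrepareShutdown above, polling until no jobs are
  # running before the final Shutdown call.
  #
  #   while queue.PrepareShutdown():
  #     time.sleep(1)   # jobs are still running, check again shortly
  #   queue.Shutdown()
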
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None