Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 912b2278

History | View | Annotate | Download (75.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import time
35
import weakref
36
import threading
37
import itertools
38
import operator
39

    
40
try:
41
  # pylint: disable=E0611
42
  from pyinotify import pyinotify
43
except ImportError:
44
  import pyinotify
45

    
46
from ganeti import asyncnotifier
47
from ganeti import constants
48
from ganeti import serializer
49
from ganeti import workerpool
50
from ganeti import locking
51
from ganeti import luxi
52
from ganeti import opcodes
53
from ganeti import opcodes_base
54
from ganeti import errors
55
from ganeti import mcpu
56
from ganeti import utils
57
from ganeti import jstore
58
import ganeti.rpc.node as rpc
59
from ganeti import runtime
60
from ganeti import netutils
61
from ganeti import compat
62
from ganeti import ht
63
from ganeti import query
64
from ganeti import qlang
65
from ganeti import pathutils
66
from ganeti import vcluster
67

    
68

    
69
JOBQUEUE_THREADS = 25
70

    
71
# member lock names to be passed to @ssynchronized decorator
72
_LOCK = "_lock"
73
_QUEUE = "_queue"
74

    
75
#: Retrieves "id" attribute
76
_GetIdAttr = operator.attrgetter("id")
77

    
78

    
79
class CancelJob(Exception):
80
  """Special exception to cancel a job.
81

82
  """
83

    
84

    
85
class QueueShutdown(Exception):
86
  """Special exception to abort a job when the job queue is shutting down.
87

88
  """
89

    
90

    
91
def TimeStampNow():
92
  """Returns the current timestamp.
93

94
  @rtype: tuple
95
  @return: the current time in the (seconds, microseconds) format
96

97
  """
98
  return utils.SplitTime(time.time())
99

    
100

    
101
def _CallJqUpdate(runner, names, file_name, content):
102
  """Updates job queue file after virtualizing filename.
103

104
  """
105
  virt_file_name = vcluster.MakeVirtualPath(file_name)
106
  return runner.call_jobqueue_update(names, virt_file_name, content)
107

    
108

    
109
class _SimpleJobQuery:
110
  """Wrapper for job queries.
111

112
  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
113

114
  """
115
  def __init__(self, fields):
116
    """Initializes this class.
117

118
    """
119
    self._query = query.Query(query.JOB_FIELDS, fields)
120

    
121
  def __call__(self, job):
122
    """Executes a job query using cached field list.
123

124
    """
125
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
126

    
127

    
128
class _QueuedOpCode(object):
129
  """Encapsulates an opcode object.
130

131
  @ivar log: holds the execution log and consists of tuples
132
  of the form C{(log_serial, timestamp, level, message)}
133
  @ivar input: the OpCode we encapsulate
134
  @ivar status: the current status
135
  @ivar result: the result of the LU execution
136
  @ivar start_timestamp: timestamp for the start of the execution
137
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
138
  @ivar stop_timestamp: timestamp for the end of the execution
139

140
  """
141
  __slots__ = ["input", "status", "result", "log", "priority",
142
               "start_timestamp", "exec_timestamp", "end_timestamp",
143
               "__weakref__"]
144

    
145
  def __init__(self, op):
146
    """Initializes instances of this class.
147

148
    @type op: L{opcodes.OpCode}
149
    @param op: the opcode we encapsulate
150

151
    """
152
    self.input = op
153
    self.status = constants.OP_STATUS_QUEUED
154
    self.result = None
155
    self.log = []
156
    self.start_timestamp = None
157
    self.exec_timestamp = None
158
    self.end_timestamp = None
159

    
160
    # Get initial priority (it might change during the lifetime of this opcode)
161
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
162

    
163
  @classmethod
164
  def Restore(cls, state):
165
    """Restore the _QueuedOpCode from the serialized form.
166

167
    @type state: dict
168
    @param state: the serialized state
169
    @rtype: _QueuedOpCode
170
    @return: a new _QueuedOpCode instance
171

172
    """
173
    obj = _QueuedOpCode.__new__(cls)
174
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
175
    obj.status = state["status"]
176
    obj.result = state["result"]
177
    obj.log = state["log"]
178
    obj.start_timestamp = state.get("start_timestamp", None)
179
    obj.exec_timestamp = state.get("exec_timestamp", None)
180
    obj.end_timestamp = state.get("end_timestamp", None)
181
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
182
    return obj
183

    
184
  def Serialize(self):
185
    """Serializes this _QueuedOpCode.
186

187
    @rtype: dict
188
    @return: the dictionary holding the serialized state
189

190
    """
191
    return {
192
      "input": self.input.__getstate__(),
193
      "status": self.status,
194
      "result": self.result,
195
      "log": self.log,
196
      "start_timestamp": self.start_timestamp,
197
      "exec_timestamp": self.exec_timestamp,
198
      "end_timestamp": self.end_timestamp,
199
      "priority": self.priority,
200
      }
201

    
202

    
203
class _QueuedJob(object):
204
  """In-memory job representation.
205

206
  This is what we use to track the user-submitted jobs. Locking must
207
  be taken care of by users of this class.
208

209
  @type queue: L{JobQueue}
210
  @ivar queue: the parent queue
211
  @ivar id: the job ID
212
  @type ops: list
213
  @ivar ops: the list of _QueuedOpCode that constitute the job
214
  @type log_serial: int
215
  @ivar log_serial: holds the index for the next log entry
216
  @ivar received_timestamp: the timestamp for when the job was received
217
  @ivar start_timestmap: the timestamp for start of execution
218
  @ivar end_timestamp: the timestamp for end of execution
219
  @ivar writable: Whether the job is allowed to be modified
220

221
  """
222
  # pylint: disable=W0212
223
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
224
               "received_timestamp", "start_timestamp", "end_timestamp",
225
               "__weakref__", "processor_lock", "writable", "archived"]
226

    
227
  def _AddReasons(self):
228
    """Extend the reason trail
229

230
    Add the reason for all the opcodes of this job to be executed.
231

232
    """
233
    count = 0
234
    for queued_op in self.ops:
235
      op = queued_op.input
236
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__)
237
      reason_text = "job=%d;index=%d" % (self.id, count)
238
      reason = getattr(op, "reason", [])
239
      reason.append((reason_src, reason_text, utils.EpochNano()))
240
      op.reason = reason
241
      count = count + 1
242

    
243
  def __init__(self, queue, job_id, ops, writable):
244
    """Constructor for the _QueuedJob.
245

246
    @type queue: L{JobQueue}
247
    @param queue: our parent queue
248
    @type job_id: job_id
249
    @param job_id: our job id
250
    @type ops: list
251
    @param ops: the list of opcodes we hold, which will be encapsulated
252
        in _QueuedOpCodes
253
    @type writable: bool
254
    @param writable: Whether job can be modified
255

256
    """
257
    if not ops:
258
      raise errors.GenericError("A job needs at least one opcode")
259

    
260
    self.queue = queue
261
    self.id = int(job_id)
262
    self.ops = [_QueuedOpCode(op) for op in ops]
263
    self._AddReasons()
264
    self.log_serial = 0
265
    self.received_timestamp = TimeStampNow()
266
    self.start_timestamp = None
267
    self.end_timestamp = None
268
    self.archived = False
269

    
270
    self._InitInMemory(self, writable)
271

    
272
    assert not self.archived, "New jobs can not be marked as archived"
273

    
274
  @staticmethod
275
  def _InitInMemory(obj, writable):
276
    """Initializes in-memory variables.
277

278
    """
279
    obj.writable = writable
280
    obj.ops_iter = None
281
    obj.cur_opctx = None
282

    
283
    # Read-only jobs are not processed and therefore don't need a lock
284
    if writable:
285
      obj.processor_lock = threading.Lock()
286
    else:
287
      obj.processor_lock = None
288

    
289
  def __repr__(self):
290
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
291
              "id=%s" % self.id,
292
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
293

    
294
    return "<%s at %#x>" % (" ".join(status), id(self))
295

    
296
  @classmethod
297
  def Restore(cls, queue, state, writable, archived):
298
    """Restore a _QueuedJob from serialized state:
299

300
    @type queue: L{JobQueue}
301
    @param queue: to which queue the restored job belongs
302
    @type state: dict
303
    @param state: the serialized state
304
    @type writable: bool
305
    @param writable: Whether job can be modified
306
    @type archived: bool
307
    @param archived: Whether job was already archived
308
    @rtype: _JobQueue
309
    @return: the restored _JobQueue instance
310

311
    """
312
    obj = _QueuedJob.__new__(cls)
313
    obj.queue = queue
314
    obj.id = int(state["id"])
315
    obj.received_timestamp = state.get("received_timestamp", None)
316
    obj.start_timestamp = state.get("start_timestamp", None)
317
    obj.end_timestamp = state.get("end_timestamp", None)
318
    obj.archived = archived
319

    
320
    obj.ops = []
321
    obj.log_serial = 0
322
    for op_state in state["ops"]:
323
      op = _QueuedOpCode.Restore(op_state)
324
      for log_entry in op.log:
325
        obj.log_serial = max(obj.log_serial, log_entry[0])
326
      obj.ops.append(op)
327

    
328
    cls._InitInMemory(obj, writable)
329

    
330
    return obj
331

    
332
  def Serialize(self):
333
    """Serialize the _JobQueue instance.
334

335
    @rtype: dict
336
    @return: the serialized state
337

338
    """
339
    return {
340
      "id": self.id,
341
      "ops": [op.Serialize() for op in self.ops],
342
      "start_timestamp": self.start_timestamp,
343
      "end_timestamp": self.end_timestamp,
344
      "received_timestamp": self.received_timestamp,
345
      }
346

    
347
  def CalcStatus(self):
348
    """Compute the status of this job.
349

350
    This function iterates over all the _QueuedOpCodes in the job and
351
    based on their status, computes the job status.
352

353
    The algorithm is:
354
      - if we find a cancelled, or finished with error, the job
355
        status will be the same
356
      - otherwise, the last opcode with the status one of:
357
          - waitlock
358
          - canceling
359
          - running
360

361
        will determine the job status
362

363
      - otherwise, it means either all opcodes are queued, or success,
364
        and the job status will be the same
365

366
    @return: the job status
367

368
    """
369
    status = constants.JOB_STATUS_QUEUED
370

    
371
    all_success = True
372
    for op in self.ops:
373
      if op.status == constants.OP_STATUS_SUCCESS:
374
        continue
375

    
376
      all_success = False
377

    
378
      if op.status == constants.OP_STATUS_QUEUED:
379
        pass
380
      elif op.status == constants.OP_STATUS_WAITING:
381
        status = constants.JOB_STATUS_WAITING
382
      elif op.status == constants.OP_STATUS_RUNNING:
383
        status = constants.JOB_STATUS_RUNNING
384
      elif op.status == constants.OP_STATUS_CANCELING:
385
        status = constants.JOB_STATUS_CANCELING
386
        break
387
      elif op.status == constants.OP_STATUS_ERROR:
388
        status = constants.JOB_STATUS_ERROR
389
        # The whole job fails if one opcode failed
390
        break
391
      elif op.status == constants.OP_STATUS_CANCELED:
392
        status = constants.OP_STATUS_CANCELED
393
        break
394

    
395
    if all_success:
396
      status = constants.JOB_STATUS_SUCCESS
397

    
398
    return status
399

    
400
  def CalcPriority(self):
401
    """Gets the current priority for this job.
402

403
    Only unfinished opcodes are considered. When all are done, the default
404
    priority is used.
405

406
    @rtype: int
407

408
    """
409
    priorities = [op.priority for op in self.ops
410
                  if op.status not in constants.OPS_FINALIZED]
411

    
412
    if not priorities:
413
      # All opcodes are done, assume default priority
414
      return constants.OP_PRIO_DEFAULT
415

    
416
    return min(priorities)
417

    
418
  def GetLogEntries(self, newer_than):
419
    """Selectively returns the log entries.
420

421
    @type newer_than: None or int
422
    @param newer_than: if this is None, return all log entries,
423
        otherwise return only the log entries with serial higher
424
        than this value
425
    @rtype: list
426
    @return: the list of the log entries selected
427

428
    """
429
    if newer_than is None:
430
      serial = -1
431
    else:
432
      serial = newer_than
433

    
434
    entries = []
435
    for op in self.ops:
436
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
437

    
438
    return entries
439

    
440
  def GetInfo(self, fields):
441
    """Returns information about a job.
442

443
    @type fields: list
444
    @param fields: names of fields to return
445
    @rtype: list
446
    @return: list with one element for each field
447
    @raise errors.OpExecError: when an invalid field
448
        has been passed
449

450
    """
451
    return _SimpleJobQuery(fields)(self)
452

    
453
  def MarkUnfinishedOps(self, status, result):
454
    """Mark unfinished opcodes with a given status and result.
455

456
    This is an utility function for marking all running or waiting to
457
    be run opcodes with a given status. Opcodes which are already
458
    finalised are not changed.
459

460
    @param status: a given opcode status
461
    @param result: the opcode result
462

463
    """
464
    not_marked = True
465
    for op in self.ops:
466
      if op.status in constants.OPS_FINALIZED:
467
        assert not_marked, "Finalized opcodes found after non-finalized ones"
468
        continue
469
      op.status = status
470
      op.result = result
471
      not_marked = False
472

    
473
  def Finalize(self):
474
    """Marks the job as finalized.
475

476
    """
477
    self.end_timestamp = TimeStampNow()
478

    
479
  def Cancel(self):
480
    """Marks job as canceled/-ing if possible.
481

482
    @rtype: tuple; (bool, string)
483
    @return: Boolean describing whether job was successfully canceled or marked
484
      as canceling and a text message
485

486
    """
487
    status = self.CalcStatus()
488

    
489
    if status == constants.JOB_STATUS_QUEUED:
490
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
491
                             "Job canceled by request")
492
      self.Finalize()
493
      return (True, "Job %s canceled" % self.id)
494

    
495
    elif status == constants.JOB_STATUS_WAITING:
496
      # The worker will notice the new status and cancel the job
497
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
498
      return (True, "Job %s will be canceled" % self.id)
499

    
500
    else:
501
      logging.debug("Job %s is no longer waiting in the queue", self.id)
502
      return (False, "Job %s is no longer waiting in the queue" % self.id)
503

    
504
  def ChangePriority(self, priority):
505
    """Changes the job priority.
506

507
    @type priority: int
508
    @param priority: New priority
509
    @rtype: tuple; (bool, string)
510
    @return: Boolean describing whether job's priority was successfully changed
511
      and a text message
512

513
    """
514
    status = self.CalcStatus()
515

    
516
    if status in constants.JOBS_FINALIZED:
517
      return (False, "Job %s is finished" % self.id)
518
    elif status == constants.JOB_STATUS_CANCELING:
519
      return (False, "Job %s is cancelling" % self.id)
520
    else:
521
      assert status in (constants.JOB_STATUS_QUEUED,
522
                        constants.JOB_STATUS_WAITING,
523
                        constants.JOB_STATUS_RUNNING)
524

    
525
      changed = False
526
      for op in self.ops:
527
        if (op.status == constants.OP_STATUS_RUNNING or
528
            op.status in constants.OPS_FINALIZED):
529
          assert not changed, \
530
            ("Found opcode for which priority should not be changed after"
531
             " priority has been changed for previous opcodes")
532
          continue
533

    
534
        assert op.status in (constants.OP_STATUS_QUEUED,
535
                             constants.OP_STATUS_WAITING)
536

    
537
        changed = True
538

    
539
        # Set new priority (doesn't modify opcode input)
540
        op.priority = priority
541

    
542
      if changed:
543
        return (True, ("Priorities of pending opcodes for job %s have been"
544
                       " changed to %s" % (self.id, priority)))
545
      else:
546
        return (False, "Job %s had no pending opcodes" % self.id)
547

    
548

    
549
class _OpExecCallbacks(mcpu.OpExecCbBase):
550
  def __init__(self, queue, job, op):
551
    """Initializes this class.
552

553
    @type queue: L{JobQueue}
554
    @param queue: Job queue
555
    @type job: L{_QueuedJob}
556
    @param job: Job object
557
    @type op: L{_QueuedOpCode}
558
    @param op: OpCode
559

560
    """
561
    assert queue, "Queue is missing"
562
    assert job, "Job is missing"
563
    assert op, "Opcode is missing"
564

    
565
    self._queue = queue
566
    self._job = job
567
    self._op = op
568

    
569
  def _CheckCancel(self):
570
    """Raises an exception to cancel the job if asked to.
571

572
    """
573
    # Cancel here if we were asked to
574
    if self._op.status == constants.OP_STATUS_CANCELING:
575
      logging.debug("Canceling opcode")
576
      raise CancelJob()
577

    
578
    # See if queue is shutting down
579
    if not self._queue.AcceptingJobsUnlocked():
580
      logging.debug("Queue is shutting down")
581
      raise QueueShutdown()
582

    
583
  @locking.ssynchronized(_QUEUE, shared=1)
584
  def NotifyStart(self):
585
    """Mark the opcode as running, not lock-waiting.
586

587
    This is called from the mcpu code as a notifier function, when the LU is
588
    finally about to start the Exec() method. Of course, to have end-user
589
    visible results, the opcode must be initially (before calling into
590
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
591

592
    """
593
    assert self._op in self._job.ops
594
    assert self._op.status in (constants.OP_STATUS_WAITING,
595
                               constants.OP_STATUS_CANCELING)
596

    
597
    # Cancel here if we were asked to
598
    self._CheckCancel()
599

    
600
    logging.debug("Opcode is now running")
601

    
602
    self._op.status = constants.OP_STATUS_RUNNING
603
    self._op.exec_timestamp = TimeStampNow()
604

    
605
    # And finally replicate the job status
606
    self._queue.UpdateJobUnlocked(self._job)
607

    
608
  @locking.ssynchronized(_QUEUE, shared=1)
609
  def _AppendFeedback(self, timestamp, log_type, log_msg):
610
    """Internal feedback append function, with locks
611

612
    """
613
    self._job.log_serial += 1
614
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
615
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
616

    
617
  def Feedback(self, *args):
618
    """Append a log entry.
619

620
    """
621
    assert len(args) < 3
622

    
623
    if len(args) == 1:
624
      log_type = constants.ELOG_MESSAGE
625
      log_msg = args[0]
626
    else:
627
      (log_type, log_msg) = args
628

    
629
    # The time is split to make serialization easier and not lose
630
    # precision.
631
    timestamp = utils.SplitTime(time.time())
632
    self._AppendFeedback(timestamp, log_type, log_msg)
633

    
634
  def CurrentPriority(self):
635
    """Returns current priority for opcode.
636

637
    """
638
    assert self._op.status in (constants.OP_STATUS_WAITING,
639
                               constants.OP_STATUS_CANCELING)
640

    
641
    # Cancel here if we were asked to
642
    self._CheckCancel()
643

    
644
    return self._op.priority
645

    
646
  def SubmitManyJobs(self, jobs):
647
    """Submits jobs for processing.
648

649
    See L{JobQueue.SubmitManyJobs}.
650

651
    """
652
    # Locking is done in job queue
653
    return self._queue.SubmitManyJobs(jobs)
654

    
655

    
656
class _JobChangesChecker(object):
657
  def __init__(self, fields, prev_job_info, prev_log_serial):
658
    """Initializes this class.
659

660
    @type fields: list of strings
661
    @param fields: Fields requested by LUXI client
662
    @type prev_job_info: string
663
    @param prev_job_info: previous job info, as passed by the LUXI client
664
    @type prev_log_serial: string
665
    @param prev_log_serial: previous job serial, as passed by the LUXI client
666

667
    """
668
    self._squery = _SimpleJobQuery(fields)
669
    self._prev_job_info = prev_job_info
670
    self._prev_log_serial = prev_log_serial
671

    
672
  def __call__(self, job):
673
    """Checks whether job has changed.
674

675
    @type job: L{_QueuedJob}
676
    @param job: Job object
677

678
    """
679
    assert not job.writable, "Expected read-only job"
680

    
681
    status = job.CalcStatus()
682
    job_info = self._squery(job)
683
    log_entries = job.GetLogEntries(self._prev_log_serial)
684

    
685
    # Serializing and deserializing data can cause type changes (e.g. from
686
    # tuple to list) or precision loss. We're doing it here so that we get
687
    # the same modifications as the data received from the client. Without
688
    # this, the comparison afterwards might fail without the data being
689
    # significantly different.
690
    # TODO: we just deserialized from disk, investigate how to make sure that
691
    # the job info and log entries are compatible to avoid this further step.
692
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
693
    # efficient, though floats will be tricky
694
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
695
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
696

    
697
    # Don't even try to wait if the job is no longer running, there will be
698
    # no changes.
699
    if (status not in (constants.JOB_STATUS_QUEUED,
700
                       constants.JOB_STATUS_RUNNING,
701
                       constants.JOB_STATUS_WAITING) or
702
        job_info != self._prev_job_info or
703
        (log_entries and self._prev_log_serial != log_entries[0][0])):
704
      logging.debug("Job %s changed", job.id)
705
      return (job_info, log_entries)
706

    
707
    return None
708

    
709

    
710
class _JobFileChangesWaiter(object):
711
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
712
    """Initializes this class.
713

714
    @type filename: string
715
    @param filename: Path to job file
716
    @raises errors.InotifyError: if the notifier cannot be setup
717

718
    """
719
    self._wm = _inotify_wm_cls()
720
    self._inotify_handler = \
721
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
722
    self._notifier = \
723
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
724
    try:
725
      self._inotify_handler.enable()
726
    except Exception:
727
      # pyinotify doesn't close file descriptors automatically
728
      self._notifier.stop()
729
      raise
730

    
731
  def _OnInotify(self, notifier_enabled):
732
    """Callback for inotify.
733

734
    """
735
    if not notifier_enabled:
736
      self._inotify_handler.enable()
737

    
738
  def Wait(self, timeout):
739
    """Waits for the job file to change.
740

741
    @type timeout: float
742
    @param timeout: Timeout in seconds
743
    @return: Whether there have been events
744

745
    """
746
    assert timeout >= 0
747
    have_events = self._notifier.check_events(timeout * 1000)
748
    if have_events:
749
      self._notifier.read_events()
750
    self._notifier.process_events()
751
    return have_events
752

    
753
  def Close(self):
754
    """Closes underlying notifier and its file descriptor.
755

756
    """
757
    self._notifier.stop()
758

    
759

    
760
class _JobChangesWaiter(object):
761
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
762
    """Initializes this class.
763

764
    @type filename: string
765
    @param filename: Path to job file
766

767
    """
768
    self._filewaiter = None
769
    self._filename = filename
770
    self._waiter_cls = _waiter_cls
771

    
772
  def Wait(self, timeout):
773
    """Waits for a job to change.
774

775
    @type timeout: float
776
    @param timeout: Timeout in seconds
777
    @return: Whether there have been events
778

779
    """
780
    if self._filewaiter:
781
      return self._filewaiter.Wait(timeout)
782

    
783
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
784
    # If this point is reached, return immediately and let caller check the job
785
    # file again in case there were changes since the last check. This avoids a
786
    # race condition.
787
    self._filewaiter = self._waiter_cls(self._filename)
788

    
789
    return True
790

    
791
  def Close(self):
792
    """Closes underlying waiter.
793

794
    """
795
    if self._filewaiter:
796
      self._filewaiter.Close()
797

    
798

    
799
class _WaitForJobChangesHelper(object):
800
  """Helper class using inotify to wait for changes in a job file.
801

802
  This class takes a previous job status and serial, and alerts the client when
803
  the current job status has changed.
804

805
  """
806
  @staticmethod
807
  def _CheckForChanges(counter, job_load_fn, check_fn):
808
    if counter.next() > 0:
809
      # If this isn't the first check the job is given some more time to change
810
      # again. This gives better performance for jobs generating many
811
      # changes/messages.
812
      time.sleep(0.1)
813

    
814
    job = job_load_fn()
815
    if not job:
816
      raise errors.JobLost()
817

    
818
    result = check_fn(job)
819
    if result is None:
820
      raise utils.RetryAgain()
821

    
822
    return result
823

    
824
  def __call__(self, filename, job_load_fn,
825
               fields, prev_job_info, prev_log_serial, timeout,
826
               _waiter_cls=_JobChangesWaiter):
827
    """Waits for changes on a job.
828

829
    @type filename: string
830
    @param filename: File on which to wait for changes
831
    @type job_load_fn: callable
832
    @param job_load_fn: Function to load job
833
    @type fields: list of strings
834
    @param fields: Which fields to check for changes
835
    @type prev_job_info: list or None
836
    @param prev_job_info: Last job information returned
837
    @type prev_log_serial: int
838
    @param prev_log_serial: Last job message serial number
839
    @type timeout: float
840
    @param timeout: maximum time to wait in seconds
841

842
    """
843
    counter = itertools.count()
844
    try:
845
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
846
      waiter = _waiter_cls(filename)
847
      try:
848
        return utils.Retry(compat.partial(self._CheckForChanges,
849
                                          counter, job_load_fn, check_fn),
850
                           utils.RETRY_REMAINING_TIME, timeout,
851
                           wait_fn=waiter.Wait)
852
      finally:
853
        waiter.Close()
854
    except errors.JobLost:
855
      return None
856
    except utils.RetryTimeout:
857
      return constants.JOB_NOTCHANGED
858

    
859

    
860
def _EncodeOpError(err):
861
  """Encodes an error which occurred while processing an opcode.
862

863
  """
864
  if isinstance(err, errors.GenericError):
865
    to_encode = err
866
  else:
867
    to_encode = errors.OpExecError(str(err))
868

    
869
  return errors.EncodeException(to_encode)
870

    
871

    
872
class _TimeoutStrategyWrapper:
873
  def __init__(self, fn):
874
    """Initializes this class.
875

876
    """
877
    self._fn = fn
878
    self._next = None
879

    
880
  def _Advance(self):
881
    """Gets the next timeout if necessary.
882

883
    """
884
    if self._next is None:
885
      self._next = self._fn()
886

    
887
  def Peek(self):
888
    """Returns the next timeout.
889

890
    """
891
    self._Advance()
892
    return self._next
893

    
894
  def Next(self):
895
    """Returns the current timeout and advances the internal state.
896

897
    """
898
    self._Advance()
899
    result = self._next
900
    self._next = None
901
    return result
902

    
903

    
904
class _OpExecContext:
905
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
906
    """Initializes this class.
907

908
    """
909
    self.op = op
910
    self.index = index
911
    self.log_prefix = log_prefix
912
    self.summary = op.input.Summary()
913

    
914
    # Create local copy to modify
915
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
916
      self.jobdeps = op.input.depends[:]
917
    else:
918
      self.jobdeps = None
919

    
920
    self._timeout_strategy_factory = timeout_strategy_factory
921
    self._ResetTimeoutStrategy()
922

    
923
  def _ResetTimeoutStrategy(self):
924
    """Creates a new timeout strategy.
925

926
    """
927
    self._timeout_strategy = \
928
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
929

    
930
  def CheckPriorityIncrease(self):
931
    """Checks whether priority can and should be increased.
932

933
    Called when locks couldn't be acquired.
934

935
    """
936
    op = self.op
937

    
938
    # Exhausted all retries and next round should not use blocking acquire
939
    # for locks?
940
    if (self._timeout_strategy.Peek() is None and
941
        op.priority > constants.OP_PRIO_HIGHEST):
942
      logging.debug("Increasing priority")
943
      op.priority -= 1
944
      self._ResetTimeoutStrategy()
945
      return True
946

    
947
    return False
948

    
949
  def GetNextLockTimeout(self):
950
    """Returns the next lock acquire timeout.
951

952
    """
953
    return self._timeout_strategy.Next()
954

    
955

    
956
class _JobProcessor(object):
957
  (DEFER,
958
   WAITDEP,
959
   FINISHED) = range(1, 4)
960

    
961
  def __init__(self, queue, opexec_fn, job,
962
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
963
    """Initializes this class.
964

965
    """
966
    self.queue = queue
967
    self.opexec_fn = opexec_fn
968
    self.job = job
969
    self._timeout_strategy_factory = _timeout_strategy_factory
970

    
971
  @staticmethod
972
  def _FindNextOpcode(job, timeout_strategy_factory):
973
    """Locates the next opcode to run.
974

975
    @type job: L{_QueuedJob}
976
    @param job: Job object
977
    @param timeout_strategy_factory: Callable to create new timeout strategy
978

979
    """
980
    # Create some sort of a cache to speed up locating next opcode for future
981
    # lookups
982
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
983
    # pending and one for processed ops.
984
    if job.ops_iter is None:
985
      job.ops_iter = enumerate(job.ops)
986

    
987
    # Find next opcode to run
988
    while True:
989
      try:
990
        (idx, op) = job.ops_iter.next()
991
      except StopIteration:
992
        raise errors.ProgrammerError("Called for a finished job")
993

    
994
      if op.status == constants.OP_STATUS_RUNNING:
995
        # Found an opcode already marked as running
996
        raise errors.ProgrammerError("Called for job marked as running")
997

    
998
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
999
                             timeout_strategy_factory)
1000

    
1001
      if op.status not in constants.OPS_FINALIZED:
1002
        return opctx
1003

    
1004
      # This is a job that was partially completed before master daemon
1005
      # shutdown, so it can be expected that some opcodes are already
1006
      # completed successfully (if any did error out, then the whole job
1007
      # should have been aborted and not resubmitted for processing).
1008
      logging.info("%s: opcode %s already processed, skipping",
1009
                   opctx.log_prefix, opctx.summary)
1010

    
1011
  @staticmethod
1012
  def _MarkWaitlock(job, op):
1013
    """Marks an opcode as waiting for locks.
1014

1015
    The job's start timestamp is also set if necessary.
1016

1017
    @type job: L{_QueuedJob}
1018
    @param job: Job object
1019
    @type op: L{_QueuedOpCode}
1020
    @param op: Opcode object
1021

1022
    """
1023
    assert op in job.ops
1024
    assert op.status in (constants.OP_STATUS_QUEUED,
1025
                         constants.OP_STATUS_WAITING)
1026

    
1027
    update = False
1028

    
1029
    op.result = None
1030

    
1031
    if op.status == constants.OP_STATUS_QUEUED:
1032
      op.status = constants.OP_STATUS_WAITING
1033
      update = True
1034

    
1035
    if op.start_timestamp is None:
1036
      op.start_timestamp = TimeStampNow()
1037
      update = True
1038

    
1039
    if job.start_timestamp is None:
1040
      job.start_timestamp = op.start_timestamp
1041
      update = True
1042

    
1043
    assert op.status == constants.OP_STATUS_WAITING
1044

    
1045
    return update
1046

    
1047
  @staticmethod
1048
  def _CheckDependencies(queue, job, opctx):
1049
    """Checks if an opcode has dependencies and if so, processes them.
1050

1051
    @type queue: L{JobQueue}
1052
    @param queue: Queue object
1053
    @type job: L{_QueuedJob}
1054
    @param job: Job object
1055
    @type opctx: L{_OpExecContext}
1056
    @param opctx: Opcode execution context
1057
    @rtype: bool
1058
    @return: Whether opcode will be re-scheduled by dependency tracker
1059

1060
    """
1061
    op = opctx.op
1062

    
1063
    result = False
1064

    
1065
    while opctx.jobdeps:
1066
      (dep_job_id, dep_status) = opctx.jobdeps[0]
1067

    
1068
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1069
                                                          dep_status)
1070
      assert ht.TNonEmptyString(depmsg), "No dependency message"
1071

    
1072
      logging.info("%s: %s", opctx.log_prefix, depmsg)
1073

    
1074
      if depresult == _JobDependencyManager.CONTINUE:
1075
        # Remove dependency and continue
1076
        opctx.jobdeps.pop(0)
1077

    
1078
      elif depresult == _JobDependencyManager.WAIT:
1079
        # Need to wait for notification, dependency tracker will re-add job
1080
        # to workerpool
1081
        result = True
1082
        break
1083

    
1084
      elif depresult == _JobDependencyManager.CANCEL:
1085
        # Job was cancelled, cancel this job as well
1086
        job.Cancel()
1087
        assert op.status == constants.OP_STATUS_CANCELING
1088
        break
1089

    
1090
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1091
                         _JobDependencyManager.ERROR):
1092
        # Job failed or there was an error, this job must fail
1093
        op.status = constants.OP_STATUS_ERROR
1094
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1095
        break
1096

    
1097
      else:
1098
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1099
                                     depresult)
1100

    
1101
    return result
1102

    
1103
  def _ExecOpCodeUnlocked(self, opctx):
1104
    """Processes one opcode and returns the result.
1105

1106
    """
1107
    op = opctx.op
1108

    
1109
    assert op.status == constants.OP_STATUS_WAITING
1110

    
1111
    timeout = opctx.GetNextLockTimeout()
1112

    
1113
    try:
1114
      # Make sure not to hold queue lock while calling ExecOpCode
1115
      result = self.opexec_fn(op.input,
1116
                              _OpExecCallbacks(self.queue, self.job, op),
1117
                              timeout=timeout)
1118
    except mcpu.LockAcquireTimeout:
1119
      assert timeout is not None, "Received timeout for blocking acquire"
1120
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1121

    
1122
      assert op.status in (constants.OP_STATUS_WAITING,
1123
                           constants.OP_STATUS_CANCELING)
1124

    
1125
      # Was job cancelled while we were waiting for the lock?
1126
      if op.status == constants.OP_STATUS_CANCELING:
1127
        return (constants.OP_STATUS_CANCELING, None)
1128

    
1129
      # Queue is shutting down, return to queued
1130
      if not self.queue.AcceptingJobsUnlocked():
1131
        return (constants.OP_STATUS_QUEUED, None)
1132

    
1133
      # Stay in waitlock while trying to re-acquire lock
1134
      return (constants.OP_STATUS_WAITING, None)
1135
    except CancelJob:
1136
      logging.exception("%s: Canceling job", opctx.log_prefix)
1137
      assert op.status == constants.OP_STATUS_CANCELING
1138
      return (constants.OP_STATUS_CANCELING, None)
1139

    
1140
    except QueueShutdown:
1141
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1142

    
1143
      assert op.status == constants.OP_STATUS_WAITING
1144

    
1145
      # Job hadn't been started yet, so it should return to the queue
1146
      return (constants.OP_STATUS_QUEUED, None)
1147

    
1148
    except Exception, err: # pylint: disable=W0703
1149
      logging.exception("%s: Caught exception in %s",
1150
                        opctx.log_prefix, opctx.summary)
1151
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1152
    else:
1153
      logging.debug("%s: %s successful",
1154
                    opctx.log_prefix, opctx.summary)
1155
      return (constants.OP_STATUS_SUCCESS, result)
1156

    
1157
  def __call__(self, _nextop_fn=None):
1158
    """Continues execution of a job.
1159

1160
    @param _nextop_fn: Callback function for tests
1161
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1162
      be deferred and C{WAITDEP} if the dependency manager
1163
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1164

1165
    """
1166
    queue = self.queue
1167
    job = self.job
1168

    
1169
    logging.debug("Processing job %s", job.id)
1170

    
1171
    queue.acquire(shared=1)
1172
    try:
1173
      opcount = len(job.ops)
1174

    
1175
      assert job.writable, "Expected writable job"
1176

    
1177
      # Don't do anything for finalized jobs
1178
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1179
        return self.FINISHED
1180

    
1181
      # Is a previous opcode still pending?
1182
      if job.cur_opctx:
1183
        opctx = job.cur_opctx
1184
        job.cur_opctx = None
1185
      else:
1186
        if __debug__ and _nextop_fn:
1187
          _nextop_fn()
1188
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1189

    
1190
      op = opctx.op
1191

    
1192
      # Consistency check
1193
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1194
                                     constants.OP_STATUS_CANCELING)
1195
                        for i in job.ops[opctx.index + 1:])
1196

    
1197
      assert op.status in (constants.OP_STATUS_QUEUED,
1198
                           constants.OP_STATUS_WAITING,
1199
                           constants.OP_STATUS_CANCELING)
1200

    
1201
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1202
              op.priority >= constants.OP_PRIO_HIGHEST)
1203

    
1204
      waitjob = None
1205

    
1206
      if op.status != constants.OP_STATUS_CANCELING:
1207
        assert op.status in (constants.OP_STATUS_QUEUED,
1208
                             constants.OP_STATUS_WAITING)
1209

    
1210
        # Prepare to start opcode
1211
        if self._MarkWaitlock(job, op):
1212
          # Write to disk
1213
          queue.UpdateJobUnlocked(job)
1214

    
1215
        assert op.status == constants.OP_STATUS_WAITING
1216
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1217
        assert job.start_timestamp and op.start_timestamp
1218
        assert waitjob is None
1219

    
1220
        # Check if waiting for a job is necessary
1221
        waitjob = self._CheckDependencies(queue, job, opctx)
1222

    
1223
        assert op.status in (constants.OP_STATUS_WAITING,
1224
                             constants.OP_STATUS_CANCELING,
1225
                             constants.OP_STATUS_ERROR)
1226

    
1227
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1228
                                         constants.OP_STATUS_ERROR)):
1229
          logging.info("%s: opcode %s waiting for locks",
1230
                       opctx.log_prefix, opctx.summary)
1231

    
1232
          assert not opctx.jobdeps, "Not all dependencies were removed"
1233

    
1234
          queue.release()
1235
          try:
1236
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1237
          finally:
1238
            queue.acquire(shared=1)
1239

    
1240
          op.status = op_status
1241
          op.result = op_result
1242

    
1243
          assert not waitjob
1244

    
1245
        if op.status in (constants.OP_STATUS_WAITING,
1246
                         constants.OP_STATUS_QUEUED):
1247
          # waiting: Couldn't get locks in time
1248
          # queued: Queue is shutting down
1249
          assert not op.end_timestamp
1250
        else:
1251
          # Finalize opcode
1252
          op.end_timestamp = TimeStampNow()
1253

    
1254
          if op.status == constants.OP_STATUS_CANCELING:
1255
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1256
                                  for i in job.ops[opctx.index:])
1257
          else:
1258
            assert op.status in constants.OPS_FINALIZED
1259

    
1260
      if op.status == constants.OP_STATUS_QUEUED:
1261
        # Queue is shutting down
1262
        assert not waitjob
1263

    
1264
        finalize = False
1265

    
1266
        # Reset context
1267
        job.cur_opctx = None
1268

    
1269
        # In no case must the status be finalized here
1270
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1271

    
1272
      elif op.status == constants.OP_STATUS_WAITING or waitjob:
1273
        finalize = False
1274

    
1275
        if not waitjob and opctx.CheckPriorityIncrease():
1276
          # Priority was changed, need to update on-disk file
1277
          queue.UpdateJobUnlocked(job)
1278

    
1279
        # Keep around for another round
1280
        job.cur_opctx = opctx
1281

    
1282
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1283
                op.priority >= constants.OP_PRIO_HIGHEST)
1284

    
1285
        # In no case must the status be finalized here
1286
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1287

    
1288
      else:
1289
        # Ensure all opcodes so far have been successful
1290
        assert (opctx.index == 0 or
1291
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1292
                           for i in job.ops[:opctx.index]))
1293

    
1294
        # Reset context
1295
        job.cur_opctx = None
1296

    
1297
        if op.status == constants.OP_STATUS_SUCCESS:
1298
          finalize = False
1299

    
1300
        elif op.status == constants.OP_STATUS_ERROR:
1301
          # Ensure failed opcode has an exception as its result
1302
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1303

    
1304
          to_encode = errors.OpExecError("Preceding opcode failed")
1305
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1306
                                _EncodeOpError(to_encode))
1307
          finalize = True
1308

    
1309
          # Consistency check
1310
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1311
                            errors.GetEncodedError(i.result)
1312
                            for i in job.ops[opctx.index:])
1313

    
1314
        elif op.status == constants.OP_STATUS_CANCELING:
1315
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1316
                                "Job canceled by request")
1317
          finalize = True
1318

    
1319
        else:
1320
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1321

    
1322
        if opctx.index == (opcount - 1):
1323
          # Finalize on last opcode
1324
          finalize = True
1325

    
1326
        if finalize:
1327
          # All opcodes have been run, finalize job
1328
          job.Finalize()
1329

    
1330
        # Write to disk. If the job status is final, this is the final write
1331
        # allowed. Once the file has been written, it can be archived anytime.
1332
        queue.UpdateJobUnlocked(job)
1333

    
1334
        assert not waitjob
1335

    
1336
        if finalize:
1337
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1338
          return self.FINISHED
1339

    
1340
      assert not waitjob or queue.depmgr.JobWaiting(job)
1341

    
1342
      if waitjob:
1343
        return self.WAITDEP
1344
      else:
1345
        return self.DEFER
1346
    finally:
1347
      assert job.writable, "Job became read-only while being processed"
1348
      queue.release()
1349

    
1350

    
1351
def _EvaluateJobProcessorResult(depmgr, job, result):
1352
  """Looks at a result from L{_JobProcessor} for a job.
1353

1354
  To be used in a L{_JobQueueWorker}.
1355

1356
  """
1357
  if result == _JobProcessor.FINISHED:
1358
    # Notify waiting jobs
1359
    depmgr.NotifyWaiters(job.id)
1360

    
1361
  elif result == _JobProcessor.DEFER:
1362
    # Schedule again
1363
    raise workerpool.DeferTask(priority=job.CalcPriority())
1364

    
1365
  elif result == _JobProcessor.WAITDEP:
1366
    # No-op, dependency manager will re-schedule
1367
    pass
1368

    
1369
  else:
1370
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1371
                                 (result, ))
1372

    
1373

    
1374
class _JobQueueWorker(workerpool.BaseWorker):
1375
  """The actual job workers.
1376

1377
  """
1378
  def RunTask(self, job): # pylint: disable=W0221
1379
    """Job executor.
1380

1381
    @type job: L{_QueuedJob}
1382
    @param job: the job to be processed
1383

1384
    """
1385
    assert job.writable, "Expected writable job"
1386

    
1387
    # Ensure only one worker is active on a single job. If a job registers for
1388
    # a dependency job, and the other job notifies before the first worker is
1389
    # done, the job can end up in the tasklist more than once.
1390
    job.processor_lock.acquire()
1391
    try:
1392
      return self._RunTaskInner(job)
1393
    finally:
1394
      job.processor_lock.release()
1395

    
1396
  def _RunTaskInner(self, job):
1397
    """Executes a job.
1398

1399
    Must be called with per-job lock acquired.
1400

1401
    """
1402
    queue = job.queue
1403
    assert queue == self.pool.queue
1404

    
1405
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1406
    setname_fn(None)
1407

    
1408
    proc = mcpu.Processor(queue.context, job.id)
1409

    
1410
    # Create wrapper for setting thread name
1411
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1412
                                    proc.ExecOpCode)
1413

    
1414
    _EvaluateJobProcessorResult(queue.depmgr, job,
1415
                                _JobProcessor(queue, wrap_execop_fn, job)())
1416

    
1417
  @staticmethod
1418
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1419
    """Updates the worker thread name to include a short summary of the opcode.
1420

1421
    @param setname_fn: Callable setting worker thread name
1422
    @param execop_fn: Callable for executing opcode (usually
1423
                      L{mcpu.Processor.ExecOpCode})
1424

1425
    """
1426
    setname_fn(op)
1427
    try:
1428
      return execop_fn(op, *args, **kwargs)
1429
    finally:
1430
      setname_fn(None)
1431

    
1432
  @staticmethod
1433
  def _GetWorkerName(job, op):
1434
    """Sets the worker thread name.
1435

1436
    @type job: L{_QueuedJob}
1437
    @type op: L{opcodes.OpCode}
1438

1439
    """
1440
    parts = ["Job%s" % job.id]
1441

    
1442
    if op:
1443
      parts.append(op.TinySummary())
1444

    
1445
    return "/".join(parts)
1446

    
1447

    
1448
class _JobQueueWorkerPool(workerpool.WorkerPool):
1449
  """Simple class implementing a job-processing workerpool.
1450

1451
  """
1452
  def __init__(self, queue):
1453
    super(_JobQueueWorkerPool, self).__init__("Jq",
1454
                                              JOBQUEUE_THREADS,
1455
                                              _JobQueueWorker)
1456
    self.queue = queue
1457

    
1458

    
1459
class _JobDependencyManager:
1460
  """Keeps track of job dependencies.
1461

1462
  """
1463
  (WAIT,
1464
   ERROR,
1465
   CANCEL,
1466
   CONTINUE,
1467
   WRONGSTATUS) = range(1, 6)
1468

    
1469
  def __init__(self, getstatus_fn, enqueue_fn):
1470
    """Initializes this class.
1471

1472
    """
1473
    self._getstatus_fn = getstatus_fn
1474
    self._enqueue_fn = enqueue_fn
1475

    
1476
    self._waiters = {}
1477
    self._lock = locking.SharedLock("JobDepMgr")
1478

    
1479
  @locking.ssynchronized(_LOCK, shared=1)
1480
  def GetLockInfo(self, requested): # pylint: disable=W0613
1481
    """Retrieves information about waiting jobs.
1482

1483
    @type requested: set
1484
    @param requested: Requested information, see C{query.LQ_*}
1485

1486
    """
1487
    # No need to sort here, that's being done by the lock manager and query
1488
    # library. There are no priorities for notifying jobs, hence all show up as
1489
    # one item under "pending".
1490
    return [("job/%s" % job_id, None, None,
1491
             [("job", [job.id for job in waiters])])
1492
            for job_id, waiters in self._waiters.items()
1493
            if waiters]
1494

    
1495
  @locking.ssynchronized(_LOCK, shared=1)
1496
  def JobWaiting(self, job):
1497
    """Checks if a job is waiting.
1498

1499
    """
1500
    return compat.any(job in jobs
1501
                      for jobs in self._waiters.values())
1502

    
1503
  @locking.ssynchronized(_LOCK)
1504
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1505
    """Checks if a dependency job has the requested status.
1506

1507
    If the other job is not yet in a finalized status, the calling job will be
1508
    notified (re-added to the workerpool) at a later point.
1509

1510
    @type job: L{_QueuedJob}
1511
    @param job: Job object
1512
    @type dep_job_id: int
1513
    @param dep_job_id: ID of dependency job
1514
    @type dep_status: list
1515
    @param dep_status: Required status
1516

1517
    """
1518
    assert ht.TJobId(job.id)
1519
    assert ht.TJobId(dep_job_id)
1520
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1521

    
1522
    if job.id == dep_job_id:
1523
      return (self.ERROR, "Job can't depend on itself")
1524

    
1525
    # Get status of dependency job
1526
    try:
1527
      status = self._getstatus_fn(dep_job_id)
1528
    except errors.JobLost, err:
1529
      return (self.ERROR, "Dependency error: %s" % err)
1530

    
1531
    assert status in constants.JOB_STATUS_ALL
1532

    
1533
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1534

    
1535
    if status not in constants.JOBS_FINALIZED:
1536
      # Register for notification and wait for job to finish
1537
      job_id_waiters.add(job)
1538
      return (self.WAIT,
1539
              "Need to wait for job %s, wanted status '%s'" %
1540
              (dep_job_id, dep_status))
1541

    
1542
    # Remove from waiters list
1543
    if job in job_id_waiters:
1544
      job_id_waiters.remove(job)
1545

    
1546
    if (status == constants.JOB_STATUS_CANCELED and
1547
        constants.JOB_STATUS_CANCELED not in dep_status):
1548
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1549

    
1550
    elif not dep_status or status in dep_status:
1551
      return (self.CONTINUE,
1552
              "Dependency job %s finished with status '%s'" %
1553
              (dep_job_id, status))
1554

    
1555
    else:
1556
      return (self.WRONGSTATUS,
1557
              "Dependency job %s finished with status '%s',"
1558
              " not one of '%s' as required" %
1559
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1560

    
1561
  def _RemoveEmptyWaitersUnlocked(self):
1562
    """Remove all jobs without actual waiters.
1563

1564
    """
1565
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1566
                   if not waiters]:
1567
      del self._waiters[job_id]
1568

    
1569
  def NotifyWaiters(self, job_id):
1570
    """Notifies all jobs waiting for a certain job ID.
1571

1572
    @attention: Do not call until L{CheckAndRegister} returned a status other
1573
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1574
    @type job_id: int
1575
    @param job_id: Job ID
1576

1577
    """
1578
    assert ht.TJobId(job_id)
1579

    
1580
    self._lock.acquire()
1581
    try:
1582
      self._RemoveEmptyWaitersUnlocked()
1583

    
1584
      jobs = self._waiters.pop(job_id, None)
1585
    finally:
1586
      self._lock.release()
1587

    
1588
    if jobs:
1589
      # Re-add jobs to workerpool
1590
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1591
                    len(jobs), job_id)
1592
      self._enqueue_fn(jobs)
1593

    
1594

    
1595
def _RequireOpenQueue(fn):
1596
  """Decorator for "public" functions.
1597

1598
  This function should be used for all 'public' functions. That is,
1599
  functions usually called from other classes. Note that this should
1600
  be applied only to methods (not plain functions), since it expects
1601
  that the decorated function is called with a first argument that has
1602
  a '_queue_filelock' argument.
1603

1604
  @warning: Use this decorator only after locking.ssynchronized
1605

1606
  Example::
1607
    @locking.ssynchronized(_LOCK)
1608
    @_RequireOpenQueue
1609
    def Example(self):
1610
      pass
1611

1612
  """
1613
  def wrapper(self, *args, **kwargs):
1614
    # pylint: disable=W0212
1615
    assert self._queue_filelock is not None, "Queue should be open"
1616
    return fn(self, *args, **kwargs)
1617
  return wrapper


def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper
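
# A usage sketch for the decorator above, mirroring the Example block in
# L{_RequireOpenQueue}; the method name and the exact decorator stacking are
# only illustrative:
#
#   @locking.ssynchronized(_LOCK)
#   @_RequireOpenQueue
#   @_RequireNonDrainedQueue
#   def ExampleSubmit(self, ops):
#     pass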


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

  def _PickupJobUnlocked(self, job_id):
    """Load a job from the job queue.

    Pick up a job that already is in the job queue and start/resume it.

    """
    job = self._LoadJobUnlocked(job_id)

    if job is None:
      logging.warning("Job %s could not be read", job_id)
      return

    status = job.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self._EnqueueJobsUnlocked([job])
      logging.info("Restarting job %s", job.id)

    elif status in (constants.JOB_STATUS_RUNNING,
                    constants.JOB_STATUS_WAITING,
                    constants.JOB_STATUS_CANCELING):
      logging.warning("Unfinished job %s found: %s", job.id, job)

      if status == constants.JOB_STATUS_WAITING:
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
        self._EnqueueJobsUnlocked([job])
        logging.info("Restarting job %s", job.id)
      else:
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              "Unclean master daemon shutdown")
        job.Finalize()

    self.UpdateJobUnlocked(job)

  @locking.ssynchronized(_LOCK)
  def PickupJob(self, job_id):
    self._PickupJobUnlocked(job_id)

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    # Set queue drained flag
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
                                                       self._drained)
    msg = result[node_name].fail_msg
    if msg:
      logging.error("Failed to set queue drained flag on node %s: %s",
                    node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job
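
  # Layout and lookup sketch (the concrete directories come from pathutils
  # and depend on the installation): a live job lives at <QUEUE_DIR>/job-<id>,
  # its archived copy at <JOB_QUEUE_ARCHIVE_DIR>/<bucket>/job-<id>, with the
  # bucket chosen by jstore.GetArchiveDirectory(). _LoadJobFromDisk always
  # tries the live file first and consults the archive only when try_archived
  # is set, e.g. assuming a JobQueue instance named "queue":
  #
  #   job = queue._LoadJobFromDisk(1234, True)
  #   # reads <QUEUE_DIR>/job-1234 if it exists, otherwise the archived copy;
  #   # in the latter case job.writable defaults to False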

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    # Change flag locally
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    # ... and on all nodes
    (names, addrs) = self._GetNodeIp()
    result = \
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
    self._CheckRpcResult(result, self._nodes,
                         "Setting queue drain flag to %s" % drain_flag)

    return True

  @classmethod
  def SubmitJob(cls, ops):
    """Create and store a new job.

    """
    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitJob(ops)

  @classmethod
  def SubmitJobToDrainedQueue(cls, ops):
    """Forcefully create and store a new job.

    Do so, even if the job queue is drained.

    """
    return luxi.Client(address=pathutils.QUERY_SOCKET)\
        .SubmitJobToDrainedQueue(ops)

  @classmethod
  def SubmitManyJobs(cls, jobs):
    """Create and store multiple jobs.

    """
    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)
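
  # Resolution sketch: negative IDs are relative references to jobs created
  # earlier in the same submission. Assuming resolve_fn maps -1 to the ID of
  # the previously created job (say 1041):
  #
  #   _ResolveJobDependencies(resolve_fn,
  #                           [(-1, [constants.JOB_STATUS_SUCCESS])])
  #   # -> (True, [(1041, [constants.JOB_STATUS_SUCCESS])])
  #
  # and an IndexError raised by resolve_fn turns the result into
  # (False, "Unable to resolve relative job ID -1").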

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs],
                             task_id=map(_GetIdAttr, jobs))

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
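
  # Client-side polling sketch (hedged: real luxi clients drive this loop;
  # the log entry format is assumed here to start with its serial number).
  # Feeding back the previously returned values ensures only new data is
  # reported:
  #
  #   prev_info = None
  #   prev_serial = -1
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 10.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue                      # timeout expired, poll again
  #     (prev_info, log_entries) = result
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]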

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      (success, msg) = job.ChangePriority(priority)

      if success:
        try:
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fixing the TODO above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time (in seconds) to spend archiving

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
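
  # Behaviour sketch for AutoArchiveJobs (the values are illustrative only):
  #
  #   queue.AutoArchiveJobs(7 * 24 * 3600, 30)
  #   # archives finalized jobs older than one week (using the end, start or
  #   # received timestamp, in that order of preference), spends at most
  #   # roughly 30 seconds, and returns (archived count, jobs not considered)
  #
  #   queue.AutoArchiveJobs(-1, 30)
  #   # age -1 archives every finalized job regardless of age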

  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)
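
  # Query sketch: QueryJobs takes a query2 filter while OldStyleQueryJobs
  # builds a simple "id" filter itself (the filter below is illustrative):
  #
  #   queue.QueryJobs(["id", "status"],
  #                   [qlang.OP_EQUAL, "status", constants.JOB_STATUS_RUNNING])
  #   queue.OldStyleQueryJobs([1041, 1042], ["id", "status"])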

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False} L{Shutdown} can
    be called without interfering with any job. Queued and unfinished jobs will
    be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()
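
  # Shutdown sequence sketch (the master daemon drives this; the loop and
  # sleep interval are illustrative only):
  #
  #   while queue.PrepareShutdown():
  #     time.sleep(1)      # jobs still running, give them time to finish
  #   queue.Shutdown()     # safe now: no running job will be interfered with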

  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool

    """
    return self._accepting_jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None