
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools
import operator

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster


JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

#: Retrieves "id" attribute
_GetIdAttr = operator.attrgetter("id")


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


class QueueShutdown(Exception):
  """Special exception to abort a job when the job queue is shutting down.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
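
# Illustrative note (an assumption about utils.SplitTime, which is defined
# elsewhere): it splits a float timestamp into whole seconds and microseconds,
# e.g. roughly utils.SplitTime(1234567890.123456) == (1234567890, 123456),
# which keeps timestamps precise across JSON serialization.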


def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)


class _SimpleJobQuery:
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
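
  # Usage sketch (illustrative; the field names are assumed to exist in
  # query.JOB_FIELDS):
  #   row = _SimpleJobQuery(["id", "status"])(job)
  # returns one old-style result row for the single job passed in.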


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }
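
  # Serialize() and Restore() are intended as inverses: for an opcode wrapper
  # "op", _QueuedOpCode.Restore(op.Serialize()) should rebuild an equivalent
  # instance (a sketch of the contract implied above, not an extra guarantee).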


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable", "archived"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: int
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a canceled opcode, or one that finished with an error,
        the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waiting
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or all are
        successful, and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Job- and opcode-level "canceled" constants share the same value;
        # use the job-level one here for consistency with the other branches
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
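
  # Worked example (illustrative): per-opcode statuses map to a job status as
  #   [SUCCESS, RUNNING, QUEUED]  -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR, QUEUED]    -> JOB_STATUS_ERROR (first error wins)
  #   [SUCCESS, SUCCESS, SUCCESS] -> JOB_STATUS_SUCCESS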

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
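
  # Reminder, consistent with the asserts elsewhere in this module
  # (OP_PRIO_HIGHEST <= priority <= OP_PRIO_LOWEST): numerically smaller
  # values mean *higher* priority, so min() picks the most urgent opcode.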

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalized are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

  def ChangePriority(self, priority):
    """Changes the job priority.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
      and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is canceling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Note: this also changes the on-disk priority ("op.priority" is only in
        # memory)
        op.input.priority = priority
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # See if queue is shutting down
    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
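
  # Illustrative calls (hypothetical; "cbs" stands for an _OpExecCallbacks
  # instance handed to the LU code):
  #   cbs.Feedback("checking prerequisites")            # ELOG_MESSAGE implied
  #   cbs.Feedback(constants.ELOG_MESSAGE, "all good")  # explicit log type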

  def CurrentPriority(self):
    """Returns current priority for opcode.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    return self._op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
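
  # Return contract, as implemented above (a summary, not new behaviour): the
  # fresh (job_info, log_entries) pair on change, None when the job vanished
  # or inotify failed, and constants.JOB_NOTCHANGED when the timeout expired.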


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
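
# E.g. (illustrative): a plain ValueError("boom") is first wrapped in
# errors.OpExecError, while Ganeti's own GenericError subclasses are encoded
# unchanged, so clients always receive an encoded Ganeti exception.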


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
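
  # Semantics sketch (illustrative values): with a strategy yielding
  # 1.0, 2.0, None in turn,
  #   w.Peek() -> 1.0   (does not consume)
  #   w.Next() -> 1.0   (consumes)
  #   w.Next() -> 2.0
  #   w.Peek() -> None  (None stands for the final, blocking lock acquisition)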


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Queue is shutting down, return to queued
      if not self.queue.AcceptingJobsUnlocked():
        return (constants.OP_STATUS_QUEUED, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)

    except QueueShutdown:
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)

      assert op.status == constants.OP_STATUS_WAITING

      # Job hadn't been started yet, so it should return to the queue
      return (constants.OP_STATUS_QUEUED, None)

    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_QUEUED):
          # waiting: Couldn't get locks in time
          # queued: Queue is shutting down
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        finalize = False

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Returns the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))
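
  # Outcome sketch (illustrative job IDs): if job 123 depends on job 100 with
  # dep_status == [constants.JOB_STATUS_SUCCESS], then, per the logic above:
  #   job 100 not yet finalized -> (WAIT, ...), job 123 is parked until
  #                                NotifyWaiters(100) re-enqueues it
  #   job 100 succeeded         -> (CONTINUE, ...), dependency dropped
  #   job 100 was canceled      -> (CANCEL, ...), job 123 is canceled too
  #   job 100 ended in error    -> (WRONGSTATUS, ...), job 123 fails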

  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
      than C{WAIT} for C{job_id}, or behaviour is undefined
    @type job_id: int
    @param job_id: Job ID

    """
    assert ht.TJobId(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()

      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper
1624

    
1625

    
1626
class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all code acquiring it in
    # shared mode, including itself. Code not acquiring the lock at all must
    # guarantee concurrency with all code acquiring it in shared mode and
    # with all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)
      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we log errors if our RPC calls fail, and especially
    the case where more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
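    # (illustrative: with five replicated nodes, one success plus the master
    # gives 2, which is less than 4 failures, triggering the error below)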
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.
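
    Example (illustrative sketch; assumes C{self._last_serial} is 42)::
      ids = self._NewSerialsUnlocked(3)
      # the serial file now contains "45" and ids holds the job IDs
      # formatted by jstore.FormatJobID for serials 43, 44 and 45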

    """
    assert ht.TPositiveInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
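        # Job files are named "job-<id>" (see L{_GetJobPath}); extract the
        # numeric ID, e.g. "job-4231" gives 4231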
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @type writable: bool or None
    @param writable: Whether the returned job should be writable; if None,
        archived jobs are loaded read-only and all others writable
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}
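
    @type jobs: list
    @param jobs: each element is itself a list of opcodes, forming one job
    @rtype: list
    @return: one (success, detail) tuple per requested job, where C{detail}
        is the new job ID on success and an error message otherwise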

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message
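
    Example (illustrative sketch; C{resolve} stands in for the resolver
    built by L{_SubmitManyJobsUnlocked})::
      resolve = lambda rel: [17, 18][rel]  # -1 -> 18, -2 -> 17
      deps = [(-1, [constants.JOB_STATUS_SUCCESS]), (5, [])]
      JobQueue._ResolveJobDependencies(resolve, deps)
      # -> (True, [(18, [constants.JOB_STATUS_SUCCESS]), (5, [])])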

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []
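
    # A relative job ID of -1 refers to the job submitted immediately before
    # the current one, -2 to the one before that, and so on; job IDs from an
    # earlier submission batch (previous_job_ids) are also in scope.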
    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
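    # The job ID doubles as the workerpool task ID, which is what allows
    # L{ChangeJobPriority} to look the task up via ChangeTaskPriority later.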
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs],
                             task_id=map(_GetIdAttr, jobs))

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job"
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      (success, msg) = job.ChangePriority(priority)

      if success:
        try:
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving the job object as a
      parameter and returning a tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fixing the TODO above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
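    @type timeout: int
    @param timeout: approximate upper bound (in seconds) on the time spent
        archiving before returning
    @rtype: tuple of (int, int)
    @return: the number of jobs archived and the number of jobs not yet
        inspected

    Example (illustrative): C{AutoArchiveJobs(3600, 30)} archives finalized
    jobs that ended more than an hour ago, spending roughly 30 seconds at
    most; C{AutoArchiveJobs(-1, 30)} archives any finalized job regardless
    of age.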

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list, one element per job, each element being a list with
        the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool

    """
    return self._accepting_jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None
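

# Illustrative shutdown sequence (sketch only): a caller is expected to poll
# L{JobQueue.PrepareShutdown} until it reports no running jobs, and only then
# call L{JobQueue.Shutdown}, e.g.:
#
#   while queue.PrepareShutdown():
#     time.sleep(0.1)
#   queue.Shutdown()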