#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools
import operator

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster


JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

#: Retrieves "id" attribute
_GetIdAttr = operator.attrgetter("id")


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


class QueueShutdown(Exception):
  """Special exception to abort a job when the job queue is shutting down.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
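
# Illustrative sketch only (not executed): TimeStampNow() relies on
# utils.SplitTime to split a float timestamp into whole seconds and
# microseconds, along the lines of:
#
#   >>> utils.SplitTime(1418185800.25)
#   (1418185800, 250000)
#
# (the exact rounding of the microsecond part is up to utils.SplitTime)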


def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)


class _SimpleJobQuery:
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
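
# Illustrative sketch only (not executed): a _SimpleJobQuery is built once and
# then applied to many jobs, returning an old-style result row with one value
# per requested field; the names and values below are hypothetical:
#
#   >>> squery = _SimpleJobQuery(["id", "status"])
#   >>> squery(job)
#   [1234, "running"]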


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }
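
# Illustrative sketch only (not executed): Serialize() and Restore() are meant
# to round-trip, which is how opcodes survive a master daemon restart:
#
#   >>> state = qop.Serialize()              # qop: a _QueuedOpCode instance
#   >>> clone = _QueuedOpCode.Restore(state)
#   >>> clone.status == qop.status
#   True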


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable", "archived"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: int
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an error,
        the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waiting
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
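
  # Illustrative examples only (not executed), following the algorithm above;
  # each line lists the opcode statuses in job order and the resulting job
  # status:
  #
  #   [success, running, queued]  -> JOB_STATUS_RUNNING
  #   [success, error, queued]    -> JOB_STATUS_ERROR (first error wins)
  #   [queued, queued]            -> JOB_STATUS_QUEUED
  #   [success, success]          -> JOB_STATUS_SUCCESS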

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
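
  # Illustrative example only (not executed): priorities behave like nice
  # values, i.e. numerically lower means more important, so min() picks the
  # most important unfinished opcode: with unfinished priorities [0, 10] the
  # job priority is 0, and with all opcodes finalized it is OP_PRIO_DEFAULT.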

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
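
  # Illustrative example only (not executed): log entries are tuples of
  # (log_serial, timestamp, log_type, message). If a job holds entries with
  # serials 1..5, GetLogEntries(3) returns only those with serials 4 and 5,
  # while GetLogEntries(None) returns all of them.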

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalized are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

  def ChangePriority(self, priority):
    """Changes the job priority.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
      and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is canceling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # See if queue is shutting down
    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
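
  # Illustrative sketch only (not executed): Feedback() accepts either a bare
  # message or an explicit log type plus message ("cbs" stands for a
  # hypothetical _OpExecCallbacks instance):
  #
  #   >>> cbs.Feedback("copying disk data")   # defaults to ELOG_MESSAGE
  #   >>> cbs.Feedback(constants.ELOG_MESSAGE, "copying disk data")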

  def CurrentPriority(self):
    """Returns current priority for opcode.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    return self._op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: list or None
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: int or None
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))
  return errors.EncodeException(to_encode)
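
# Illustrative sketch only (not executed): the encoded form can be checked
# with errors.GetEncodedError, which is what L{_JobProcessor} asserts on for
# failed opcodes:
#
#   >>> enc = _EncodeOpError(ValueError("bad disk size"))
#   >>> errors.GetEncodedError(enc) is not None
#   True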


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
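
# Illustrative example only (not executed): Peek() returns the upcoming
# timeout without consuming it, while Next() consumes it; a timeout of C{None}
# means the final, blocking lock acquisition attempt. With a strategy yielding
# 1.0 and then None ("w" is a hypothetical wrapper instance):
#
#   >>> w.Peek()   # -> 1.0 (not consumed)
#   >>> w.Next()   # -> 1.0 (consumed)
#   >>> w.Next()   # -> None (blocking attempt)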


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)
    return result
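
  # Illustrative sketch only (not executed): opcode dependencies come from the
  # opcode's "depends" attribute, a list of (job ID, required final statuses)
  # pairs, e.g.:
  #
  #   depends = [
  #     [1234, [constants.JOB_STATUS_SUCCESS]],  # absolute job ID
  #     [-1, []],                                # assumed: ID relative to this
  #     ]                                        # submission, any final status
  #
  # Each entry is checked in order through queue.depmgr.CheckAndRegister.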

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Queue is shutting down, return to queued
      if not self.queue.AcceptingJobsUnlocked():
        return (constants.OP_STATUS_QUEUED, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)

    except QueueShutdown:
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)

      assert op.status == constants.OP_STATUS_WAITING

      # Job hadn't been started yet, so it should return to the queue
      return (constants.OP_STATUS_QUEUED, None)

    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_QUEUED):
          # waiting: Couldn't get locks in time
          # queued: Queue is shutting down
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        finalize = False

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Returns the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required final statuses

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))
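
  # Summary of CheckAndRegister results (illustrative, derived from the code
  # above):
  #   - dependency not yet finalized       -> WAIT; the job is re-added to the
  #     workerpool once the dependency finishes
  #   - finished with a requested status   -> CONTINUE
  #   - canceled, but cancel not requested -> CANCEL; the depending job is
  #     canceled as well
  #   - finished with a wrong final status -> WRONGSTATUS; the opcode fails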
1539

    
1540
  def _RemoveEmptyWaitersUnlocked(self):
1541
    """Remove all jobs without actual waiters.
1542

1543
    """
1544
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1545
                   if not waiters]:
1546
      del self._waiters[job_id]
1547

    
1548
  def NotifyWaiters(self, job_id):
1549
    """Notifies all jobs waiting for a certain job ID.
1550

1551
    @attention: Do not call until L{CheckAndRegister} returned a status other
1552
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1553
    @type job_id: int
1554
    @param job_id: Job ID
1555

1556
    """
1557
    assert ht.TJobId(job_id)
1558

    
1559
    self._lock.acquire()
1560
    try:
1561
      self._RemoveEmptyWaitersUnlocked()
1562

    
1563
      jobs = self._waiters.pop(job_id, None)
1564
    finally:
1565
      self._lock.release()
1566

    
1567
    if jobs:
1568
      # Re-add jobs to workerpool
1569
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1570
                    len(jobs), job_id)
1571
      self._enqueue_fn(jobs)
1572

    
1573

    
1574
def _RequireOpenQueue(fn):
1575
  """Decorator for "public" functions.
1576

1577
  This function should be used for all 'public' functions. That is,
1578
  functions usually called from other classes. Note that this should
1579
  be applied only to methods (not plain functions), since it expects
1580
  that the decorated function is called with a first argument that has
1581
  a '_queue_filelock' argument.
1582

1583
  @warning: Use this decorator only after locking.ssynchronized
1584

1585
  Example::
1586
    @locking.ssynchronized(_LOCK)
1587
    @_RequireOpenQueue
1588
    def Example(self):
1589
      pass
1590

1591
  """
1592
  def wrapper(self, *args, **kwargs):
1593
    # pylint: disable=W0212
1594
    assert self._queue_filelock is not None, "Queue should be open"
1595
    return fn(self, *args, **kwargs)
1596
  return wrapper
1597

    
1598

    
1599
def _RequireNonDrainedQueue(fn):
1600
  """Decorator checking for a non-drained queue.
1601

1602
  To be used with functions submitting new jobs.
1603

1604
  """
1605
  def wrapper(self, *args, **kwargs):
1606
    """Wrapper function.
1607

1608
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1609

1610
    """
1611
    # Ok when sharing the big job queue lock, as the drain file is created when
1612
    # the lock is exclusive.
1613
    # Needs access to protected member, pylint: disable=W0212
1614
    if self._drained:
1615
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1616

    
1617
    if not self._accepting_jobs:
1618
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1619

    
1620
    return fn(self, *args, **kwargs)
1621
  return wrapper
1622

    
1623

    
1624
class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared
    # mode and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we log errors if our rpc calls fail, and especially
    log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates one or more new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

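    Example (with illustrative numbers only): if C{_last_serial} is 42,
    calling this method with C{count=3} writes 45 to the serial file and
    returns the formatted job IDs for serials 43, 44 and 45.
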
    """
1914
    assert ht.TNonNegativeInt(count)
1915

    
1916
    # New number
1917
    serial = self._last_serial + count
1918

    
1919
    # Write to file
1920
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1921
                             "%s\n" % serial, True)
1922

    
1923
    result = [jstore.FormatJobID(v)
1924
              for v in range(self._last_serial + 1, serial + 1)]
1925

    
1926
    # Keep it only if we were able to write the file
1927
    self._last_serial = serial
1928

    
1929
    assert len(result) == count
1930

    
1931
    return result
1932

    
1933
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

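    Example (assuming the default queue directory): job ID 123 maps to
    C{/var/lib/ganeti/queue/job-123}.
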
    """
1943
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1944

    
1945
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of an error reading the job, None is returned and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

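    Example (a minimal sketch; the opcode used is purely illustrative)::
      job_id = queue.SubmitJob([opcodes.OpClusterQuery()])
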
    """
2188
    (job_id, ) = self._NewSerialsUnlocked(1)
2189
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2190
    return job_id
2191

    
2192
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

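    Example (a rough sketch with made-up values; in real use C{resolve_fn}
    is supplied by L{_SubmitManyJobsUnlocked})::
      (ok, resolved) = JobQueue._ResolveJobDependencies(
        lambda relative_jid: 12345,
        [(-1, [constants.JOB_STATUS_SUCCESS])])
      # ok is True, resolved is [(12345, [constants.JOB_STATUS_SUCCESS])]
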
    """
2232
    result = []
2233

    
2234
    for (dep_job_id, dep_status) in deps:
2235
      if ht.TRelativeJobId(dep_job_id):
2236
        assert ht.TInt(dep_job_id) and dep_job_id < 0
2237
        try:
2238
          job_id = resolve_fn(dep_job_id)
2239
        except IndexError:
2240
          # Abort
2241
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2242
      else:
2243
        job_id = dep_job_id
2244

    
2245
      result.append((job_id, dep_status))
2246

    
2247
    return (True, result)
2248

    
2249
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

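    # For illustration (made-up IDs): with previous_job_ids=[7] and
    # job_ids=[8, 9, 10], resolving the relative ID -1 for the job at
    # index 2 yields ([7] + [8, 9])[-1] == 9, i.e. the job submitted
    # immediately before it in this batch.
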
    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        # Reached only if the inner loop did not "break", i.e. all
        # dependencies were resolved successfully
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs],
                             task_id=map(_GetIdAttr, jobs))

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job"
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

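    Example (an illustrative polling loop as a client might run it;
    variable names are made up and error handling is omitted)::
      prev_info = None
      prev_serial = -1
      while True:
        result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
                                         prev_serial, timeout)
        if result == constants.JOB_NOTCHANGED:
          continue  # Timed out without changes, poll again
        (prev_info, log_entries) = result
        # ... process prev_info and log_entries, update prev_serial ...
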
    """
2380
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2381
                             writable=False)
2382

    
2383
    helper = _WaitForJobChangesHelper()
2384

    
2385
    return helper(self._GetJobPath(job_id), load_fn,
2386
                  fields, prev_job_info, prev_log_serial, timeout)
2387

    
2388
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

    @type job_id: int
    @param job_id: ID of the job whose priority should be changed
    @type priority: int
    @param priority: New priority

    """
    logging.info("Changing priority of job %s to %s", job_id, priority)

    if priority not in constants.OP_PRIO_SUBMIT_VALID:
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
                                (priority, allowed))

    def fn(job):
      (success, msg) = job.ChangePriority(priority)

      if success:
        try:
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
        except workerpool.NoSuchTask:
          logging.debug("Job %s is not in workerpool at this time", job.id)

      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive an already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't checked above whether the renames actually succeeded,
    # we update the cached queue size from the filesystem. Once the TODO
    # above is addressed, the number of actually archived jobs can be used
    # instead.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum amount of time (in seconds) to spend
        archiving jobs before returning

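    Example (with illustrative numbers): C{AutoArchiveJobs(6 * 3600, 60)}
    archives jobs that finished more than six hours ago, spending at most
    roughly one minute doing so, while C{AutoArchiveJobs(-1, 60)} archives
    every finalized job regardless of age.
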
    """
2536
    logging.info("Archiving jobs with age more than %s seconds", age)
2537

    
2538
    now = time.time()
2539
    end_time = now + timeout
2540
    archived_count = 0
2541
    last_touched = 0
2542

    
2543
    all_job_ids = self._GetJobIDsUnlocked()
2544
    pending = []
2545
    for idx, job_id in enumerate(all_job_ids):
2546
      last_touched = idx + 1
2547

    
2548
      # Not optimal because jobs could be pending
2549
      # TODO: Measure average duration for job archival and take number of
2550
      # pending jobs into account.
2551
      if time.time() > end_time:
2552
        break
2553

    
2554
      # Returns None if the job failed to load
2555
      job = self._LoadJobUnlocked(job_id)
2556
      if job:
2557
        if job.end_timestamp is None:
2558
          if job.start_timestamp is None:
2559
            job_age = job.received_timestamp
2560
          else:
2561
            job_age = job.start_timestamp
2562
        else:
2563
          job_age = job.end_timestamp
2564

    
2565
        if age == -1 or now - job_age[0] > age:
2566
          pending.append(job)
2567

    
2568
          # Archive 10 jobs at a time
2569
          if len(pending) >= 10:
2570
            archived_count += self._ArchiveJobsUnlocked(pending)
2571
            pending = []
2572

    
2573
    if pending:
2574
      archived_count += self._ArchiveJobsUnlocked(pending)
2575

    
2576
    return (archived_count, len(all_job_ids) - last_touched)
2577

    
2578
  def _Query(self, fields, qfilter):
    """Prepares a query object and loads the requested jobs.

    @rtype: tuple
    @return: (query object, list of (job ID, job) pairs, whether all
        jobs were requested)

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

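    Example (illustrative field names; see L{query.JOB_FIELDS} for the
    full list)::
      response = queue.QueryJobs(["id", "status"],
                                 qlang.MakeSimpleFilter("id", [1, 2]))
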
    """
2614
    (qobj, ctx, _) = self._Query(fields, qfilter)
2615

    
2616
    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2617

    
2618
  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed the next time the queue is started.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

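    Example (a rough sketch of the expected caller loop, e.g. in the
    master daemon; the sleep interval is made up)::
      while queue.PrepareShutdown():
        time.sleep(0.1)
      queue.Shutdown()
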
    """
2655
    if self._accepting_jobs:
2656
      self._accepting_jobs = False
2657

    
2658
      # Tell worker pool to stop processing pending tasks
2659
      self._wpool.SetActive(False)
2660

    
2661
    return self._wpool.HasRunningTasks()
2662

    
2663
  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool

    """
    return self._accepting_jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None