#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang


JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
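
# Example (illustrative): if time.time() returned 1234567890.5, the
# function above would yield (1234567890, 500000), i.e. whole seconds
# plus the remaining microseconds.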


class _SimpleJobQuery:
  """Wrapper for job queries.

  The instance keeps the list of fields cached, which is useful e.g. in
  L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: int
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self, writable)

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an
        error, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waiting
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
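
  # Example (illustrative): a job whose opcodes are in the states
  # [SUCCESS, RUNNING, QUEUED] reports JOB_STATUS_RUNNING from the loop
  # above; only when every opcode has succeeded does CalcStatus() return
  # JOB_STATUS_SUCCESS.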

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
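
  # Note (illustrative): opcode priorities follow "lower value means more
  # urgent", which is why min() is used above; e.g. pending priorities
  # [40, 10] give the job an effective priority of 10.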

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
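
  # Example (illustrative): with newer_than=3, only log tuples whose
  # serial (the first element) is greater than 3 are returned, while
  # newer_than=None returns the whole log thanks to the -1 sentinel.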

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)
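
  # Summary of the transitions above (illustrative): a QUEUED job is
  # finalized as CANCELED immediately, a WAITING job is merely marked
  # CANCELING and finalized later by the worker, and any other state
  # (e.g. RUNNING) can no longer be canceled.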


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
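
  # Usage (illustrative): Feedback("text") logs with the default
  # constants.ELOG_MESSAGE type, while Feedback(log_type, "text") passes
  # an explicit type; either way the entry is stored as a
  # (serial, (seconds, microseconds), type, message) tuple.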

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: list or None
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: int
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client
  when the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
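
  # Flow (illustrative): __call__ polls _CheckForChanges through
  # utils.Retry, sleeping in waiter.Wait (inotify-backed) between
  # attempts; it returns the new (job_info, log_entries) pair, None if
  # the job was lost, or constants.JOB_NOTCHANGED on timeout.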


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
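
# Example (illustrative): a plain KeyError raised by an opcode is wrapped
# in errors.OpExecError before encoding, whereas errors.GenericError
# subclasses are encoded as-is, so clients always receive a decodable
# Ganeti exception.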


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
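
  # Semantics (illustrative): Peek() computes and caches the next timeout
  # without consuming it, while Next() returns the cached value and then
  # clears it; calling Peek() followed by Next() therefore yields the
  # same number exactly once.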


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status == constants.OP_STATUS_WAITING:
          # Couldn't get locks in time
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()

    
1225

    
1226
def _EvaluateJobProcessorResult(depmgr, job, result):
1227
  """Looks at a result from L{_JobProcessor} for a job.
1228

1229
  To be used in a L{_JobQueueWorker}.
1230

1231
  """
1232
  if result == _JobProcessor.FINISHED:
1233
    # Notify waiting jobs
1234
    depmgr.NotifyWaiters(job.id)
1235

    
1236
  elif result == _JobProcessor.DEFER:
1237
    # Schedule again
1238
    raise workerpool.DeferTask(priority=job.CalcPriority())
1239

    
1240
  elif result == _JobProcessor.WAITDEP:
1241
    # No-op, dependency manager will re-schedule
1242
    pass
1243

    
1244
  else:
1245
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1246
                                 (result, ))
1247

    
1248

    
1249
class _JobQueueWorker(workerpool.BaseWorker):
1250
  """The actual job workers.
1251

1252
  """
1253
  def RunTask(self, job): # pylint: disable=W0221
1254
    """Job executor.
1255

1256
    @type job: L{_QueuedJob}
1257
    @param job: the job to be processed
1258

1259
    """
1260
    assert job.writable, "Expected writable job"
1261

    
1262
    # Ensure only one worker is active on a single job. If a job registers for
1263
    # a dependency job, and the other job notifies before the first worker is
1264
    # done, the job can end up in the tasklist more than once.
1265
    job.processor_lock.acquire()
1266
    try:
1267
      return self._RunTaskInner(job)
1268
    finally:
1269
      job.processor_lock.release()
1270

    
1271
  def _RunTaskInner(self, job):
1272
    """Executes a job.
1273

1274
    Must be called with per-job lock acquired.
1275

1276
    """
1277
    queue = job.queue
1278
    assert queue == self.pool.queue
1279

    
1280
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1281
    setname_fn(None)
1282

    
1283
    proc = mcpu.Processor(queue.context, job.id)
1284

    
1285
    # Create wrapper for setting thread name
1286
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1287
                                    proc.ExecOpCode)
1288

    
1289
    _EvaluateJobProcessorResult(queue.depmgr, job,
1290
                                _JobProcessor(queue, wrap_execop_fn, job)())
1291

    
1292
  @staticmethod
1293
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1294
    """Updates the worker thread name to include a short summary of the opcode.
1295

1296
    @param setname_fn: Callable setting worker thread name
1297
    @param execop_fn: Callable for executing opcode (usually
1298
                      L{mcpu.Processor.ExecOpCode})
1299

1300
    """
1301
    setname_fn(op)
1302
    try:
1303
      return execop_fn(op, *args, **kwargs)
1304
    finally:
1305
      setname_fn(None)
1306

    
1307
  @staticmethod
1308
  def _GetWorkerName(job, op):
1309
    """Sets the worker thread name.
1310

1311
    @type job: L{_QueuedJob}
1312
    @type op: L{opcodes.OpCode}
1313

1314
    """
1315
    parts = ["Job%s" % job.id]
1316

    
1317
    if op:
1318
      parts.append(op.TinySummary())
1319

    
1320
    return "/".join(parts)
1321

    
1322

    
1323
class _JobQueueWorkerPool(workerpool.WorkerPool):
1324
  """Simple class implementing a job-processing workerpool.
1325

1326
  """
1327
  def __init__(self, queue):
1328
    super(_JobQueueWorkerPool, self).__init__("Jq",
1329
                                              JOBQUEUE_THREADS,
1330
                                              _JobQueueWorker)
1331
    self.queue = queue
1332

    
1333

    
1334
class _JobDependencyManager:
1335
  """Keeps track of job dependencies.
1336

1337
  """
1338
  (WAIT,
1339
   ERROR,
1340
   CANCEL,
1341
   CONTINUE,
1342
   WRONGSTATUS) = range(1, 6)
1343

    
1344
  def __init__(self, getstatus_fn, enqueue_fn):
1345
    """Initializes this class.
1346

1347
    """
1348
    self._getstatus_fn = getstatus_fn
1349
    self._enqueue_fn = enqueue_fn
1350

    
1351
    self._waiters = {}
1352
    self._lock = locking.SharedLock("JobDepMgr")
1353

    
1354
  @locking.ssynchronized(_LOCK, shared=1)
1355
  def GetLockInfo(self, requested): # pylint: disable=W0613
1356
    """Retrieves information about waiting jobs.
1357

1358
    @type requested: set
1359
    @param requested: Requested information, see C{query.LQ_*}
1360

1361
    """
1362
    # No need to sort here, that's being done by the lock manager and query
1363
    # library. There are no priorities for notifying jobs, hence all show up as
1364
    # one item under "pending".
1365
    return [("job/%s" % job_id, None, None,
1366
             [("job", [job.id for job in waiters])])
1367
            for job_id, waiters in self._waiters.items()
1368
            if waiters]
1369

    
1370
  @locking.ssynchronized(_LOCK, shared=1)
1371
  def JobWaiting(self, job):
1372
    """Checks if a job is waiting.
1373

1374
    """
1375
    return compat.any(job in jobs
1376
                      for jobs in self._waiters.values())
1377

    
1378
  @locking.ssynchronized(_LOCK)
1379
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1380
    """Checks if a dependency job has the requested status.
1381

1382
    If the other job is not yet in a finalized status, the calling job will be
1383
    notified (re-added to the workerpool) at a later point.
1384

1385
    @type job: L{_QueuedJob}
1386
    @param job: Job object
1387
    @type dep_job_id: int
1388
    @param dep_job_id: ID of dependency job
1389
    @type dep_status: list
1390
    @param dep_status: Required status
1391

1392
    """
1393
    assert ht.TJobId(job.id)
1394
    assert ht.TJobId(dep_job_id)
1395
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1396

    
1397
    if job.id == dep_job_id:
1398
      return (self.ERROR, "Job can't depend on itself")
1399

    
1400
    # Get status of dependency job
1401
    try:
1402
      status = self._getstatus_fn(dep_job_id)
1403
    except errors.JobLost, err:
1404
      return (self.ERROR, "Dependency error: %s" % err)
1405

    
1406
    assert status in constants.JOB_STATUS_ALL
1407

    
1408
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1409

    
1410
    if status not in constants.JOBS_FINALIZED:
1411
      # Register for notification and wait for job to finish
1412
      job_id_waiters.add(job)
1413
      return (self.WAIT,
1414
              "Need to wait for job %s, wanted status '%s'" %
1415
              (dep_job_id, dep_status))
1416

    
1417
    # Remove from waiters list
1418
    if job in job_id_waiters:
1419
      job_id_waiters.remove(job)
1420

    
1421
    if (status == constants.JOB_STATUS_CANCELED and
1422
        constants.JOB_STATUS_CANCELED not in dep_status):
1423
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1424

    
1425
    elif not dep_status or status in dep_status:
1426
      return (self.CONTINUE,
1427
              "Dependency job %s finished with status '%s'" %
1428
              (dep_job_id, status))
1429

    
1430
    else:
1431
      return (self.WRONGSTATUS,
1432
              "Dependency job %s finished with status '%s',"
1433
              " not one of '%s' as required" %
1434
              (dep_job_id, status, utils.CommaJoin(dep_status)))
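
  # Example (illustrative): if job 124 depends on job 123 with
  # dep_status=[JOB_STATUS_SUCCESS], a still-running job 123 yields WAIT,
  # a successful one CONTINUE, a cancelled one CANCEL, and a failed one
  # WRONGSTATUS.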

  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
    @type job_id: int
    @param job_id: Job ID

    """
    assert ht.TJobId(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()

      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
                                                        content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
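    # Example (hypothetical numbers): with five non-master nodes, one success
    # and four failures, (1 + 1) < 4 holds and the error below is logged; the
    # master itself counts as one implicit success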
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)
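
    # The file is written on the master node first and only then replicated,
    # so the master's copy stays up to date even if replication to a
    # candidate fails (failures are only logged, via _CheckRpcResult)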
    if replicate:
      names, addrs = self._GetNodeIp()
      result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename the given files in the local queue
    directory and then replicate these renames to all the other nodes
    we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TPositiveInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
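
    # Example (hypothetical numbers): with self._last_serial == 42 and
    # count == 3, serial becomes 45 and the IDs returned correspond to
    # serials 43, 44 and 45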

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1840

    
1841
  @staticmethod
1842
  def _GetArchivedJobPath(job_id):
1843
    """Returns the archived job file for a give job id.
1844

1845
    @type job_id: str
1846
    @param job_id: the job identifier
1847
    @rtype: str
1848
    @return: the path to the archived job file
1849

1850
    """
1851
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1852
                          jstore.GetArchiveDirectory(job_id),
1853
                          "job-%s" % job_id)

  @staticmethod
  def _GetJobIDsUnlocked(sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = constants.JOB_FILE_RE.match(filename)
      if m:
        jlist.append(int(m.group(1)))
    if sort:
      jlist.sort()
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @type writable: bool or None
    @param writable: whether the returned job should be writable; if None,
        the default for the location the job was loaded from is used
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, True)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, False))
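
    # Each entry pairs a path function with the default for "writable":
    # jobs in the live queue default to writable, archived jobs to read-only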

    raw_data = None
    writable_default = None

    for (fn, writable_default) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = writable_default

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple
    @return: tuple of (status, data); if C{status} is true, C{data} contains
        the resolved dependencies, otherwise it holds an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
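        # Relative job IDs are negative offsets into the list of jobs being
        # submitted, e.g. -1 refers to the job submitted immediately before
        # this one (see resolve_fn in _SubmitManyJobsUnlocked)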
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]
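
    # Example (hypothetical numbers): submitting three jobs that were
    # assigned IDs [10, 11, 12] with no previous_job_ids, a dependency of -1
    # in the third job (job_idx == 2) resolves to (job_ids[:2])[-1] == 11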

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
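        # Note: this "else" belongs to the inner "for" loop and only runs if
        # that loop completed without "break", i.e. if all dependencies were
        # resolved successfully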
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
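    # Each job becomes a one-element task tuple; per-job priorities are
    # passed alongside so the worker pool can pick up higher-priority jobs
    # first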
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk; check for None before the assertion, otherwise a
    # missing job would raise AttributeError instead of JobLost
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time (in seconds) to spend
        archiving jobs

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp
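
        # Timestamps are (seconds, microseconds) tuples, so job_age[0] below
        # is the age in seconds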
        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
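      # When listing all jobs, entries that vanished or failed to load are
      # silently skipped; when specific IDs were requested, a None entry is
      # kept so the caller gets a result for every requested ID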
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None