#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster


JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"
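
# For illustration: locking.ssynchronized looks these names up as instance
# attributes (e.g. self._lock, self._queue) and acquires that lock around the
# decorated method, as in the usage found further down in this module:
#
#   @locking.ssynchronized(_QUEUE, shared=1)
#   def NotifyStart(self):
#     ...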


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
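
# Example (illustrative): utils.SplitTime splits a float timestamp into a
# (seconds, microseconds) pair, so a return value looks roughly like
# (1325026951, 503249).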


def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)


class _SimpleJobQuery:
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
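
# Illustrative usage (the field names are assumed to be valid job query
# fields):
#
#   query_fn = _SimpleJobQuery(["id", "status"])
#   (job_id, job_status) = query_fn(job)
#
# i.e. a call returns the requested field values for the single given job, in
# the order the fields were specified.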


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self, writable)

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an
        error, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.OP_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
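
  # Worked example (illustrative): for a job whose opcodes currently have the
  # statuses [SUCCESS, RUNNING, QUEUED] the loop above yields
  # JOB_STATUS_RUNNING; if every opcode is SUCCESS the result is
  # JOB_STATUS_SUCCESS, and a single CANCELING or ERROR opcode makes the whole
  # job CANCELING or ERROR respectively.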

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
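
  # For illustration, the two call forms accepted above (this method is what a
  # logical unit sees as its feedback callback):
  #
  #   Feedback("some message")                          # log type defaults to
  #                                                     # constants.ELOG_MESSAGE
  #   Feedback(constants.ELOG_MESSAGE, "some message")  # explicit log type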

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
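
  # For reference, the possible return values above: None when the job file
  # disappeared or inotify failed, constants.JOB_NOTCHANGED when the timeout
  # expired without a change, and otherwise the (job_info, log_entries) tuple
  # produced by _JobChangesChecker.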


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
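
  # In other words: Peek() reports the upcoming timeout without consuming it,
  # while Next() hands it out and clears the cached value, so the following
  # call fetches a fresh timeout from the wrapped strategy function.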


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()

class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status == constants.OP_STATUS_WAITING:
          # Couldn't get locks in time
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Sets the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))

  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
      than C{WAIT} for C{job_id}, or behaviour is undefined
    @type job_id: int
    @param job_id: Job ID

    """
    assert ht.TJobId(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()

      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  that the decorated function is called with a first argument that has
1487
  a '_queue_filelock' argument.
1488

1489
  @warning: Use this decorator only after locking.ssynchronized
1490

1491
  Example::
1492
    @locking.ssynchronized(_LOCK)
1493
    @_RequireOpenQueue
1494
    def Example(self):
1495
      pass
1496

1497
  """
1498
  def wrapper(self, *args, **kwargs):
1499
    # pylint: disable=W0212
1500
    assert self._queue_filelock is not None, "Queue should be open"
1501
    return fn(self, *args, **kwargs)
1502
  return wrapper
1503

    
1504

    
1505
def _RequireNonDrainedQueue(fn):
1506
  """Decorator checking for a non-drained queue.
1507

1508
  To be used with functions submitting new jobs.
1509

1510
  """
1511
  def wrapper(self, *args, **kwargs):
1512
    """Wrapper function.
1513

1514
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1515

1516
    """
1517
    # Ok when sharing the big job queue lock, as the drain file is created when
1518
    # the lock is exclusive.
1519
    # Needs access to protected member, pylint: disable=W0212
1520
    if self._drained:
1521
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1522

    
1523
    if not self._accepting_jobs:
1524
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1525

    
1526
    return fn(self, *args, **kwargs)
1527
  return wrapper
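
# Illustrative (hypothetical) usage, mirroring the Example in _RequireOpenQueue
# above; a job-submitting entry point would typically stack all three
# decorators:
#
#   @locking.ssynchronized(_LOCK)
#   @_RequireOpenQueue
#   @_RequireNonDrainedQueue
#   def SubmitJobExample(self, ops):
#     ...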


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
1556
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TPositiveInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

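  # Illustrative sketch (not part of the original module): how the serial
  # allocation above behaves. The concrete formatting of the returned IDs is
  # whatever jstore.FormatJobID produces; the numbers are assumptions for the
  # example only.
  #
  #   # With self._last_serial == 42:
  #   ids = queue._NewSerialsUnlocked(3)
  #   # The serial file now contains "45\n", self._last_serial == 45 and
  #   # "ids" holds the formatted job IDs for serials 43, 44 and 45.
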
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a give job id.
1854

1855
    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

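  # Illustrative sketch (not part of the original module): the two path
  # helpers above place live and archived job files in different locations.
  # The archive subdirectory is whatever jstore.GetArchiveDirectory chooses;
  # the literal paths below are assumptions for the example only.
  #
  #   queue._GetJobPath(1234)
  #   # -> <QUEUE_DIR>/job-1234
  #   queue._GetArchivedJobPath(1234)
  #   # -> <JOB_QUEUE_ARCHIVE_DIR>/<archive subdir for 1234>/job-1234
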
  @staticmethod
  def _GetJobIDsUnlocked(sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(pathutils.QUEUE_DIR):
      m = constants.JOB_FILE_RE.match(filename)
      if m:
        jlist.append(int(m.group(1)))
    if sort:
      jlist.sort()
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, True)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, False))

    raw_data = None
    writable_default = None

    for (fn, writable_default) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = writable_default

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

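  # Illustrative sketch (not part of the original module): how the "writable"
  # default above is derived from where the job file was found. The call
  # shapes come from the method signature; the outcomes assume the job file
  # exists in the respective directory.
  #
  #   queue._LoadJobFromDisk(17, False)
  #   # -> job loaded from the live queue directory, writable by default
  #   queue._LoadJobFromDisk(17, True)
  #   # -> if only the archived copy exists, it is loaded read-only unless
  #   #    "writable" is passed explicitly
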
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple
    @return: a tuple of a boolean and either the resolved dependencies or
        an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

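  # Illustrative sketch (not part of the original module): resolving relative
  # job dependencies with the static helper above. The job IDs, the lambda
  # resolver and the status list are assumptions for the example only.
  #
  #   deps = [(-1, [constants.JOB_STATUS_SUCCESS]), (1000, [])]
  #   JobQueue._ResolveJobDependencies(lambda relid: [1041, 1042][relid], deps)
  #   # -> (True, [(1042, [constants.JOB_STATUS_SUCCESS]), (1000, [])])
  #   # A relative ID that cannot be resolved makes the helper return
  #   # (False, <error message>) instead.
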
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

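  # Illustrative sketch (not part of the original module): within one
  # SubmitManyJobs batch the local "resolve_fn" above maps a relative job ID
  # to an ID submitted earlier in the same batch (or in "previous_job_ids").
  # The serial numbers are assumptions for the example only.
  #
  #   # Batch assigned job_ids == [2001, 2002, 2003]; an opcode of the third
  #   # job depending on (-2, ...) resolves to job 2001, while (-1, ...)
  #   # resolves to job 2002.
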
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time (in seconds) to spend archiving
    @rtype: tuple
    @return: a tuple of the number of archived jobs and the number of jobs
        not yet inspected (due to the timeout)

    """
2371
    logging.info("Archiving jobs with age more than %s seconds", age)
2372

    
2373
    now = time.time()
2374
    end_time = now + timeout
2375
    archived_count = 0
2376
    last_touched = 0
2377

    
2378
    all_job_ids = self._GetJobIDsUnlocked()
2379
    pending = []
2380
    for idx, job_id in enumerate(all_job_ids):
2381
      last_touched = idx + 1
2382

    
2383
      # Not optimal because jobs could be pending
2384
      # TODO: Measure average duration for job archival and take number of
2385
      # pending jobs into account.
2386
      if time.time() > end_time:
2387
        break
2388

    
2389
      # Returns None if the job failed to load
2390
      job = self._LoadJobUnlocked(job_id)
2391
      if job:
2392
        if job.end_timestamp is None:
2393
          if job.start_timestamp is None:
2394
            job_age = job.received_timestamp
2395
          else:
2396
            job_age = job.start_timestamp
2397
        else:
2398
          job_age = job.end_timestamp
2399

    
2400
        if age == -1 or now - job_age[0] > age:
2401
          pending.append(job)
2402

    
2403
          # Archive 10 jobs at a time
2404
          if len(pending) >= 10:
2405
            archived_count += self._ArchiveJobsUnlocked(pending)
2406
            pending = []
2407

    
2408
    if pending:
2409
      archived_count += self._ArchiveJobsUnlocked(pending)
2410

    
2411
    return (archived_count, len(all_job_ids) - last_touched)
2412

    
2413
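  # Illustrative sketch (not part of the original module): typical uses of
  # the age-based archival above, e.g. on behalf of "gnt-job autoarchive".
  # The argument values are assumptions for the example only.
  #
  #   queue.AutoArchiveJobs(6 * 3600, 30)
  #   # -> archives finalized jobs older than six hours, spending at most
  #   #    roughly 30 seconds, and returns (archived, left_unchecked)
  #   queue.AutoArchiveJobs(-1, 30)
  #   # -> an age of -1 archives all finalized jobs regardless of age
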
  def _Query(self, fields, qfilter):
    """Queries jobs matching the given filter.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter
    @rtype: tuple
    @return: the query object, the list of (job ID, job) pairs and whether
        all jobs were requested

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)

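  # Illustrative sketch (not part of the original module): both query entry
  # points above build on _Query. The field names below exist in
  # query.JOB_FIELDS in this codebase; the particular selection and the
  # result value are assumptions for the example only.
  #
  #   queue.OldStyleQueryJobs([1042], ["id", "status"])
  #   # -> e.g. [[1042, "success"]]
  #   queue.QueryJobs(["id", "status"], qlang.MakeSimpleFilter("id", [1042]))
  #   # -> the full query2-style response for the same job
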
  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
2501
    self._wpool.TerminateWorkers()
2502

    
2503
    self._queue_filelock.Close()
2504
    self._queue_filelock = None
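
  # Illustrative sketch (not part of the original module): a caller such as
  # the master daemon would typically drive the two shutdown steps above by
  # polling until no tasks are left before the final Shutdown() call. The
  # helper name and the polling interval are assumptions for the example.
  #
  #   def _WaitForQueueShutdown(queue, poll_interval=0.1):
  #     while queue.PrepareShutdown():
  #       time.sleep(poll_interval)
  #     queue.Shutdown()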