Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ a06c6ae8

History | View | Annotate | Download (72.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import time
35
import weakref
36
import threading
37
import itertools
38

    
39
try:
40
  # pylint: disable=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import runtime
57
from ganeti import netutils
58
from ganeti import compat
59
from ganeti import ht
60
from ganeti import query
61
from ganeti import qlang
62

    
63

    
64
JOBQUEUE_THREADS = 25
65
JOBS_PER_ARCHIVE_DIRECTORY = 10000
66

    
67
# member lock names to be passed to @ssynchronized decorator
68
_LOCK = "_lock"
69
_QUEUE = "_queue"
70

    
71

    
72
class CancelJob(Exception):
73
  """Special exception to cancel a job.
74

75
  """
76

    
77

    
78
def TimeStampNow():
79
  """Returns the current timestamp.
80

81
  @rtype: tuple
82
  @return: the current time in the (seconds, microseconds) format
83

84
  """
85
  return utils.SplitTime(time.time())
86

    
87

    
88
class _SimpleJobQuery:
89
  """Wrapper for job queries.
90

91
  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
92

93
  """
94
  def __init__(self, fields):
95
    """Initializes this class.
96

97
    """
98
    self._query = query.Query(query.JOB_FIELDS, fields)
99

    
100
  def __call__(self, job):
101
    """Executes a job query using cached field list.
102

103
    """
104
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
105

    
106

    
107
class _QueuedOpCode(object):
108
  """Encapsulates an opcode object.
109

110
  @ivar log: holds the execution log and consists of tuples
111
  of the form C{(log_serial, timestamp, level, message)}
112
  @ivar input: the OpCode we encapsulate
113
  @ivar status: the current status
114
  @ivar result: the result of the LU execution
115
  @ivar start_timestamp: timestamp for the start of the execution
116
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
117
  @ivar stop_timestamp: timestamp for the end of the execution
118

119
  """
120
  __slots__ = ["input", "status", "result", "log", "priority",
121
               "start_timestamp", "exec_timestamp", "end_timestamp",
122
               "__weakref__"]
123

    
124
  def __init__(self, op):
125
    """Initializes instances of this class.
126

127
    @type op: L{opcodes.OpCode}
128
    @param op: the opcode we encapsulate
129

130
    """
131
    self.input = op
132
    self.status = constants.OP_STATUS_QUEUED
133
    self.result = None
134
    self.log = []
135
    self.start_timestamp = None
136
    self.exec_timestamp = None
137
    self.end_timestamp = None
138

    
139
    # Get initial priority (it might change during the lifetime of this opcode)
140
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
141

    
142
  @classmethod
143
  def Restore(cls, state):
144
    """Restore the _QueuedOpCode from the serialized form.
145

146
    @type state: dict
147
    @param state: the serialized state
148
    @rtype: _QueuedOpCode
149
    @return: a new _QueuedOpCode instance
150

151
    """
152
    obj = _QueuedOpCode.__new__(cls)
153
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
154
    obj.status = state["status"]
155
    obj.result = state["result"]
156
    obj.log = state["log"]
157
    obj.start_timestamp = state.get("start_timestamp", None)
158
    obj.exec_timestamp = state.get("exec_timestamp", None)
159
    obj.end_timestamp = state.get("end_timestamp", None)
160
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
161
    return obj
162

    
163
  def Serialize(self):
164
    """Serializes this _QueuedOpCode.
165

166
    @rtype: dict
167
    @return: the dictionary holding the serialized state
168

169
    """
170
    return {
171
      "input": self.input.__getstate__(),
172
      "status": self.status,
173
      "result": self.result,
174
      "log": self.log,
175
      "start_timestamp": self.start_timestamp,
176
      "exec_timestamp": self.exec_timestamp,
177
      "end_timestamp": self.end_timestamp,
178
      "priority": self.priority,
179
      }
180

    
181

    
182
class _QueuedJob(object):
183
  """In-memory job representation.
184

185
  This is what we use to track the user-submitted jobs. Locking must
186
  be taken care of by users of this class.
187

188
  @type queue: L{JobQueue}
189
  @ivar queue: the parent queue
190
  @ivar id: the job ID
191
  @type ops: list
192
  @ivar ops: the list of _QueuedOpCode that constitute the job
193
  @type log_serial: int
194
  @ivar log_serial: holds the index for the next log entry
195
  @ivar received_timestamp: the timestamp for when the job was received
196
  @ivar start_timestmap: the timestamp for start of execution
197
  @ivar end_timestamp: the timestamp for end of execution
198
  @ivar writable: Whether the job is allowed to be modified
199

200
  """
201
  # pylint: disable=W0212
202
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
203
               "received_timestamp", "start_timestamp", "end_timestamp",
204
               "__weakref__", "processor_lock", "writable"]
205

    
206
  def __init__(self, queue, job_id, ops, writable):
207
    """Constructor for the _QueuedJob.
208

209
    @type queue: L{JobQueue}
210
    @param queue: our parent queue
211
    @type job_id: job_id
212
    @param job_id: our job id
213
    @type ops: list
214
    @param ops: the list of opcodes we hold, which will be encapsulated
215
        in _QueuedOpCodes
216
    @type writable: bool
217
    @param writable: Whether job can be modified
218

219
    """
220
    if not ops:
221
      raise errors.GenericError("A job needs at least one opcode")
222

    
223
    self.queue = queue
224
    self.id = job_id
225
    self.ops = [_QueuedOpCode(op) for op in ops]
226
    self.log_serial = 0
227
    self.received_timestamp = TimeStampNow()
228
    self.start_timestamp = None
229
    self.end_timestamp = None
230

    
231
    self._InitInMemory(self, writable)
232

    
233
  @staticmethod
234
  def _InitInMemory(obj, writable):
235
    """Initializes in-memory variables.
236

237
    """
238
    obj.writable = writable
239
    obj.ops_iter = None
240
    obj.cur_opctx = None
241

    
242
    # Read-only jobs are not processed and therefore don't need a lock
243
    if writable:
244
      obj.processor_lock = threading.Lock()
245
    else:
246
      obj.processor_lock = None
247

    
248
  def __repr__(self):
249
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
250
              "id=%s" % self.id,
251
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
252

    
253
    return "<%s at %#x>" % (" ".join(status), id(self))
254

    
255
  @classmethod
256
  def Restore(cls, queue, state, writable):
257
    """Restore a _QueuedJob from serialized state:
258

259
    @type queue: L{JobQueue}
260
    @param queue: to which queue the restored job belongs
261
    @type state: dict
262
    @param state: the serialized state
263
    @type writable: bool
264
    @param writable: Whether job can be modified
265
    @rtype: _JobQueue
266
    @return: the restored _JobQueue instance
267

268
    """
269
    obj = _QueuedJob.__new__(cls)
270
    obj.queue = queue
271
    obj.id = state["id"]
272
    obj.received_timestamp = state.get("received_timestamp", None)
273
    obj.start_timestamp = state.get("start_timestamp", None)
274
    obj.end_timestamp = state.get("end_timestamp", None)
275

    
276
    obj.ops = []
277
    obj.log_serial = 0
278
    for op_state in state["ops"]:
279
      op = _QueuedOpCode.Restore(op_state)
280
      for log_entry in op.log:
281
        obj.log_serial = max(obj.log_serial, log_entry[0])
282
      obj.ops.append(op)
283

    
284
    cls._InitInMemory(obj, writable)
285

    
286
    return obj
287

    
288
  def Serialize(self):
289
    """Serialize the _JobQueue instance.
290

291
    @rtype: dict
292
    @return: the serialized state
293

294
    """
295
    return {
296
      "id": self.id,
297
      "ops": [op.Serialize() for op in self.ops],
298
      "start_timestamp": self.start_timestamp,
299
      "end_timestamp": self.end_timestamp,
300
      "received_timestamp": self.received_timestamp,
301
      }
302

    
303
  def CalcStatus(self):
304
    """Compute the status of this job.
305

306
    This function iterates over all the _QueuedOpCodes in the job and
307
    based on their status, computes the job status.
308

309
    The algorithm is:
310
      - if we find a cancelled, or finished with error, the job
311
        status will be the same
312
      - otherwise, the last opcode with the status one of:
313
          - waitlock
314
          - canceling
315
          - running
316

317
        will determine the job status
318

319
      - otherwise, it means either all opcodes are queued, or success,
320
        and the job status will be the same
321

322
    @return: the job status
323

324
    """
325
    status = constants.JOB_STATUS_QUEUED
326

    
327
    all_success = True
328
    for op in self.ops:
329
      if op.status == constants.OP_STATUS_SUCCESS:
330
        continue
331

    
332
      all_success = False
333

    
334
      if op.status == constants.OP_STATUS_QUEUED:
335
        pass
336
      elif op.status == constants.OP_STATUS_WAITING:
337
        status = constants.JOB_STATUS_WAITING
338
      elif op.status == constants.OP_STATUS_RUNNING:
339
        status = constants.JOB_STATUS_RUNNING
340
      elif op.status == constants.OP_STATUS_CANCELING:
341
        status = constants.JOB_STATUS_CANCELING
342
        break
343
      elif op.status == constants.OP_STATUS_ERROR:
344
        status = constants.JOB_STATUS_ERROR
345
        # The whole job fails if one opcode failed
346
        break
347
      elif op.status == constants.OP_STATUS_CANCELED:
348
        status = constants.OP_STATUS_CANCELED
349
        break
350

    
351
    if all_success:
352
      status = constants.JOB_STATUS_SUCCESS
353

    
354
    return status
355

    
356
  def CalcPriority(self):
357
    """Gets the current priority for this job.
358

359
    Only unfinished opcodes are considered. When all are done, the default
360
    priority is used.
361

362
    @rtype: int
363

364
    """
365
    priorities = [op.priority for op in self.ops
366
                  if op.status not in constants.OPS_FINALIZED]
367

    
368
    if not priorities:
369
      # All opcodes are done, assume default priority
370
      return constants.OP_PRIO_DEFAULT
371

    
372
    return min(priorities)
373

    
374
  def GetLogEntries(self, newer_than):
375
    """Selectively returns the log entries.
376

377
    @type newer_than: None or int
378
    @param newer_than: if this is None, return all log entries,
379
        otherwise return only the log entries with serial higher
380
        than this value
381
    @rtype: list
382
    @return: the list of the log entries selected
383

384
    """
385
    if newer_than is None:
386
      serial = -1
387
    else:
388
      serial = newer_than
389

    
390
    entries = []
391
    for op in self.ops:
392
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
393

    
394
    return entries
395

    
396
  def GetInfo(self, fields):
397
    """Returns information about a job.
398

399
    @type fields: list
400
    @param fields: names of fields to return
401
    @rtype: list
402
    @return: list with one element for each field
403
    @raise errors.OpExecError: when an invalid field
404
        has been passed
405

406
    """
407
    return _SimpleJobQuery(fields)(self)
408

    
409
  def MarkUnfinishedOps(self, status, result):
410
    """Mark unfinished opcodes with a given status and result.
411

412
    This is an utility function for marking all running or waiting to
413
    be run opcodes with a given status. Opcodes which are already
414
    finalised are not changed.
415

416
    @param status: a given opcode status
417
    @param result: the opcode result
418

419
    """
420
    not_marked = True
421
    for op in self.ops:
422
      if op.status in constants.OPS_FINALIZED:
423
        assert not_marked, "Finalized opcodes found after non-finalized ones"
424
        continue
425
      op.status = status
426
      op.result = result
427
      not_marked = False
428

    
429
  def Finalize(self):
430
    """Marks the job as finalized.
431

432
    """
433
    self.end_timestamp = TimeStampNow()
434

    
435
  def Cancel(self):
436
    """Marks job as canceled/-ing if possible.
437

438
    @rtype: tuple; (bool, string)
439
    @return: Boolean describing whether job was successfully canceled or marked
440
      as canceling and a text message
441

442
    """
443
    status = self.CalcStatus()
444

    
445
    if status == constants.JOB_STATUS_QUEUED:
446
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
447
                             "Job canceled by request")
448
      self.Finalize()
449
      return (True, "Job %s canceled" % self.id)
450

    
451
    elif status == constants.JOB_STATUS_WAITING:
452
      # The worker will notice the new status and cancel the job
453
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
454
      return (True, "Job %s will be canceled" % self.id)
455

    
456
    else:
457
      logging.debug("Job %s is no longer waiting in the queue", self.id)
458
      return (False, "Job %s is no longer waiting in the queue" % self.id)
459

    
460

    
461
class _OpExecCallbacks(mcpu.OpExecCbBase):
462
  def __init__(self, queue, job, op):
463
    """Initializes this class.
464

465
    @type queue: L{JobQueue}
466
    @param queue: Job queue
467
    @type job: L{_QueuedJob}
468
    @param job: Job object
469
    @type op: L{_QueuedOpCode}
470
    @param op: OpCode
471

472
    """
473
    assert queue, "Queue is missing"
474
    assert job, "Job is missing"
475
    assert op, "Opcode is missing"
476

    
477
    self._queue = queue
478
    self._job = job
479
    self._op = op
480

    
481
  def _CheckCancel(self):
482
    """Raises an exception to cancel the job if asked to.
483

484
    """
485
    # Cancel here if we were asked to
486
    if self._op.status == constants.OP_STATUS_CANCELING:
487
      logging.debug("Canceling opcode")
488
      raise CancelJob()
489

    
490
  @locking.ssynchronized(_QUEUE, shared=1)
491
  def NotifyStart(self):
492
    """Mark the opcode as running, not lock-waiting.
493

494
    This is called from the mcpu code as a notifier function, when the LU is
495
    finally about to start the Exec() method. Of course, to have end-user
496
    visible results, the opcode must be initially (before calling into
497
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
498

499
    """
500
    assert self._op in self._job.ops
501
    assert self._op.status in (constants.OP_STATUS_WAITING,
502
                               constants.OP_STATUS_CANCELING)
503

    
504
    # Cancel here if we were asked to
505
    self._CheckCancel()
506

    
507
    logging.debug("Opcode is now running")
508

    
509
    self._op.status = constants.OP_STATUS_RUNNING
510
    self._op.exec_timestamp = TimeStampNow()
511

    
512
    # And finally replicate the job status
513
    self._queue.UpdateJobUnlocked(self._job)
514

    
515
  @locking.ssynchronized(_QUEUE, shared=1)
516
  def _AppendFeedback(self, timestamp, log_type, log_msg):
517
    """Internal feedback append function, with locks
518

519
    """
520
    self._job.log_serial += 1
521
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
522
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
523

    
524
  def Feedback(self, *args):
525
    """Append a log entry.
526

527
    """
528
    assert len(args) < 3
529

    
530
    if len(args) == 1:
531
      log_type = constants.ELOG_MESSAGE
532
      log_msg = args[0]
533
    else:
534
      (log_type, log_msg) = args
535

    
536
    # The time is split to make serialization easier and not lose
537
    # precision.
538
    timestamp = utils.SplitTime(time.time())
539
    self._AppendFeedback(timestamp, log_type, log_msg)
540

    
541
  def CheckCancel(self):
542
    """Check whether job has been cancelled.
543

544
    """
545
    assert self._op.status in (constants.OP_STATUS_WAITING,
546
                               constants.OP_STATUS_CANCELING)
547

    
548
    # Cancel here if we were asked to
549
    self._CheckCancel()
550

    
551
  def SubmitManyJobs(self, jobs):
552
    """Submits jobs for processing.
553

554
    See L{JobQueue.SubmitManyJobs}.
555

556
    """
557
    # Locking is done in job queue
558
    return self._queue.SubmitManyJobs(jobs)
559

    
560

    
561
class _JobChangesChecker(object):
562
  def __init__(self, fields, prev_job_info, prev_log_serial):
563
    """Initializes this class.
564

565
    @type fields: list of strings
566
    @param fields: Fields requested by LUXI client
567
    @type prev_job_info: string
568
    @param prev_job_info: previous job info, as passed by the LUXI client
569
    @type prev_log_serial: string
570
    @param prev_log_serial: previous job serial, as passed by the LUXI client
571

572
    """
573
    self._fields = fields
574
    self._prev_job_info = prev_job_info
575
    self._prev_log_serial = prev_log_serial
576

    
577
  def __call__(self, job):
578
    """Checks whether job has changed.
579

580
    @type job: L{_QueuedJob}
581
    @param job: Job object
582

583
    """
584
    assert not job.writable, "Expected read-only job"
585

    
586
    status = job.CalcStatus()
587
    job_info = job.GetInfo(self._fields)
588
    log_entries = job.GetLogEntries(self._prev_log_serial)
589

    
590
    # Serializing and deserializing data can cause type changes (e.g. from
591
    # tuple to list) or precision loss. We're doing it here so that we get
592
    # the same modifications as the data received from the client. Without
593
    # this, the comparison afterwards might fail without the data being
594
    # significantly different.
595
    # TODO: we just deserialized from disk, investigate how to make sure that
596
    # the job info and log entries are compatible to avoid this further step.
597
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
598
    # efficient, though floats will be tricky
599
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
600
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
601

    
602
    # Don't even try to wait if the job is no longer running, there will be
603
    # no changes.
604
    if (status not in (constants.JOB_STATUS_QUEUED,
605
                       constants.JOB_STATUS_RUNNING,
606
                       constants.JOB_STATUS_WAITING) or
607
        job_info != self._prev_job_info or
608
        (log_entries and self._prev_log_serial != log_entries[0][0])):
609
      logging.debug("Job %s changed", job.id)
610
      return (job_info, log_entries)
611

    
612
    return None
613

    
614

    
615
class _JobFileChangesWaiter(object):
616
  def __init__(self, filename):
617
    """Initializes this class.
618

619
    @type filename: string
620
    @param filename: Path to job file
621
    @raises errors.InotifyError: if the notifier cannot be setup
622

623
    """
624
    self._wm = pyinotify.WatchManager()
625
    self._inotify_handler = \
626
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
627
    self._notifier = \
628
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
629
    try:
630
      self._inotify_handler.enable()
631
    except Exception:
632
      # pyinotify doesn't close file descriptors automatically
633
      self._notifier.stop()
634
      raise
635

    
636
  def _OnInotify(self, notifier_enabled):
637
    """Callback for inotify.
638

639
    """
640
    if not notifier_enabled:
641
      self._inotify_handler.enable()
642

    
643
  def Wait(self, timeout):
644
    """Waits for the job file to change.
645

646
    @type timeout: float
647
    @param timeout: Timeout in seconds
648
    @return: Whether there have been events
649

650
    """
651
    assert timeout >= 0
652
    have_events = self._notifier.check_events(timeout * 1000)
653
    if have_events:
654
      self._notifier.read_events()
655
    self._notifier.process_events()
656
    return have_events
657

    
658
  def Close(self):
659
    """Closes underlying notifier and its file descriptor.
660

661
    """
662
    self._notifier.stop()
663

    
664

    
665
class _JobChangesWaiter(object):
666
  def __init__(self, filename):
667
    """Initializes this class.
668

669
    @type filename: string
670
    @param filename: Path to job file
671

672
    """
673
    self._filewaiter = None
674
    self._filename = filename
675

    
676
  def Wait(self, timeout):
677
    """Waits for a job to change.
678

679
    @type timeout: float
680
    @param timeout: Timeout in seconds
681
    @return: Whether there have been events
682

683
    """
684
    if self._filewaiter:
685
      return self._filewaiter.Wait(timeout)
686

    
687
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
688
    # If this point is reached, return immediately and let caller check the job
689
    # file again in case there were changes since the last check. This avoids a
690
    # race condition.
691
    self._filewaiter = _JobFileChangesWaiter(self._filename)
692

    
693
    return True
694

    
695
  def Close(self):
696
    """Closes underlying waiter.
697

698
    """
699
    if self._filewaiter:
700
      self._filewaiter.Close()
701

    
702

    
703
class _WaitForJobChangesHelper(object):
704
  """Helper class using inotify to wait for changes in a job file.
705

706
  This class takes a previous job status and serial, and alerts the client when
707
  the current job status has changed.
708

709
  """
710
  @staticmethod
711
  def _CheckForChanges(counter, job_load_fn, check_fn):
712
    if counter.next() > 0:
713
      # If this isn't the first check the job is given some more time to change
714
      # again. This gives better performance for jobs generating many
715
      # changes/messages.
716
      time.sleep(0.1)
717

    
718
    job = job_load_fn()
719
    if not job:
720
      raise errors.JobLost()
721

    
722
    result = check_fn(job)
723
    if result is None:
724
      raise utils.RetryAgain()
725

    
726
    return result
727

    
728
  def __call__(self, filename, job_load_fn,
729
               fields, prev_job_info, prev_log_serial, timeout):
730
    """Waits for changes on a job.
731

732
    @type filename: string
733
    @param filename: File on which to wait for changes
734
    @type job_load_fn: callable
735
    @param job_load_fn: Function to load job
736
    @type fields: list of strings
737
    @param fields: Which fields to check for changes
738
    @type prev_job_info: list or None
739
    @param prev_job_info: Last job information returned
740
    @type prev_log_serial: int
741
    @param prev_log_serial: Last job message serial number
742
    @type timeout: float
743
    @param timeout: maximum time to wait in seconds
744

745
    """
746
    counter = itertools.count()
747
    try:
748
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
749
      waiter = _JobChangesWaiter(filename)
750
      try:
751
        return utils.Retry(compat.partial(self._CheckForChanges,
752
                                          counter, job_load_fn, check_fn),
753
                           utils.RETRY_REMAINING_TIME, timeout,
754
                           wait_fn=waiter.Wait)
755
      finally:
756
        waiter.Close()
757
    except (errors.InotifyError, errors.JobLost):
758
      return None
759
    except utils.RetryTimeout:
760
      return constants.JOB_NOTCHANGED
761

    
762

    
763
def _EncodeOpError(err):
764
  """Encodes an error which occurred while processing an opcode.
765

766
  """
767
  if isinstance(err, errors.GenericError):
768
    to_encode = err
769
  else:
770
    to_encode = errors.OpExecError(str(err))
771

    
772
  return errors.EncodeException(to_encode)
773

    
774

    
775
class _TimeoutStrategyWrapper:
776
  def __init__(self, fn):
777
    """Initializes this class.
778

779
    """
780
    self._fn = fn
781
    self._next = None
782

    
783
  def _Advance(self):
784
    """Gets the next timeout if necessary.
785

786
    """
787
    if self._next is None:
788
      self._next = self._fn()
789

    
790
  def Peek(self):
791
    """Returns the next timeout.
792

793
    """
794
    self._Advance()
795
    return self._next
796

    
797
  def Next(self):
798
    """Returns the current timeout and advances the internal state.
799

800
    """
801
    self._Advance()
802
    result = self._next
803
    self._next = None
804
    return result
805

    
806

    
807
class _OpExecContext:
808
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
809
    """Initializes this class.
810

811
    """
812
    self.op = op
813
    self.index = index
814
    self.log_prefix = log_prefix
815
    self.summary = op.input.Summary()
816

    
817
    # Create local copy to modify
818
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
819
      self.jobdeps = op.input.depends[:]
820
    else:
821
      self.jobdeps = None
822

    
823
    self._timeout_strategy_factory = timeout_strategy_factory
824
    self._ResetTimeoutStrategy()
825

    
826
  def _ResetTimeoutStrategy(self):
827
    """Creates a new timeout strategy.
828

829
    """
830
    self._timeout_strategy = \
831
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
832

    
833
  def CheckPriorityIncrease(self):
834
    """Checks whether priority can and should be increased.
835

836
    Called when locks couldn't be acquired.
837

838
    """
839
    op = self.op
840

    
841
    # Exhausted all retries and next round should not use blocking acquire
842
    # for locks?
843
    if (self._timeout_strategy.Peek() is None and
844
        op.priority > constants.OP_PRIO_HIGHEST):
845
      logging.debug("Increasing priority")
846
      op.priority -= 1
847
      self._ResetTimeoutStrategy()
848
      return True
849

    
850
    return False
851

    
852
  def GetNextLockTimeout(self):
853
    """Returns the next lock acquire timeout.
854

855
    """
856
    return self._timeout_strategy.Next()
857

    
858

    
859
class _JobProcessor(object):
860
  (DEFER,
861
   WAITDEP,
862
   FINISHED) = range(1, 4)
863

    
864
  def __init__(self, queue, opexec_fn, job,
865
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
866
    """Initializes this class.
867

868
    """
869
    self.queue = queue
870
    self.opexec_fn = opexec_fn
871
    self.job = job
872
    self._timeout_strategy_factory = _timeout_strategy_factory
873

    
874
  @staticmethod
875
  def _FindNextOpcode(job, timeout_strategy_factory):
876
    """Locates the next opcode to run.
877

878
    @type job: L{_QueuedJob}
879
    @param job: Job object
880
    @param timeout_strategy_factory: Callable to create new timeout strategy
881

882
    """
883
    # Create some sort of a cache to speed up locating next opcode for future
884
    # lookups
885
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
886
    # pending and one for processed ops.
887
    if job.ops_iter is None:
888
      job.ops_iter = enumerate(job.ops)
889

    
890
    # Find next opcode to run
891
    while True:
892
      try:
893
        (idx, op) = job.ops_iter.next()
894
      except StopIteration:
895
        raise errors.ProgrammerError("Called for a finished job")
896

    
897
      if op.status == constants.OP_STATUS_RUNNING:
898
        # Found an opcode already marked as running
899
        raise errors.ProgrammerError("Called for job marked as running")
900

    
901
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
902
                             timeout_strategy_factory)
903

    
904
      if op.status not in constants.OPS_FINALIZED:
905
        return opctx
906

    
907
      # This is a job that was partially completed before master daemon
908
      # shutdown, so it can be expected that some opcodes are already
909
      # completed successfully (if any did error out, then the whole job
910
      # should have been aborted and not resubmitted for processing).
911
      logging.info("%s: opcode %s already processed, skipping",
912
                   opctx.log_prefix, opctx.summary)
913

    
914
  @staticmethod
915
  def _MarkWaitlock(job, op):
916
    """Marks an opcode as waiting for locks.
917

918
    The job's start timestamp is also set if necessary.
919

920
    @type job: L{_QueuedJob}
921
    @param job: Job object
922
    @type op: L{_QueuedOpCode}
923
    @param op: Opcode object
924

925
    """
926
    assert op in job.ops
927
    assert op.status in (constants.OP_STATUS_QUEUED,
928
                         constants.OP_STATUS_WAITING)
929

    
930
    update = False
931

    
932
    op.result = None
933

    
934
    if op.status == constants.OP_STATUS_QUEUED:
935
      op.status = constants.OP_STATUS_WAITING
936
      update = True
937

    
938
    if op.start_timestamp is None:
939
      op.start_timestamp = TimeStampNow()
940
      update = True
941

    
942
    if job.start_timestamp is None:
943
      job.start_timestamp = op.start_timestamp
944
      update = True
945

    
946
    assert op.status == constants.OP_STATUS_WAITING
947

    
948
    return update
949

    
950
  @staticmethod
951
  def _CheckDependencies(queue, job, opctx):
952
    """Checks if an opcode has dependencies and if so, processes them.
953

954
    @type queue: L{JobQueue}
955
    @param queue: Queue object
956
    @type job: L{_QueuedJob}
957
    @param job: Job object
958
    @type opctx: L{_OpExecContext}
959
    @param opctx: Opcode execution context
960
    @rtype: bool
961
    @return: Whether opcode will be re-scheduled by dependency tracker
962

963
    """
964
    op = opctx.op
965

    
966
    result = False
967

    
968
    while opctx.jobdeps:
969
      (dep_job_id, dep_status) = opctx.jobdeps[0]
970

    
971
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
972
                                                          dep_status)
973
      assert ht.TNonEmptyString(depmsg), "No dependency message"
974

    
975
      logging.info("%s: %s", opctx.log_prefix, depmsg)
976

    
977
      if depresult == _JobDependencyManager.CONTINUE:
978
        # Remove dependency and continue
979
        opctx.jobdeps.pop(0)
980

    
981
      elif depresult == _JobDependencyManager.WAIT:
982
        # Need to wait for notification, dependency tracker will re-add job
983
        # to workerpool
984
        result = True
985
        break
986

    
987
      elif depresult == _JobDependencyManager.CANCEL:
988
        # Job was cancelled, cancel this job as well
989
        job.Cancel()
990
        assert op.status == constants.OP_STATUS_CANCELING
991
        break
992

    
993
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
994
                         _JobDependencyManager.ERROR):
995
        # Job failed or there was an error, this job must fail
996
        op.status = constants.OP_STATUS_ERROR
997
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
998
        break
999

    
1000
      else:
1001
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1002
                                     depresult)
1003

    
1004
    return result
1005

    
1006
  def _ExecOpCodeUnlocked(self, opctx):
1007
    """Processes one opcode and returns the result.
1008

1009
    """
1010
    op = opctx.op
1011

    
1012
    assert op.status == constants.OP_STATUS_WAITING
1013

    
1014
    timeout = opctx.GetNextLockTimeout()
1015

    
1016
    try:
1017
      # Make sure not to hold queue lock while calling ExecOpCode
1018
      result = self.opexec_fn(op.input,
1019
                              _OpExecCallbacks(self.queue, self.job, op),
1020
                              timeout=timeout, priority=op.priority)
1021
    except mcpu.LockAcquireTimeout:
1022
      assert timeout is not None, "Received timeout for blocking acquire"
1023
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1024

    
1025
      assert op.status in (constants.OP_STATUS_WAITING,
1026
                           constants.OP_STATUS_CANCELING)
1027

    
1028
      # Was job cancelled while we were waiting for the lock?
1029
      if op.status == constants.OP_STATUS_CANCELING:
1030
        return (constants.OP_STATUS_CANCELING, None)
1031

    
1032
      # Stay in waitlock while trying to re-acquire lock
1033
      return (constants.OP_STATUS_WAITING, None)
1034
    except CancelJob:
1035
      logging.exception("%s: Canceling job", opctx.log_prefix)
1036
      assert op.status == constants.OP_STATUS_CANCELING
1037
      return (constants.OP_STATUS_CANCELING, None)
1038
    except Exception, err: # pylint: disable=W0703
1039
      logging.exception("%s: Caught exception in %s",
1040
                        opctx.log_prefix, opctx.summary)
1041
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1042
    else:
1043
      logging.debug("%s: %s successful",
1044
                    opctx.log_prefix, opctx.summary)
1045
      return (constants.OP_STATUS_SUCCESS, result)
1046

    
1047
  def __call__(self, _nextop_fn=None):
1048
    """Continues execution of a job.
1049

1050
    @param _nextop_fn: Callback function for tests
1051
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1052
      be deferred and C{WAITDEP} if the dependency manager
1053
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1054

1055
    """
1056
    queue = self.queue
1057
    job = self.job
1058

    
1059
    logging.debug("Processing job %s", job.id)
1060

    
1061
    queue.acquire(shared=1)
1062
    try:
1063
      opcount = len(job.ops)
1064

    
1065
      assert job.writable, "Expected writable job"
1066

    
1067
      # Don't do anything for finalized jobs
1068
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1069
        return self.FINISHED
1070

    
1071
      # Is a previous opcode still pending?
1072
      if job.cur_opctx:
1073
        opctx = job.cur_opctx
1074
        job.cur_opctx = None
1075
      else:
1076
        if __debug__ and _nextop_fn:
1077
          _nextop_fn()
1078
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1079

    
1080
      op = opctx.op
1081

    
1082
      # Consistency check
1083
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1084
                                     constants.OP_STATUS_CANCELING)
1085
                        for i in job.ops[opctx.index + 1:])
1086

    
1087
      assert op.status in (constants.OP_STATUS_QUEUED,
1088
                           constants.OP_STATUS_WAITING,
1089
                           constants.OP_STATUS_CANCELING)
1090

    
1091
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1092
              op.priority >= constants.OP_PRIO_HIGHEST)
1093

    
1094
      waitjob = None
1095

    
1096
      if op.status != constants.OP_STATUS_CANCELING:
1097
        assert op.status in (constants.OP_STATUS_QUEUED,
1098
                             constants.OP_STATUS_WAITING)
1099

    
1100
        # Prepare to start opcode
1101
        if self._MarkWaitlock(job, op):
1102
          # Write to disk
1103
          queue.UpdateJobUnlocked(job)
1104

    
1105
        assert op.status == constants.OP_STATUS_WAITING
1106
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1107
        assert job.start_timestamp and op.start_timestamp
1108
        assert waitjob is None
1109

    
1110
        # Check if waiting for a job is necessary
1111
        waitjob = self._CheckDependencies(queue, job, opctx)
1112

    
1113
        assert op.status in (constants.OP_STATUS_WAITING,
1114
                             constants.OP_STATUS_CANCELING,
1115
                             constants.OP_STATUS_ERROR)
1116

    
1117
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1118
                                         constants.OP_STATUS_ERROR)):
1119
          logging.info("%s: opcode %s waiting for locks",
1120
                       opctx.log_prefix, opctx.summary)
1121

    
1122
          assert not opctx.jobdeps, "Not all dependencies were removed"
1123

    
1124
          queue.release()
1125
          try:
1126
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1127
          finally:
1128
            queue.acquire(shared=1)
1129

    
1130
          op.status = op_status
1131
          op.result = op_result
1132

    
1133
          assert not waitjob
1134

    
1135
        if op.status == constants.OP_STATUS_WAITING:
1136
          # Couldn't get locks in time
1137
          assert not op.end_timestamp
1138
        else:
1139
          # Finalize opcode
1140
          op.end_timestamp = TimeStampNow()
1141

    
1142
          if op.status == constants.OP_STATUS_CANCELING:
1143
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1144
                                  for i in job.ops[opctx.index:])
1145
          else:
1146
            assert op.status in constants.OPS_FINALIZED
1147

    
1148
      if op.status == constants.OP_STATUS_WAITING or waitjob:
1149
        finalize = False
1150

    
1151
        if not waitjob and opctx.CheckPriorityIncrease():
1152
          # Priority was changed, need to update on-disk file
1153
          queue.UpdateJobUnlocked(job)
1154

    
1155
        # Keep around for another round
1156
        job.cur_opctx = opctx
1157

    
1158
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1159
                op.priority >= constants.OP_PRIO_HIGHEST)
1160

    
1161
        # In no case must the status be finalized here
1162
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1163

    
1164
      else:
1165
        # Ensure all opcodes so far have been successful
1166
        assert (opctx.index == 0 or
1167
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1168
                           for i in job.ops[:opctx.index]))
1169

    
1170
        # Reset context
1171
        job.cur_opctx = None
1172

    
1173
        if op.status == constants.OP_STATUS_SUCCESS:
1174
          finalize = False
1175

    
1176
        elif op.status == constants.OP_STATUS_ERROR:
1177
          # Ensure failed opcode has an exception as its result
1178
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1179

    
1180
          to_encode = errors.OpExecError("Preceding opcode failed")
1181
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1182
                                _EncodeOpError(to_encode))
1183
          finalize = True
1184

    
1185
          # Consistency check
1186
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1187
                            errors.GetEncodedError(i.result)
1188
                            for i in job.ops[opctx.index:])
1189

    
1190
        elif op.status == constants.OP_STATUS_CANCELING:
1191
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1192
                                "Job canceled by request")
1193
          finalize = True
1194

    
1195
        else:
1196
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1197

    
1198
        if opctx.index == (opcount - 1):
1199
          # Finalize on last opcode
1200
          finalize = True
1201

    
1202
        if finalize:
1203
          # All opcodes have been run, finalize job
1204
          job.Finalize()
1205

    
1206
        # Write to disk. If the job status is final, this is the final write
1207
        # allowed. Once the file has been written, it can be archived anytime.
1208
        queue.UpdateJobUnlocked(job)
1209

    
1210
        assert not waitjob
1211

    
1212
        if finalize:
1213
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1214
          return self.FINISHED
1215

    
1216
      assert not waitjob or queue.depmgr.JobWaiting(job)
1217

    
1218
      if waitjob:
1219
        return self.WAITDEP
1220
      else:
1221
        return self.DEFER
1222
    finally:
1223
      assert job.writable, "Job became read-only while being processed"
1224
      queue.release()
1225

    
1226

    
1227
def _EvaluateJobProcessorResult(depmgr, job, result):
1228
  """Looks at a result from L{_JobProcessor} for a job.
1229

1230
  To be used in a L{_JobQueueWorker}.
1231

1232
  """
1233
  if result == _JobProcessor.FINISHED:
1234
    # Notify waiting jobs
1235
    depmgr.NotifyWaiters(job.id)
1236

    
1237
  elif result == _JobProcessor.DEFER:
1238
    # Schedule again
1239
    raise workerpool.DeferTask(priority=job.CalcPriority())
1240

    
1241
  elif result == _JobProcessor.WAITDEP:
1242
    # No-op, dependency manager will re-schedule
1243
    pass
1244

    
1245
  else:
1246
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1247
                                 (result, ))
1248

    
1249

    
1250
class _JobQueueWorker(workerpool.BaseWorker):
1251
  """The actual job workers.
1252

1253
  """
1254
  def RunTask(self, job): # pylint: disable=W0221
1255
    """Job executor.
1256

1257
    @type job: L{_QueuedJob}
1258
    @param job: the job to be processed
1259

1260
    """
1261
    assert job.writable, "Expected writable job"
1262

    
1263
    # Ensure only one worker is active on a single job. If a job registers for
1264
    # a dependency job, and the other job notifies before the first worker is
1265
    # done, the job can end up in the tasklist more than once.
1266
    job.processor_lock.acquire()
1267
    try:
1268
      return self._RunTaskInner(job)
1269
    finally:
1270
      job.processor_lock.release()
1271

    
1272
  def _RunTaskInner(self, job):
1273
    """Executes a job.
1274

1275
    Must be called with per-job lock acquired.
1276

1277
    """
1278
    queue = job.queue
1279
    assert queue == self.pool.queue
1280

    
1281
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1282
    setname_fn(None)
1283

    
1284
    proc = mcpu.Processor(queue.context, job.id)
1285

    
1286
    # Create wrapper for setting thread name
1287
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1288
                                    proc.ExecOpCode)
1289

    
1290
    _EvaluateJobProcessorResult(queue.depmgr, job,
1291
                                _JobProcessor(queue, wrap_execop_fn, job)())
1292

    
1293
  @staticmethod
1294
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1295
    """Updates the worker thread name to include a short summary of the opcode.
1296

1297
    @param setname_fn: Callable setting worker thread name
1298
    @param execop_fn: Callable for executing opcode (usually
1299
                      L{mcpu.Processor.ExecOpCode})
1300

1301
    """
1302
    setname_fn(op)
1303
    try:
1304
      return execop_fn(op, *args, **kwargs)
1305
    finally:
1306
      setname_fn(None)
1307

    
1308
  @staticmethod
1309
  def _GetWorkerName(job, op):
1310
    """Sets the worker thread name.
1311

1312
    @type job: L{_QueuedJob}
1313
    @type op: L{opcodes.OpCode}
1314

1315
    """
1316
    parts = ["Job%s" % job.id]
1317

    
1318
    if op:
1319
      parts.append(op.TinySummary())
1320

    
1321
    return "/".join(parts)
1322

    
1323

    
1324
class _JobQueueWorkerPool(workerpool.WorkerPool):
1325
  """Simple class implementing a job-processing workerpool.
1326

1327
  """
1328
  def __init__(self, queue):
1329
    super(_JobQueueWorkerPool, self).__init__("Jq",
1330
                                              JOBQUEUE_THREADS,
1331
                                              _JobQueueWorker)
1332
    self.queue = queue
1333

    
1334

    
1335
class _JobDependencyManager:
1336
  """Keeps track of job dependencies.
1337

1338
  """
1339
  (WAIT,
1340
   ERROR,
1341
   CANCEL,
1342
   CONTINUE,
1343
   WRONGSTATUS) = range(1, 6)
1344

    
1345
  def __init__(self, getstatus_fn, enqueue_fn):
1346
    """Initializes this class.
1347

1348
    """
1349
    self._getstatus_fn = getstatus_fn
1350
    self._enqueue_fn = enqueue_fn
1351

    
1352
    self._waiters = {}
1353
    self._lock = locking.SharedLock("JobDepMgr")
1354

    
1355
  @locking.ssynchronized(_LOCK, shared=1)
1356
  def GetLockInfo(self, requested): # pylint: disable=W0613
1357
    """Retrieves information about waiting jobs.
1358

1359
    @type requested: set
1360
    @param requested: Requested information, see C{query.LQ_*}
1361

1362
    """
1363
    # No need to sort here, that's being done by the lock manager and query
1364
    # library. There are no priorities for notifying jobs, hence all show up as
1365
    # one item under "pending".
1366
    return [("job/%s" % job_id, None, None,
1367
             [("job", [job.id for job in waiters])])
1368
            for job_id, waiters in self._waiters.items()
1369
            if waiters]
1370

    
1371
  @locking.ssynchronized(_LOCK, shared=1)
1372
  def JobWaiting(self, job):
1373
    """Checks if a job is waiting.
1374

1375
    """
1376
    return compat.any(job in jobs
1377
                      for jobs in self._waiters.values())
1378

    
1379
  @locking.ssynchronized(_LOCK)
1380
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1381
    """Checks if a dependency job has the requested status.
1382

1383
    If the other job is not yet in a finalized status, the calling job will be
1384
    notified (re-added to the workerpool) at a later point.
1385

1386
    @type job: L{_QueuedJob}
1387
    @param job: Job object
1388
    @type dep_job_id: string
1389
    @param dep_job_id: ID of dependency job
1390
    @type dep_status: list
1391
    @param dep_status: Required status
1392

1393
    """
1394
    assert ht.TString(job.id)
1395
    assert ht.TString(dep_job_id)
1396
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1397

    
1398
    if job.id == dep_job_id:
1399
      return (self.ERROR, "Job can't depend on itself")
1400

    
1401
    # Get status of dependency job
1402
    try:
1403
      status = self._getstatus_fn(dep_job_id)
1404
    except errors.JobLost, err:
1405
      return (self.ERROR, "Dependency error: %s" % err)
1406

    
1407
    assert status in constants.JOB_STATUS_ALL
1408

    
1409
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1410

    
1411
    if status not in constants.JOBS_FINALIZED:
1412
      # Register for notification and wait for job to finish
1413
      job_id_waiters.add(job)
1414
      return (self.WAIT,
1415
              "Need to wait for job %s, wanted status '%s'" %
1416
              (dep_job_id, dep_status))
1417

    
1418
    # Remove from waiters list
1419
    if job in job_id_waiters:
1420
      job_id_waiters.remove(job)
1421

    
1422
    if (status == constants.JOB_STATUS_CANCELED and
1423
        constants.JOB_STATUS_CANCELED not in dep_status):
1424
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1425

    
1426
    elif not dep_status or status in dep_status:
1427
      return (self.CONTINUE,
1428
              "Dependency job %s finished with status '%s'" %
1429
              (dep_job_id, status))
1430

    
1431
    else:
1432
      return (self.WRONGSTATUS,
1433
              "Dependency job %s finished with status '%s',"
1434
              " not one of '%s' as required" %
1435
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1436

    
1437
  def _RemoveEmptyWaitersUnlocked(self):
1438
    """Remove all jobs without actual waiters.
1439

1440
    """
1441
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1442
                   if not waiters]:
1443
      del self._waiters[job_id]
1444

    
1445
  def NotifyWaiters(self, job_id):
1446
    """Notifies all jobs waiting for a certain job ID.
1447

1448
    @attention: Do not call until L{CheckAndRegister} returned a status other
1449
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1450
    @type job_id: string
1451
    @param job_id: Job ID
1452

1453
    """
1454
    assert ht.TString(job_id)
1455

    
1456
    self._lock.acquire()
1457
    try:
1458
      self._RemoveEmptyWaitersUnlocked()
1459

    
1460
      jobs = self._waiters.pop(job_id, None)
1461
    finally:
1462
      self._lock.release()
1463

    
1464
    if jobs:
1465
      # Re-add jobs to workerpool
1466
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1467
                    len(jobs), job_id)
1468
      self._enqueue_fn(jobs)
1469

    
1470

    
1471
def _RequireOpenQueue(fn):
1472
  """Decorator for "public" functions.
1473

1474
  This function should be used for all 'public' functions. That is,
1475
  functions usually called from other classes. Note that this should
1476
  be applied only to methods (not plain functions), since it expects
1477
  that the decorated function is called with a first argument that has
1478
  a '_queue_filelock' argument.
1479

1480
  @warning: Use this decorator only after locking.ssynchronized
1481

1482
  Example::
1483
    @locking.ssynchronized(_LOCK)
1484
    @_RequireOpenQueue
1485
    def Example(self):
1486
      pass
1487

1488
  """
1489
  def wrapper(self, *args, **kwargs):
1490
    # pylint: disable=W0212
1491
    assert self._queue_filelock is not None, "Queue should be open"
1492
    return fn(self, *args, **kwargs)
1493
  return wrapper
1494

    
1495

    
1496
def _RequireNonDrainedQueue(fn):
1497
  """Decorator checking for a non-drained queue.
1498

1499
  To be used with functions submitting new jobs.
1500

1501
  """
1502
  def wrapper(self, *args, **kwargs):
1503
    """Wrapper function.
1504

1505
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1506

1507
    """
1508
    # Ok when sharing the big job queue lock, as the drain file is created when
1509
    # the lock is exclusive.
1510
    # Needs access to protected member, pylint: disable=W0212
1511
    if self._drained:
1512
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1513

    
1514
    if not self._accepting_jobs:
1515
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1516

    
1517
    return fn(self, *args, **kwargs)
1518
  return wrapper
1519

    
1520

    
1521
class JobQueue(object):
1522
  """Queue used to manage the jobs.
1523

1524
  """
1525
  def __init__(self, context):
1526
    """Constructor for JobQueue.
1527

1528
    The constructor will initialize the job queue object and then
1529
    start loading the current jobs from disk, either for starting them
1530
    (if they were queue) or for aborting them (if they were already
1531
    running).
1532

1533
    @type context: GanetiContext
1534
    @param context: the context object for access to the configuration
1535
        data and other ganeti objects
1536

1537
    """
1538
    self.context = context
1539
    self._memcache = weakref.WeakValueDictionary()
1540
    self._my_hostname = netutils.Hostname.GetSysName()
1541

    
1542
    # The Big JobQueue lock. If a code block or method acquires it in shared
1543
    # mode safe it must guarantee concurrency with all the code acquiring it in
1544
    # shared mode, including itself. In order not to acquire it at all
1545
    # concurrency must be guaranteed with all code acquiring it in shared mode
1546
    # and all code acquiring it exclusively.
1547
    self._lock = locking.SharedLock("JobQueue")
1548

    
1549
    self.acquire = self._lock.acquire
1550
    self.release = self._lock.release
1551

    
1552
    # Accept jobs by default
1553
    self._accepting_jobs = True
1554

    
1555
    # Initialize the queue, and acquire the filelock.
1556
    # This ensures no other process is working on the job queue.
1557
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1558

    
1559
    # Read serial file
1560
    self._last_serial = jstore.ReadSerial()
1561
    assert self._last_serial is not None, ("Serial file was modified between"
1562
                                           " check in jstore and here")
1563

    
1564
    # Get initial list of nodes
1565
    self._nodes = dict((n.name, n.primary_ip)
1566
                       for n in self.context.cfg.GetAllNodesInfo().values()
1567
                       if n.master_candidate)
1568

    
1569
    # Remove master node
1570
    self._nodes.pop(self._my_hostname, None)
1571

    
1572
    # TODO: Check consistency across nodes
1573

    
1574
    self._queue_size = None
1575
    self._UpdateQueueSizeUnlocked()
1576
    assert ht.TInt(self._queue_size)
1577
    self._drained = jstore.CheckDrainFlag()
1578

    
1579
    # Job dependencies
1580
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1581
                                        self._EnqueueJobs)
1582
    self.context.glm.AddToLockMonitor(self.depmgr)
1583

    
1584
    # Setup worker pool
1585
    self._wpool = _JobQueueWorkerPool(self)
1586
    try:
1587
      self._InspectQueue()
1588
    except:
1589
      self._wpool.TerminateWorkers()
1590
      raise
1591

    
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
                                                        content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

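  # Illustrative example for the check above: with five non-master nodes in
  # self._nodes and only one successful call, len(success) + 1 == 2 while
  # len(failed) == 4, so the "More than half of the nodes failed" error is
  # logged.
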
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

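  # Illustrative examples, assuming the default JOBS_PER_ARCHIVE_DIRECTORY of
  # 10000 defined at the top of this module:
  #   _GetArchiveDirectory("42")    -> "0"
  #   _GetArchiveDirectory("12345") -> "1"
  #   _GetArchiveDirectory("20000") -> "2"
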
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of formatted job identifiers

    """
    assert ht.TPositiveInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

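  # Illustrative example: with self._last_serial == 5, _NewSerialsUnlocked(3)
  # writes "8\n" to the serial file, updates self._last_serial to 8 and
  # returns ["6", "7", "8"].
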
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

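  # Illustrative example of the resulting layout (using the directory
  # constants from the constants module):
  #   _GetJobPath("15")         -> <QUEUE_DIR>/job-15
  #   _GetArchivedJobPath("15") -> <JOB_QUEUE_ARCHIVE_DIR>/0/job-15
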
  @staticmethod
  def _GetJobIDsUnlocked(sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = constants.JOB_FILE_RE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, True)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, False))

    raw_data = None
    writable_default = None

    for (fn, writable_default) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = writable_default

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

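  # Illustrative example of the return value of SubmitManyJobs: submitting two
  # job definitions where the first is accepted as job 100 and the second is
  # rejected could yield something like
  #   [(True, "100"), (False, "Opcode 0 has invalid priority ...; opcodes ...")]
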
  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: list
    @return: Resolved dependencies

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

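  # Illustrative example: with deps == [(-1, [constants.JOB_STATUS_SUCCESS])]
  # and a resolve_fn mapping -1 to the previously submitted job "122", the
  # method returns (True, [("122", [constants.JOB_STATUS_SUCCESS])]).
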
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        # Reached only if the "for op in ops" loop did not "break", i.e. all
        # dependencies of this job were resolved successfully
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: string
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    if not isinstance(job_id, basestring):
      job_id = self._FormatJobID(job_id)

    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        If the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients.

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

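  # Illustrative sketch of how a caller could consume the return value above
  # ("queue", "job_id", "prev_info" and "prev_serial" are assumed to exist in
  # the caller's context; error cases such as a vanished job are omitted):
  #
  #   result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                    prev_serial, 60.0)
  #   if result == constants.JOB_NOTCHANGED:
  #     pass  # timeout expired without any change
  #   else:
  #     (job_info, log_entries) = result
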
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum time (in seconds) to spend archiving jobs
    @rtype: tuple
    @return: a tuple of (number of archived jobs, number of jobs that were
        not considered before the timeout expired)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

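  # Illustrative example: QueryJobs(["1", "5"], ["id", "status"]) could return
  # [["1", "success"], None] if job 5 cannot be loaded. When job_ids is None
  # (query all jobs), unloadable jobs are skipped instead of being reported
  # as None.
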
  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None
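
  # Illustrative sketch of the intended shutdown sequence, as described in the
  # PrepareShutdown docstring ("queue" is assumed to be a JobQueue instance
  # managed by the caller, e.g. the master daemon):
  #
  #   while queue.PrepareShutdown():   # True while jobs are still running
  #     time.sleep(1)
  #   queue.Shutdown()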