#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang


JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
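
# Example (illustrative): utils.SplitTime() splits a float timestamp into
# whole seconds and microseconds, so TimeStampNow() returns something like
# (1234567890, 123456). Splitting avoids losing sub-second precision when
# the value is serialized (see the note in _OpExecCallbacks.Feedback).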


class _SimpleJobQuery:
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self, writable)

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable):
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _JobQueue instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with error,
        the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
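
  # Worked example (illustrative), following the algorithm above: opcode
  # statuses [SUCCESS, RUNNING, QUEUED] leave status == JOB_STATUS_RUNNING
  # with all_success False, so the job reports "running"; statuses
  # [SUCCESS, ERROR, QUEUED] break at the failed opcode and the job reports
  # "error"; an all-SUCCESS opcode list yields "success".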

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
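
  # Note (illustrative): priorities are niceness-like numbers where a
  # numerically lower value means a higher priority (OP_PRIO_HIGHEST is the
  # smallest number), so min() picks the most urgent pending opcode; e.g.
  # pending priorities [0, -10, 10] would give a job priority of -10.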

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
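
  # Example (illustrative): with log entries
  #   (1, ts, ELOG_MESSAGE, "starting") and (2, ts, ELOG_MESSAGE, "done"),
  # GetLogEntries(None) returns both, while GetLogEntries(1) returns only
  # the entry with serial 2.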

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
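
  # Example (illustrative): Feedback accepts either a bare message,
  #   cbs.Feedback("copying disks")
  # which defaults the log type to ELOG_MESSAGE, or an explicit pair,
  #   cbs.Feedback(constants.ELOG_MESSAGE, "copying disks")
  # where "cbs" stands for an _OpExecCallbacks instance.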

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
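
# Example (illustrative): Ganeti's own exceptions are encoded as-is, while
# foreign ones are wrapped first, so
#   _EncodeOpError(ValueError("bad value"))
# behaves like
#   errors.EncodeException(errors.OpExecError("bad value"))
# keeping the result serializable for the on-disk job file.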


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
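
  # Example (illustrative): with fn returning 1.0, then 5.0,
  #   w = _TimeoutStrategyWrapper(fn)
  #   w.Peek()  # -> 1.0 (computed once and cached)
  #   w.Peek()  # -> 1.0 (cached value, fn not called again)
  #   w.Next()  # -> 1.0 (consumed; the next call advances to 5.0)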


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result
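
  # Example (illustrative): opctx.jobdeps mirrors the opcode's "depends"
  # attribute, a list of (job_id, wanted_statuses) pairs such as
  #   [("1234", [constants.JOB_STATUS_SUCCESS])]
  # and the loop above pops each entry once the dependency manager returns
  # CONTINUE for it.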

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status == constants.OP_STATUS_WAITING:
          # Couldn't get locks in time
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Sets the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: string
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TString(job.id)
    assert ht.TString(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))
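
  # Example (illustrative): for a job waiting on job "1234" with
  # dep_status == [constants.JOB_STATUS_SUCCESS], CheckAndRegister returns
  # (WAIT, ...) while job 1234 is still running, (CONTINUE, ...) once it
  # succeeds, and (WRONGSTATUS, ...) if it instead ends up in an error state.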

  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
      than C{WAIT} for C{job_id}, or behaviour is undefined
    @type job_id: string
    @param job_id: Job ID

    """
    assert ht.TString(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()

      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared
    # mode and all code acquiring it exclusively.
1546
    self._lock = locking.SharedLock("JobQueue")
1547

    
1548
    self.acquire = self._lock.acquire
1549
    self.release = self._lock.release
1550

    
1551
    # Accept jobs by default
1552
    self._accepting_jobs = True
1553

    
1554
    # Initialize the queue, and acquire the filelock.
1555
    # This ensures no other process is working on the job queue.
1556
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1557

    
1558
    # Read serial file
1559
    self._last_serial = jstore.ReadSerial()
1560
    assert self._last_serial is not None, ("Serial file was modified between"
1561
                                           " check in jstore and here")
1562

    
1563
    # Get initial list of nodes
1564
    self._nodes = dict((n.name, n.primary_ip)
1565
                       for n in self.context.cfg.GetAllNodesInfo().values()
1566
                       if n.master_candidate)
1567

    
1568
    # Remove master node
1569
    self._nodes.pop(self._my_hostname, None)
1570

    
1571
    # TODO: Check consistency across nodes
1572

    
1573
    self._queue_size = None
1574
    self._UpdateQueueSizeUnlocked()
1575
    assert ht.TInt(self._queue_size)
1576
    self._drained = jstore.CheckDrainFlag()
1577

    
1578
    # Job dependencies
1579
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1580
                                        self._EnqueueJobs)
1581
    self.context.glm.AddToLockMonitor(self.depmgr)
1582

    
1583
    # Setup worker pool
1584
    self._wpool = _JobQueueWorkerPool(self)
1585
    try:
1586
      self._InspectQueue()
1587
    except:
1588
      self._wpool.TerminateWorkers()
1589
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")
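
  # Recovery rules applied by _InspectQueue above:
  #   QUEUED            -> job re-enqueued as-is
  #   WAITING           -> unfinished opcodes reset to QUEUED, job re-enqueued
  #   RUNNING/CANCELING -> unfinished opcodes marked ERROR, job finalized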

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
                                                        content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")
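
  # Worked example for the check above: with success == 2 and failed == 3,
  # (2 + 1) < 3 is false, so nothing is logged; with success == 1 and
  # failed == 4, (1 + 1) < 4 is true and the error is logged. The "+1"
  # counts the master itself as an implicit success.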

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list
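
  # On the TODO above: for an unmodified dict, keys() and items() iterate in
  # the same order, so the suggested one-liner is equivalent, e.g.
  # (illustrative values):
  #   nodes = {"node1": "192.0.2.1", "node2": "192.0.2.2"}
  #   name_list, addr_list = tuple(map(list, zip(*nodes.items())))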

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert ht.TPositiveInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
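
  # Example: with _last_serial == 42 and count == 3, the serial file is
  # rewritten to contain "45" and the identifiers for serials 43..45 are
  # returned (their exact string form is up to jstore.FormatJobID).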

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)
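
  # Illustration (exact paths depend on the QUEUE_DIR and archive constants
  # and on jstore.GetArchiveDirectory):
  #   _GetJobPath("42")         -> "<queue dir>/job-42"
  #   _GetArchivedJobPath("42") -> "<archive dir>/<subdir>/job-42"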

    
  @staticmethod
  def _GetJobIDsUnlocked(sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = constants.JOB_FILE_RE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist
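
  # Illustration: assuming JOB_FILE_RE matches file names of the form
  # "job-<id>", a queue directory containing "job-7", "job-12" and "lock"
  # yields ["7", "12"], NiceSort'ed when sort=True.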

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, True)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, False))

    raw_data = None
    writable_default = None

    for (fn, writable_default) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = writable_default

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    If there is an error reading the job, None is returned and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None
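
  # Minimal usage sketch (illustrative; "jq" stands for a JobQueue instance):
  #   job = jq.SafeLoadJobFromDisk("42", True, writable=False)
  #   if job is not None:
  #     status = job.CalcStatus()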

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id
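
  # Usage sketch (illustrative only; assumes a fully initialized JobQueue
  # instance "jq" and, hypothetically, an opcode such as opcodes.OpTestDelay):
  #   job_id = jq.SubmitJob([opcodes.OpTestDelay(duration=1.0)])
  #   # the returned job_id can be passed to WaitForJobChanges or QueryJobs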

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple
    @return: (C{True}, resolved dependencies) on success, or
        (C{False}, error message) if a relative job ID could not
        be resolved

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)
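
  # Example: when three jobs are submitted together and receive the IDs
  # ["10", "11", "12"], a dependency of (-1, ...) in the third job resolves
  # via resolve_fn to "11", the job submitted immediately before it (see
  # _SubmitManyJobsUnlocked below).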

  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)
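
  # Note on the loop above: the "else" branch belongs to the inner
  # "for op in ops" loop, so it runs only when no "break" occurred, i.e.
  # only when every dependency of the job was resolved successfully.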

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: string
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    if not isinstance(job_id, basestring):
      job_id = jstore.FormatJobID(job_id)

    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list;
        if the job has not changed and the timeout has expired,
        the special value L{constants.JOB_NOTCHANGED} is returned
        instead, which clients should interpret as such

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
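
  # Usage sketch (illustrative; all argument values below are made up):
  #   result = jq.WaitForJobChanges("42", ["status"], None, 0, 10.0)
  #   if result == constants.JOB_NOTCHANGED:
  #     pass  # timeout expired without any change
  #   else:
  #     (job_info, log_entries) = result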

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't checked above whether renaming the files succeeded or
    # failed, we update the cached queue size from the filesystem. Once the
    # TODO above is addressed, the number of actually archived jobs can be
    # used instead.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time (in seconds) to spend
        archiving jobs

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
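
  # Example: AutoArchiveJobs(6 * 3600, 30.0) spends at most roughly thirty
  # seconds archiving finalized jobs whose newest timestamp (end, else start,
  # else received) is older than six hours; age == -1 archives all finalized
  # jobs regardless of their age.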

  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, sort_by_name) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=sort_by_name)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, sort_by_name) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=sort_by_name)
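
  # Usage sketch (illustrative; "jq" stands for a JobQueue instance):
  #   rows = jq.OldStyleQueryJobs(["1", "2"], ["id", "status"])
  #   for (job_id, status) in rows:
  #     pass  # each row lists the requested fields in order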

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None
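
  # Shutdown sequence sketch (illustrative; "jq" stands for a JobQueue
  # instance, and the polling loop is one possible caller strategy):
  #   while jq.PrepareShutdown():
  #     time.sleep(0.1)  # jobs still running, give them time to finish
  #   jq.Shutdown()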