lib/jqueue.py @ e2b4a7ba

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import time
35
import weakref
36
import threading
37
import itertools
38

    
39
try:
40
  # pylint: disable=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import runtime
57
from ganeti import netutils
58
from ganeti import compat
59
from ganeti import ht
60
from ganeti import query
61
from ganeti import qlang
62
from ganeti import pathutils
63

    
64

    
65
JOBQUEUE_THREADS = 25
66

    
67
# member lock names to be passed to @ssynchronized decorator
68
_LOCK = "_lock"
69
_QUEUE = "_queue"
70

    
71

    
72
class CancelJob(Exception):
73
  """Special exception to cancel a job.
74

75
  """
76

    
77

    
78
def TimeStampNow():
79
  """Returns the current timestamp.
80

81
  @rtype: tuple
82
  @return: the current time in the (seconds, microseconds) format
83

84
  """
85
  return utils.SplitTime(time.time())
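# Illustrative sketch of the timestamp format used throughout this module; the
# exact rounding below is an assumption, only the (seconds, microseconds)
# tuple shape produced by utils.SplitTime matters to the queue code:
#
#   >>> utils.SplitTime(1234567890.25)
#   (1234567890, 250000)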
86

    
87

    
88
class _SimpleJobQuery:
89
  """Wrapper for job queries.
90

91
  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
92

93
  """
94
  def __init__(self, fields):
95
    """Initializes this class.
96

97
    """
98
    self._query = query.Query(query.JOB_FIELDS, fields)
99

    
100
  def __call__(self, job):
101
    """Executes a job query using cached field list.
102

103
    """
104
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
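# A minimal usage sketch (the field names are just examples of valid job query
# fields):
#
#   jq = _SimpleJobQuery(["id", "status"])
#   (job_id, job_status) = jq(job)
#
# This is how L{_JobChangesChecker} and L{_QueuedJob.GetInfo} use the class:
# the query is built once and then applied to one job at a time.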
105

    
106

    
107
class _QueuedOpCode(object):
108
  """Encapsulates an opcode object.
109

110
  @ivar log: holds the execution log and consists of tuples
111
  of the form C{(log_serial, timestamp, level, message)}
112
  @ivar input: the OpCode we encapsulate
113
  @ivar status: the current status
114
  @ivar result: the result of the LU execution
115
  @ivar start_timestamp: timestamp for the start of the execution
116
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
117
  @ivar end_timestamp: timestamp for the end of the execution
118

119
  """
120
  __slots__ = ["input", "status", "result", "log", "priority",
121
               "start_timestamp", "exec_timestamp", "end_timestamp",
122
               "__weakref__"]
123

    
124
  def __init__(self, op):
125
    """Initializes instances of this class.
126

127
    @type op: L{opcodes.OpCode}
128
    @param op: the opcode we encapsulate
129

130
    """
131
    self.input = op
132
    self.status = constants.OP_STATUS_QUEUED
133
    self.result = None
134
    self.log = []
135
    self.start_timestamp = None
136
    self.exec_timestamp = None
137
    self.end_timestamp = None
138

    
139
    # Get initial priority (it might change during the lifetime of this opcode)
140
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
141

    
142
  @classmethod
143
  def Restore(cls, state):
144
    """Restore the _QueuedOpCode from the serialized form.
145

146
    @type state: dict
147
    @param state: the serialized state
148
    @rtype: _QueuedOpCode
149
    @return: a new _QueuedOpCode instance
150

151
    """
152
    obj = _QueuedOpCode.__new__(cls)
153
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
154
    obj.status = state["status"]
155
    obj.result = state["result"]
156
    obj.log = state["log"]
157
    obj.start_timestamp = state.get("start_timestamp", None)
158
    obj.exec_timestamp = state.get("exec_timestamp", None)
159
    obj.end_timestamp = state.get("end_timestamp", None)
160
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
161
    return obj
162

    
163
  def Serialize(self):
164
    """Serializes this _QueuedOpCode.
165

166
    @rtype: dict
167
    @return: the dictionary holding the serialized state
168

169
    """
170
    return {
171
      "input": self.input.__getstate__(),
172
      "status": self.status,
173
      "result": self.result,
174
      "log": self.log,
175
      "start_timestamp": self.start_timestamp,
176
      "exec_timestamp": self.exec_timestamp,
177
      "end_timestamp": self.end_timestamp,
178
      "priority": self.priority,
179
      }
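  # Rough round-trip sketch for Serialize/Restore; the opcode type and its
  # arguments are only an example:
  #
  #   qop = _QueuedOpCode(opcodes.OpTestDelay(duration=0))
  #   state = qop.Serialize()
  #   restored = _QueuedOpCode.Restore(state)
  #   assert restored.status == constants.OP_STATUS_QUEUED
  #
  # The serialized form contains only plain dicts/lists/scalars, so it can be
  # passed through serializer.DumpJson/LoadJson unchanged.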
180

    
181

    
182
class _QueuedJob(object):
183
  """In-memory job representation.
184

185
  This is what we use to track the user-submitted jobs. Locking must
186
  be taken care of by users of this class.
187

188
  @type queue: L{JobQueue}
189
  @ivar queue: the parent queue
190
  @ivar id: the job ID
191
  @type ops: list
192
  @ivar ops: the list of _QueuedOpCode that constitute the job
193
  @type log_serial: int
194
  @ivar log_serial: holds the index for the next log entry
195
  @ivar received_timestamp: the timestamp for when the job was received
196
  @ivar start_timestamp: the timestamp for start of execution
197
  @ivar end_timestamp: the timestamp for end of execution
198
  @ivar writable: Whether the job is allowed to be modified
199

200
  """
201
  # pylint: disable=W0212
202
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
203
               "received_timestamp", "start_timestamp", "end_timestamp",
204
               "__weakref__", "processor_lock", "writable"]
205

    
206
  def __init__(self, queue, job_id, ops, writable):
207
    """Constructor for the _QueuedJob.
208

209
    @type queue: L{JobQueue}
210
    @param queue: our parent queue
211
    @type job_id: int
212
    @param job_id: our job id
213
    @type ops: list
214
    @param ops: the list of opcodes we hold, which will be encapsulated
215
        in _QueuedOpCodes
216
    @type writable: bool
217
    @param writable: Whether job can be modified
218

219
    """
220
    if not ops:
221
      raise errors.GenericError("A job needs at least one opcode")
222

    
223
    self.queue = queue
224
    self.id = int(job_id)
225
    self.ops = [_QueuedOpCode(op) for op in ops]
226
    self.log_serial = 0
227
    self.received_timestamp = TimeStampNow()
228
    self.start_timestamp = None
229
    self.end_timestamp = None
230

    
231
    self._InitInMemory(self, writable)
232

    
233
  @staticmethod
234
  def _InitInMemory(obj, writable):
235
    """Initializes in-memory variables.
236

237
    """
238
    obj.writable = writable
239
    obj.ops_iter = None
240
    obj.cur_opctx = None
241

    
242
    # Read-only jobs are not processed and therefore don't need a lock
243
    if writable:
244
      obj.processor_lock = threading.Lock()
245
    else:
246
      obj.processor_lock = None
247

    
248
  def __repr__(self):
249
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
250
              "id=%s" % self.id,
251
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
252

    
253
    return "<%s at %#x>" % (" ".join(status), id(self))
254

    
255
  @classmethod
256
  def Restore(cls, queue, state, writable):
257
    """Restore a _QueuedJob from serialized state:
258

259
    @type queue: L{JobQueue}
260
    @param queue: to which queue the restored job belongs
261
    @type state: dict
262
    @param state: the serialized state
263
    @type writable: bool
264
    @param writable: Whether job can be modified
265
    @rtype: _QueuedJob
266
    @return: the restored _QueuedJob instance
267

268
    """
269
    obj = _QueuedJob.__new__(cls)
270
    obj.queue = queue
271
    obj.id = int(state["id"])
272
    obj.received_timestamp = state.get("received_timestamp", None)
273
    obj.start_timestamp = state.get("start_timestamp", None)
274
    obj.end_timestamp = state.get("end_timestamp", None)
275

    
276
    obj.ops = []
277
    obj.log_serial = 0
278
    for op_state in state["ops"]:
279
      op = _QueuedOpCode.Restore(op_state)
280
      for log_entry in op.log:
281
        obj.log_serial = max(obj.log_serial, log_entry[0])
282
      obj.ops.append(op)
283

    
284
    cls._InitInMemory(obj, writable)
285

    
286
    return obj
287

    
288
  def Serialize(self):
289
    """Serialize the _JobQueue instance.
290

291
    @rtype: dict
292
    @return: the serialized state
293

294
    """
295
    return {
296
      "id": self.id,
297
      "ops": [op.Serialize() for op in self.ops],
298
      "start_timestamp": self.start_timestamp,
299
      "end_timestamp": self.end_timestamp,
300
      "received_timestamp": self.received_timestamp,
301
      }
302

    
303
  def CalcStatus(self):
304
    """Compute the status of this job.
305

306
    This function iterates over all the _QueuedOpCodes in the job and
307
    based on their status, computes the job status.
308

309
    The algorithm is:
310
      - if we find a cancelled, or finished with error, the job
311
        status will be the same
312
      - otherwise, the last opcode with the status one of:
313
          - waiting
314
          - canceling
315
          - running
316

317
        will determine the job status
318

319
      - otherwise, it means either all opcodes are queued, or success,
320
        and the job status will be the same
321

322
    @return: the job status
323

324
    """
325
    status = constants.JOB_STATUS_QUEUED
326

    
327
    all_success = True
328
    for op in self.ops:
329
      if op.status == constants.OP_STATUS_SUCCESS:
330
        continue
331

    
332
      all_success = False
333

    
334
      if op.status == constants.OP_STATUS_QUEUED:
335
        pass
336
      elif op.status == constants.OP_STATUS_WAITING:
337
        status = constants.JOB_STATUS_WAITING
338
      elif op.status == constants.OP_STATUS_RUNNING:
339
        status = constants.JOB_STATUS_RUNNING
340
      elif op.status == constants.OP_STATUS_CANCELING:
341
        status = constants.JOB_STATUS_CANCELING
342
        break
343
      elif op.status == constants.OP_STATUS_ERROR:
344
        status = constants.JOB_STATUS_ERROR
345
        # The whole job fails if one opcode failed
346
        break
347
      elif op.status == constants.OP_STATUS_CANCELED:
348
        status = constants.JOB_STATUS_CANCELED
349
        break
350

    
351
    if all_success:
352
      status = constants.JOB_STATUS_SUCCESS
353

    
354
    return status
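  # Examples of the precedence implemented above (the opcode status sequences
  # on the left are illustrative):
  #
  #   success, success, success  -> JOB_STATUS_SUCCESS
  #   success, running, queued   -> JOB_STATUS_RUNNING
  #   success, waiting, queued   -> JOB_STATUS_WAITING
  #   success, error, queued     -> JOB_STATUS_ERROR     (error wins, break)
  #   canceling, queued          -> JOB_STATUS_CANCELING
  #   queued, queued             -> JOB_STATUS_QUEUED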
355

    
356
  def CalcPriority(self):
357
    """Gets the current priority for this job.
358

359
    Only unfinished opcodes are considered. When all are done, the default
360
    priority is used.
361

362
    @rtype: int
363

364
    """
365
    priorities = [op.priority for op in self.ops
366
                  if op.status not in constants.OPS_FINALIZED]
367

    
368
    if not priorities:
369
      # All opcodes are done, assume default priority
370
      return constants.OP_PRIO_DEFAULT
371

    
372
    return min(priorities)
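  # Example: with pending opcode priorities [constants.OP_PRIO_LOW,
  # constants.OP_PRIO_HIGH], min() selects OP_PRIO_HIGH, since lower numeric
  # values mean higher priority. Once every opcode is finalized, the default
  # priority is reported instead.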
373

    
374
  def GetLogEntries(self, newer_than):
375
    """Selectively returns the log entries.
376

377
    @type newer_than: None or int
378
    @param newer_than: if this is None, return all log entries,
379
        otherwise return only the log entries with serial higher
380
        than this value
381
    @rtype: list
382
    @return: the list of the log entries selected
383

384
    """
385
    if newer_than is None:
386
      serial = -1
387
    else:
388
      serial = newer_than
389

    
390
    entries = []
391
    for op in self.ops:
392
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
393

    
394
    return entries
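  # Example: if the job's opcodes currently hold log entries with serials
  # 1 through 4, GetLogEntries(2) returns only the entries with serials 3 and
  # 4, while GetLogEntries(None) returns all of them.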
395

    
396
  def GetInfo(self, fields):
397
    """Returns information about a job.
398

399
    @type fields: list
400
    @param fields: names of fields to return
401
    @rtype: list
402
    @return: list with one element for each field
403
    @raise errors.OpExecError: when an invalid field
404
        has been passed
405

406
    """
407
    return _SimpleJobQuery(fields)(self)
408

    
409
  def MarkUnfinishedOps(self, status, result):
410
    """Mark unfinished opcodes with a given status and result.
411

412
    This is a utility function for marking all running or waiting to
413
    be run opcodes with a given status. Opcodes which are already
414
    finalized are not changed.
415

416
    @param status: a given opcode status
417
    @param result: the opcode result
418

419
    """
420
    not_marked = True
421
    for op in self.ops:
422
      if op.status in constants.OPS_FINALIZED:
423
        assert not_marked, "Finalized opcodes found after non-finalized ones"
424
        continue
425
      op.status = status
426
      op.result = result
427
      not_marked = False
428

    
429
  def Finalize(self):
430
    """Marks the job as finalized.
431

432
    """
433
    self.end_timestamp = TimeStampNow()
434

    
435
  def Cancel(self):
436
    """Marks job as canceled/-ing if possible.
437

438
    @rtype: tuple; (bool, string)
439
    @return: a boolean describing whether the job was successfully canceled or
440
      marked as canceling, and a text message
441

442
    """
443
    status = self.CalcStatus()
444

    
445
    if status == constants.JOB_STATUS_QUEUED:
446
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
447
                             "Job canceled by request")
448
      self.Finalize()
449
      return (True, "Job %s canceled" % self.id)
450

    
451
    elif status == constants.JOB_STATUS_WAITING:
452
      # The worker will notice the new status and cancel the job
453
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
454
      return (True, "Job %s will be canceled" % self.id)
455

    
456
    else:
457
      logging.debug("Job %s is no longer waiting in the queue", self.id)
458
      return (False, "Job %s is no longer waiting in the queue" % self.id)
459

    
460

    
461
class _OpExecCallbacks(mcpu.OpExecCbBase):
462
  def __init__(self, queue, job, op):
463
    """Initializes this class.
464

465
    @type queue: L{JobQueue}
466
    @param queue: Job queue
467
    @type job: L{_QueuedJob}
468
    @param job: Job object
469
    @type op: L{_QueuedOpCode}
470
    @param op: OpCode
471

472
    """
473
    assert queue, "Queue is missing"
474
    assert job, "Job is missing"
475
    assert op, "Opcode is missing"
476

    
477
    self._queue = queue
478
    self._job = job
479
    self._op = op
480

    
481
  def _CheckCancel(self):
482
    """Raises an exception to cancel the job if asked to.
483

484
    """
485
    # Cancel here if we were asked to
486
    if self._op.status == constants.OP_STATUS_CANCELING:
487
      logging.debug("Canceling opcode")
488
      raise CancelJob()
489

    
490
  @locking.ssynchronized(_QUEUE, shared=1)
491
  def NotifyStart(self):
492
    """Mark the opcode as running, not lock-waiting.
493

494
    This is called from the mcpu code as a notifier function, when the LU is
495
    finally about to start the Exec() method. Of course, to have end-user
496
    visible results, the opcode must be initially (before calling into
497
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
498

499
    """
500
    assert self._op in self._job.ops
501
    assert self._op.status in (constants.OP_STATUS_WAITING,
502
                               constants.OP_STATUS_CANCELING)
503

    
504
    # Cancel here if we were asked to
505
    self._CheckCancel()
506

    
507
    logging.debug("Opcode is now running")
508

    
509
    self._op.status = constants.OP_STATUS_RUNNING
510
    self._op.exec_timestamp = TimeStampNow()
511

    
512
    # And finally replicate the job status
513
    self._queue.UpdateJobUnlocked(self._job)
514

    
515
  @locking.ssynchronized(_QUEUE, shared=1)
516
  def _AppendFeedback(self, timestamp, log_type, log_msg):
517
    """Internal feedback append function, with locks
518

519
    """
520
    self._job.log_serial += 1
521
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
522
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
523

    
524
  def Feedback(self, *args):
525
    """Append a log entry.
526

527
    """
528
    assert len(args) < 3
529

    
530
    if len(args) == 1:
531
      log_type = constants.ELOG_MESSAGE
532
      log_msg = args[0]
533
    else:
534
      (log_type, log_msg) = args
535

    
536
    # The time is split to make serialization easier and not lose
537
    # precision.
538
    timestamp = utils.SplitTime(time.time())
539
    self._AppendFeedback(timestamp, log_type, log_msg)
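  # Sketch of the two call forms accepted above, as seen from LU code where
  # this method is exposed as the feedback function (the log type in the
  # second form is only an example):
  #
  #   feedback_fn("hello world")                        # ELOG_MESSAGE assumed
  #   feedback_fn(constants.ELOG_JQUEUE_TEST, payload)  # explicit log type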
540

    
541
  def CheckCancel(self):
542
    """Check whether job has been cancelled.
543

544
    """
545
    assert self._op.status in (constants.OP_STATUS_WAITING,
546
                               constants.OP_STATUS_CANCELING)
547

    
548
    # Cancel here if we were asked to
549
    self._CheckCancel()
550

    
551
  def SubmitManyJobs(self, jobs):
552
    """Submits jobs for processing.
553

554
    See L{JobQueue.SubmitManyJobs}.
555

556
    """
557
    # Locking is done in job queue
558
    return self._queue.SubmitManyJobs(jobs)
559

    
560

    
561
class _JobChangesChecker(object):
562
  def __init__(self, fields, prev_job_info, prev_log_serial):
563
    """Initializes this class.
564

565
    @type fields: list of strings
566
    @param fields: Fields requested by LUXI client
567
    @type prev_job_info: list or None
568
    @param prev_job_info: previous job info, as passed by the LUXI client
569
    @type prev_log_serial: int
570
    @param prev_log_serial: previous job serial, as passed by the LUXI client
571

572
    """
573
    self._squery = _SimpleJobQuery(fields)
574
    self._prev_job_info = prev_job_info
575
    self._prev_log_serial = prev_log_serial
576

    
577
  def __call__(self, job):
578
    """Checks whether job has changed.
579

580
    @type job: L{_QueuedJob}
581
    @param job: Job object
582

583
    """
584
    assert not job.writable, "Expected read-only job"
585

    
586
    status = job.CalcStatus()
587
    job_info = self._squery(job)
588
    log_entries = job.GetLogEntries(self._prev_log_serial)
589

    
590
    # Serializing and deserializing data can cause type changes (e.g. from
591
    # tuple to list) or precision loss. We're doing it here so that we get
592
    # the same modifications as the data received from the client. Without
593
    # this, the comparison afterwards might fail without the data being
594
    # significantly different.
595
    # TODO: we just deserialized from disk, investigate how to make sure that
596
    # the job info and log entries are compatible to avoid this further step.
597
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
598
    # efficient, though floats will be tricky
599
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
600
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
601

    
602
    # Don't even try to wait if the job is no longer running, there will be
603
    # no changes.
604
    if (status not in (constants.JOB_STATUS_QUEUED,
605
                       constants.JOB_STATUS_RUNNING,
606
                       constants.JOB_STATUS_WAITING) or
607
        job_info != self._prev_job_info or
608
        (log_entries and self._prev_log_serial != log_entries[0][0])):
609
      logging.debug("Job %s changed", job.id)
610
      return (job_info, log_entries)
611

    
612
    return None
613

    
614

    
615
class _JobFileChangesWaiter(object):
616
  def __init__(self, filename):
617
    """Initializes this class.
618

619
    @type filename: string
620
    @param filename: Path to job file
621
    @raises errors.InotifyError: if the notifier cannot be set up
622

623
    """
624
    self._wm = pyinotify.WatchManager()
625
    self._inotify_handler = \
626
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
627
    self._notifier = \
628
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
629
    try:
630
      self._inotify_handler.enable()
631
    except Exception:
632
      # pyinotify doesn't close file descriptors automatically
633
      self._notifier.stop()
634
      raise
635

    
636
  def _OnInotify(self, notifier_enabled):
637
    """Callback for inotify.
638

639
    """
640
    if not notifier_enabled:
641
      self._inotify_handler.enable()
642

    
643
  def Wait(self, timeout):
644
    """Waits for the job file to change.
645

646
    @type timeout: float
647
    @param timeout: Timeout in seconds
648
    @return: Whether there have been events
649

650
    """
651
    assert timeout >= 0
652
    have_events = self._notifier.check_events(timeout * 1000)
653
    if have_events:
654
      self._notifier.read_events()
655
    self._notifier.process_events()
656
    return have_events
657

    
658
  def Close(self):
659
    """Closes underlying notifier and its file descriptor.
660

661
    """
662
    self._notifier.stop()
663

    
664

    
665
class _JobChangesWaiter(object):
666
  def __init__(self, filename):
667
    """Initializes this class.
668

669
    @type filename: string
670
    @param filename: Path to job file
671

672
    """
673
    self._filewaiter = None
674
    self._filename = filename
675

    
676
  def Wait(self, timeout):
677
    """Waits for a job to change.
678

679
    @type timeout: float
680
    @param timeout: Timeout in seconds
681
    @return: Whether there have been events
682

683
    """
684
    if self._filewaiter:
685
      return self._filewaiter.Wait(timeout)
686

    
687
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
688
    # If this point is reached, return immediately and let caller check the job
689
    # file again in case there were changes since the last check. This avoids a
690
    # race condition.
691
    self._filewaiter = _JobFileChangesWaiter(self._filename)
692

    
693
    return True
694

    
695
  def Close(self):
696
    """Closes underlying waiter.
697

698
    """
699
    if self._filewaiter:
700
      self._filewaiter.Close()
701

    
702

    
703
class _WaitForJobChangesHelper(object):
704
  """Helper class using inotify to wait for changes in a job file.
705

706
  This class takes a previous job status and serial, and alerts the client when
707
  the current job status has changed.
708

709
  """
710
  @staticmethod
711
  def _CheckForChanges(counter, job_load_fn, check_fn):
712
    if counter.next() > 0:
713
      # If this isn't the first check the job is given some more time to change
714
      # again. This gives better performance for jobs generating many
715
      # changes/messages.
716
      time.sleep(0.1)
717

    
718
    job = job_load_fn()
719
    if not job:
720
      raise errors.JobLost()
721

    
722
    result = check_fn(job)
723
    if result is None:
724
      raise utils.RetryAgain()
725

    
726
    return result
727

    
728
  def __call__(self, filename, job_load_fn,
729
               fields, prev_job_info, prev_log_serial, timeout):
730
    """Waits for changes on a job.
731

732
    @type filename: string
733
    @param filename: File on which to wait for changes
734
    @type job_load_fn: callable
735
    @param job_load_fn: Function to load job
736
    @type fields: list of strings
737
    @param fields: Which fields to check for changes
738
    @type prev_job_info: list or None
739
    @param prev_job_info: Last job information returned
740
    @type prev_log_serial: int
741
    @param prev_log_serial: Last job message serial number
742
    @type timeout: float
743
    @param timeout: maximum time to wait in seconds
744

745
    """
746
    counter = itertools.count()
747
    try:
748
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
749
      waiter = _JobChangesWaiter(filename)
750
      try:
751
        return utils.Retry(compat.partial(self._CheckForChanges,
752
                                          counter, job_load_fn, check_fn),
753
                           utils.RETRY_REMAINING_TIME, timeout,
754
                           wait_fn=waiter.Wait)
755
      finally:
756
        waiter.Close()
757
    except (errors.InotifyError, errors.JobLost):
758
      return None
759
    except utils.RetryTimeout:
760
      return constants.JOB_NOTCHANGED
761

    
762

    
763
def _EncodeOpError(err):
764
  """Encodes an error which occurred while processing an opcode.
765

766
  """
767
  if isinstance(err, errors.GenericError):
768
    to_encode = err
769
  else:
770
    to_encode = errors.OpExecError(str(err))
771

    
772
  return errors.EncodeException(to_encode)
773

    
774

    
775
class _TimeoutStrategyWrapper:
776
  def __init__(self, fn):
777
    """Initializes this class.
778

779
    """
780
    self._fn = fn
781
    self._next = None
782

    
783
  def _Advance(self):
784
    """Gets the next timeout if necessary.
785

786
    """
787
    if self._next is None:
788
      self._next = self._fn()
789

    
790
  def Peek(self):
791
    """Returns the next timeout.
792

793
    """
794
    self._Advance()
795
    return self._next
796

    
797
  def Next(self):
798
    """Returns the current timeout and advances the internal state.
799

800
    """
801
    self._Advance()
802
    result = self._next
803
    self._next = None
804
    return result
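  # Assumed usage pattern (mirroring _OpExecContext below): the wrapped
  # function yields successive lock acquire timeouts and eventually None for
  # a blocking acquire.
  #
  #   wrapper = _TimeoutStrategyWrapper(strategy.NextAttempt)
  #   wrapper.Peek()   # computes the next timeout and caches it, e.g. 1.0
  #   wrapper.Next()   # returns the same value and clears the cache
  #   wrapper.Peek()   # computes and caches the following timeout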
805

    
806

    
807
class _OpExecContext:
808
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
809
    """Initializes this class.
810

811
    """
812
    self.op = op
813
    self.index = index
814
    self.log_prefix = log_prefix
815
    self.summary = op.input.Summary()
816

    
817
    # Create local copy to modify
818
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
819
      self.jobdeps = op.input.depends[:]
820
    else:
821
      self.jobdeps = None
822

    
823
    self._timeout_strategy_factory = timeout_strategy_factory
824
    self._ResetTimeoutStrategy()
825

    
826
  def _ResetTimeoutStrategy(self):
827
    """Creates a new timeout strategy.
828

829
    """
830
    self._timeout_strategy = \
831
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
832

    
833
  def CheckPriorityIncrease(self):
834
    """Checks whether priority can and should be increased.
835

836
    Called when locks couldn't be acquired.
837

838
    """
839
    op = self.op
840

    
841
    # Exhausted all retries and next round should not use blocking acquire
842
    # for locks?
843
    if (self._timeout_strategy.Peek() is None and
844
        op.priority > constants.OP_PRIO_HIGHEST):
845
      logging.debug("Increasing priority")
846
      op.priority -= 1
847
      self._ResetTimeoutStrategy()
848
      return True
849

    
850
    return False
851

    
852
  def GetNextLockTimeout(self):
853
    """Returns the next lock acquire timeout.
854

855
    """
856
    return self._timeout_strategy.Next()
857

    
858

    
859
class _JobProcessor(object):
860
  (DEFER,
861
   WAITDEP,
862
   FINISHED) = range(1, 4)
863

    
864
  def __init__(self, queue, opexec_fn, job,
865
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
866
    """Initializes this class.
867

868
    """
869
    self.queue = queue
870
    self.opexec_fn = opexec_fn
871
    self.job = job
872
    self._timeout_strategy_factory = _timeout_strategy_factory
873

    
874
  @staticmethod
875
  def _FindNextOpcode(job, timeout_strategy_factory):
876
    """Locates the next opcode to run.
877

878
    @type job: L{_QueuedJob}
879
    @param job: Job object
880
    @param timeout_strategy_factory: Callable to create new timeout strategy
881

882
    """
883
    # Create some sort of a cache to speed up locating next opcode for future
884
    # lookups
885
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
886
    # pending and one for processed ops.
887
    if job.ops_iter is None:
888
      job.ops_iter = enumerate(job.ops)
889

    
890
    # Find next opcode to run
891
    while True:
892
      try:
893
        (idx, op) = job.ops_iter.next()
894
      except StopIteration:
895
        raise errors.ProgrammerError("Called for a finished job")
896

    
897
      if op.status == constants.OP_STATUS_RUNNING:
898
        # Found an opcode already marked as running
899
        raise errors.ProgrammerError("Called for job marked as running")
900

    
901
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
902
                             timeout_strategy_factory)
903

    
904
      if op.status not in constants.OPS_FINALIZED:
905
        return opctx
906

    
907
      # This is a job that was partially completed before master daemon
908
      # shutdown, so it can be expected that some opcodes are already
909
      # completed successfully (if any did error out, then the whole job
910
      # should have been aborted and not resubmitted for processing).
911
      logging.info("%s: opcode %s already processed, skipping",
912
                   opctx.log_prefix, opctx.summary)
913

    
914
  @staticmethod
915
  def _MarkWaitlock(job, op):
916
    """Marks an opcode as waiting for locks.
917

918
    The job's start timestamp is also set if necessary.
919

920
    @type job: L{_QueuedJob}
921
    @param job: Job object
922
    @type op: L{_QueuedOpCode}
923
    @param op: Opcode object
924

925
    """
926
    assert op in job.ops
927
    assert op.status in (constants.OP_STATUS_QUEUED,
928
                         constants.OP_STATUS_WAITING)
929

    
930
    update = False
931

    
932
    op.result = None
933

    
934
    if op.status == constants.OP_STATUS_QUEUED:
935
      op.status = constants.OP_STATUS_WAITING
936
      update = True
937

    
938
    if op.start_timestamp is None:
939
      op.start_timestamp = TimeStampNow()
940
      update = True
941

    
942
    if job.start_timestamp is None:
943
      job.start_timestamp = op.start_timestamp
944
      update = True
945

    
946
    assert op.status == constants.OP_STATUS_WAITING
947

    
948
    return update
949

    
950
  @staticmethod
951
  def _CheckDependencies(queue, job, opctx):
952
    """Checks if an opcode has dependencies and if so, processes them.
953

954
    @type queue: L{JobQueue}
955
    @param queue: Queue object
956
    @type job: L{_QueuedJob}
957
    @param job: Job object
958
    @type opctx: L{_OpExecContext}
959
    @param opctx: Opcode execution context
960
    @rtype: bool
961
    @return: Whether opcode will be re-scheduled by dependency tracker
962

963
    """
964
    op = opctx.op
965

    
966
    result = False
967

    
968
    while opctx.jobdeps:
969
      (dep_job_id, dep_status) = opctx.jobdeps[0]
970

    
971
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
972
                                                          dep_status)
973
      assert ht.TNonEmptyString(depmsg), "No dependency message"
974

    
975
      logging.info("%s: %s", opctx.log_prefix, depmsg)
976

    
977
      if depresult == _JobDependencyManager.CONTINUE:
978
        # Remove dependency and continue
979
        opctx.jobdeps.pop(0)
980

    
981
      elif depresult == _JobDependencyManager.WAIT:
982
        # Need to wait for notification, dependency tracker will re-add job
983
        # to workerpool
984
        result = True
985
        break
986

    
987
      elif depresult == _JobDependencyManager.CANCEL:
988
        # Job was cancelled, cancel this job as well
989
        job.Cancel()
990
        assert op.status == constants.OP_STATUS_CANCELING
991
        break
992

    
993
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
994
                         _JobDependencyManager.ERROR):
995
        # Job failed or there was an error, this job must fail
996
        op.status = constants.OP_STATUS_ERROR
997
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
998
        break
999

    
1000
      else:
1001
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1002
                                     depresult)
1003

    
1004
    return result
1005

    
1006
  def _ExecOpCodeUnlocked(self, opctx):
1007
    """Processes one opcode and returns the result.
1008

1009
    """
1010
    op = opctx.op
1011

    
1012
    assert op.status == constants.OP_STATUS_WAITING
1013

    
1014
    timeout = opctx.GetNextLockTimeout()
1015

    
1016
    try:
1017
      # Make sure not to hold queue lock while calling ExecOpCode
1018
      result = self.opexec_fn(op.input,
1019
                              _OpExecCallbacks(self.queue, self.job, op),
1020
                              timeout=timeout, priority=op.priority)
1021
    except mcpu.LockAcquireTimeout:
1022
      assert timeout is not None, "Received timeout for blocking acquire"
1023
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1024

    
1025
      assert op.status in (constants.OP_STATUS_WAITING,
1026
                           constants.OP_STATUS_CANCELING)
1027

    
1028
      # Was job cancelled while we were waiting for the lock?
1029
      if op.status == constants.OP_STATUS_CANCELING:
1030
        return (constants.OP_STATUS_CANCELING, None)
1031

    
1032
      # Stay in waitlock while trying to re-acquire lock
1033
      return (constants.OP_STATUS_WAITING, None)
1034
    except CancelJob:
1035
      logging.exception("%s: Canceling job", opctx.log_prefix)
1036
      assert op.status == constants.OP_STATUS_CANCELING
1037
      return (constants.OP_STATUS_CANCELING, None)
1038
    except Exception, err: # pylint: disable=W0703
1039
      logging.exception("%s: Caught exception in %s",
1040
                        opctx.log_prefix, opctx.summary)
1041
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1042
    else:
1043
      logging.debug("%s: %s successful",
1044
                    opctx.log_prefix, opctx.summary)
1045
      return (constants.OP_STATUS_SUCCESS, result)
1046

    
1047
  def __call__(self, _nextop_fn=None):
1048
    """Continues execution of a job.
1049

1050
    @param _nextop_fn: Callback function for tests
1051
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1052
      be deferred and C{WAITDEP} if the dependency manager
1053
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1054

1055
    """
1056
    queue = self.queue
1057
    job = self.job
1058

    
1059
    logging.debug("Processing job %s", job.id)
1060

    
1061
    queue.acquire(shared=1)
1062
    try:
1063
      opcount = len(job.ops)
1064

    
1065
      assert job.writable, "Expected writable job"
1066

    
1067
      # Don't do anything for finalized jobs
1068
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1069
        return self.FINISHED
1070

    
1071
      # Is a previous opcode still pending?
1072
      if job.cur_opctx:
1073
        opctx = job.cur_opctx
1074
        job.cur_opctx = None
1075
      else:
1076
        if __debug__ and _nextop_fn:
1077
          _nextop_fn()
1078
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1079

    
1080
      op = opctx.op
1081

    
1082
      # Consistency check
1083
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1084
                                     constants.OP_STATUS_CANCELING)
1085
                        for i in job.ops[opctx.index + 1:])
1086

    
1087
      assert op.status in (constants.OP_STATUS_QUEUED,
1088
                           constants.OP_STATUS_WAITING,
1089
                           constants.OP_STATUS_CANCELING)
1090

    
1091
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1092
              op.priority >= constants.OP_PRIO_HIGHEST)
1093

    
1094
      waitjob = None
1095

    
1096
      if op.status != constants.OP_STATUS_CANCELING:
1097
        assert op.status in (constants.OP_STATUS_QUEUED,
1098
                             constants.OP_STATUS_WAITING)
1099

    
1100
        # Prepare to start opcode
1101
        if self._MarkWaitlock(job, op):
1102
          # Write to disk
1103
          queue.UpdateJobUnlocked(job)
1104

    
1105
        assert op.status == constants.OP_STATUS_WAITING
1106
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1107
        assert job.start_timestamp and op.start_timestamp
1108
        assert waitjob is None
1109

    
1110
        # Check if waiting for a job is necessary
1111
        waitjob = self._CheckDependencies(queue, job, opctx)
1112

    
1113
        assert op.status in (constants.OP_STATUS_WAITING,
1114
                             constants.OP_STATUS_CANCELING,
1115
                             constants.OP_STATUS_ERROR)
1116

    
1117
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1118
                                         constants.OP_STATUS_ERROR)):
1119
          logging.info("%s: opcode %s waiting for locks",
1120
                       opctx.log_prefix, opctx.summary)
1121

    
1122
          assert not opctx.jobdeps, "Not all dependencies were removed"
1123

    
1124
          queue.release()
1125
          try:
1126
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1127
          finally:
1128
            queue.acquire(shared=1)
1129

    
1130
          op.status = op_status
1131
          op.result = op_result
1132

    
1133
          assert not waitjob
1134

    
1135
        if op.status == constants.OP_STATUS_WAITING:
1136
          # Couldn't get locks in time
1137
          assert not op.end_timestamp
1138
        else:
1139
          # Finalize opcode
1140
          op.end_timestamp = TimeStampNow()
1141

    
1142
          if op.status == constants.OP_STATUS_CANCELING:
1143
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1144
                                  for i in job.ops[opctx.index:])
1145
          else:
1146
            assert op.status in constants.OPS_FINALIZED
1147

    
1148
      if op.status == constants.OP_STATUS_WAITING or waitjob:
1149
        finalize = False
1150

    
1151
        if not waitjob and opctx.CheckPriorityIncrease():
1152
          # Priority was changed, need to update on-disk file
1153
          queue.UpdateJobUnlocked(job)
1154

    
1155
        # Keep around for another round
1156
        job.cur_opctx = opctx
1157

    
1158
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1159
                op.priority >= constants.OP_PRIO_HIGHEST)
1160

    
1161
        # In no case must the status be finalized here
1162
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1163

    
1164
      else:
1165
        # Ensure all opcodes so far have been successful
1166
        assert (opctx.index == 0 or
1167
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1168
                           for i in job.ops[:opctx.index]))
1169

    
1170
        # Reset context
1171
        job.cur_opctx = None
1172

    
1173
        if op.status == constants.OP_STATUS_SUCCESS:
1174
          finalize = False
1175

    
1176
        elif op.status == constants.OP_STATUS_ERROR:
1177
          # Ensure failed opcode has an exception as its result
1178
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1179

    
1180
          to_encode = errors.OpExecError("Preceding opcode failed")
1181
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1182
                                _EncodeOpError(to_encode))
1183
          finalize = True
1184

    
1185
          # Consistency check
1186
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1187
                            errors.GetEncodedError(i.result)
1188
                            for i in job.ops[opctx.index:])
1189

    
1190
        elif op.status == constants.OP_STATUS_CANCELING:
1191
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1192
                                "Job canceled by request")
1193
          finalize = True
1194

    
1195
        else:
1196
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1197

    
1198
        if opctx.index == (opcount - 1):
1199
          # Finalize on last opcode
1200
          finalize = True
1201

    
1202
        if finalize:
1203
          # All opcodes have been run, finalize job
1204
          job.Finalize()
1205

    
1206
        # Write to disk. If the job status is final, this is the final write
1207
        # allowed. Once the file has been written, it can be archived anytime.
1208
        queue.UpdateJobUnlocked(job)
1209

    
1210
        assert not waitjob
1211

    
1212
        if finalize:
1213
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1214
          return self.FINISHED
1215

    
1216
      assert not waitjob or queue.depmgr.JobWaiting(job)
1217

    
1218
      if waitjob:
1219
        return self.WAITDEP
1220
      else:
1221
        return self.DEFER
1222
    finally:
1223
      assert job.writable, "Job became read-only while being processed"
1224
      queue.release()
1225

    
1226

    
1227
def _EvaluateJobProcessorResult(depmgr, job, result):
1228
  """Looks at a result from L{_JobProcessor} for a job.
1229

1230
  To be used in a L{_JobQueueWorker}.
1231

1232
  """
1233
  if result == _JobProcessor.FINISHED:
1234
    # Notify waiting jobs
1235
    depmgr.NotifyWaiters(job.id)
1236

    
1237
  elif result == _JobProcessor.DEFER:
1238
    # Schedule again
1239
    raise workerpool.DeferTask(priority=job.CalcPriority())
1240

    
1241
  elif result == _JobProcessor.WAITDEP:
1242
    # No-op, dependency manager will re-schedule
1243
    pass
1244

    
1245
  else:
1246
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1247
                                 (result, ))
1248

    
1249

    
1250
class _JobQueueWorker(workerpool.BaseWorker):
1251
  """The actual job workers.
1252

1253
  """
1254
  def RunTask(self, job): # pylint: disable=W0221
1255
    """Job executor.
1256

1257
    @type job: L{_QueuedJob}
1258
    @param job: the job to be processed
1259

1260
    """
1261
    assert job.writable, "Expected writable job"
1262

    
1263
    # Ensure only one worker is active on a single job. If a job registers for
1264
    # a dependency job, and the other job notifies before the first worker is
1265
    # done, the job can end up in the tasklist more than once.
1266
    job.processor_lock.acquire()
1267
    try:
1268
      return self._RunTaskInner(job)
1269
    finally:
1270
      job.processor_lock.release()
1271

    
1272
  def _RunTaskInner(self, job):
1273
    """Executes a job.
1274

1275
    Must be called with per-job lock acquired.
1276

1277
    """
1278
    queue = job.queue
1279
    assert queue == self.pool.queue
1280

    
1281
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1282
    setname_fn(None)
1283

    
1284
    proc = mcpu.Processor(queue.context, job.id)
1285

    
1286
    # Create wrapper for setting thread name
1287
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1288
                                    proc.ExecOpCode)
1289

    
1290
    _EvaluateJobProcessorResult(queue.depmgr, job,
1291
                                _JobProcessor(queue, wrap_execop_fn, job)())
1292

    
1293
  @staticmethod
1294
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1295
    """Updates the worker thread name to include a short summary of the opcode.
1296

1297
    @param setname_fn: Callable setting worker thread name
1298
    @param execop_fn: Callable for executing opcode (usually
1299
                      L{mcpu.Processor.ExecOpCode})
1300

1301
    """
1302
    setname_fn(op)
1303
    try:
1304
      return execop_fn(op, *args, **kwargs)
1305
    finally:
1306
      setname_fn(None)
1307

    
1308
  @staticmethod
1309
  def _GetWorkerName(job, op):
1310
    """Sets the worker thread name.
1311

1312
    @type job: L{_QueuedJob}
1313
    @type op: L{opcodes.OpCode}
1314

1315
    """
1316
    parts = ["Job%s" % job.id]
1317

    
1318
    if op:
1319
      parts.append(op.TinySummary())
1320

    
1321
    return "/".join(parts)
1322

    
1323

    
1324
class _JobQueueWorkerPool(workerpool.WorkerPool):
1325
  """Simple class implementing a job-processing workerpool.
1326

1327
  """
1328
  def __init__(self, queue):
1329
    super(_JobQueueWorkerPool, self).__init__("Jq",
1330
                                              JOBQUEUE_THREADS,
1331
                                              _JobQueueWorker)
1332
    self.queue = queue
1333

    
1334

    
1335
class _JobDependencyManager:
1336
  """Keeps track of job dependencies.
1337

1338
  """
1339
  (WAIT,
1340
   ERROR,
1341
   CANCEL,
1342
   CONTINUE,
1343
   WRONGSTATUS) = range(1, 6)
1344

    
1345
  def __init__(self, getstatus_fn, enqueue_fn):
1346
    """Initializes this class.
1347

1348
    """
1349
    self._getstatus_fn = getstatus_fn
1350
    self._enqueue_fn = enqueue_fn
1351

    
1352
    self._waiters = {}
1353
    self._lock = locking.SharedLock("JobDepMgr")
1354

    
1355
  @locking.ssynchronized(_LOCK, shared=1)
1356
  def GetLockInfo(self, requested): # pylint: disable=W0613
1357
    """Retrieves information about waiting jobs.
1358

1359
    @type requested: set
1360
    @param requested: Requested information, see C{query.LQ_*}
1361

1362
    """
1363
    # No need to sort here, that's being done by the lock manager and query
1364
    # library. There are no priorities for notifying jobs, hence all show up as
1365
    # one item under "pending".
1366
    return [("job/%s" % job_id, None, None,
1367
             [("job", [job.id for job in waiters])])
1368
            for job_id, waiters in self._waiters.items()
1369
            if waiters]
1370

    
1371
  @locking.ssynchronized(_LOCK, shared=1)
1372
  def JobWaiting(self, job):
1373
    """Checks if a job is waiting.
1374

1375
    """
1376
    return compat.any(job in jobs
1377
                      for jobs in self._waiters.values())
1378

    
1379
  @locking.ssynchronized(_LOCK)
1380
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1381
    """Checks if a dependency job has the requested status.
1382

1383
    If the other job is not yet in a finalized status, the calling job will be
1384
    notified (re-added to the workerpool) at a later point.
1385

1386
    @type job: L{_QueuedJob}
1387
    @param job: Job object
1388
    @type dep_job_id: int
1389
    @param dep_job_id: ID of dependency job
1390
    @type dep_status: list
1391
    @param dep_status: Required status
1392

1393
    """
1394
    assert ht.TJobId(job.id)
1395
    assert ht.TJobId(dep_job_id)
1396
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1397

    
1398
    if job.id == dep_job_id:
1399
      return (self.ERROR, "Job can't depend on itself")
1400

    
1401
    # Get status of dependency job
1402
    try:
1403
      status = self._getstatus_fn(dep_job_id)
1404
    except errors.JobLost, err:
1405
      return (self.ERROR, "Dependency error: %s" % err)
1406

    
1407
    assert status in constants.JOB_STATUS_ALL
1408

    
1409
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1410

    
1411
    if status not in constants.JOBS_FINALIZED:
1412
      # Register for notification and wait for job to finish
1413
      job_id_waiters.add(job)
1414
      return (self.WAIT,
1415
              "Need to wait for job %s, wanted status '%s'" %
1416
              (dep_job_id, dep_status))
1417

    
1418
    # Remove from waiters list
1419
    if job in job_id_waiters:
1420
      job_id_waiters.remove(job)
1421

    
1422
    if (status == constants.JOB_STATUS_CANCELED and
1423
        constants.JOB_STATUS_CANCELED not in dep_status):
1424
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1425

    
1426
    elif not dep_status or status in dep_status:
1427
      return (self.CONTINUE,
1428
              "Dependency job %s finished with status '%s'" %
1429
              (dep_job_id, status))
1430

    
1431
    else:
1432
      return (self.WRONGSTATUS,
1433
              "Dependency job %s finished with status '%s',"
1434
              " not one of '%s' as required" %
1435
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1436

    
1437
  def _RemoveEmptyWaitersUnlocked(self):
1438
    """Remove all jobs without actual waiters.
1439

1440
    """
1441
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1442
                   if not waiters]:
1443
      del self._waiters[job_id]
1444

    
1445
  def NotifyWaiters(self, job_id):
1446
    """Notifies all jobs waiting for a certain job ID.
1447

1448
    @attention: Do not call until L{CheckAndRegister} returned a status other
1449
      than C{WAIT} for C{job_id}, or behaviour is undefined
1450
    @type job_id: int
1451
    @param job_id: Job ID
1452

1453
    """
1454
    assert ht.TJobId(job_id)
1455

    
1456
    self._lock.acquire()
1457
    try:
1458
      self._RemoveEmptyWaitersUnlocked()
1459

    
1460
      jobs = self._waiters.pop(job_id, None)
1461
    finally:
1462
      self._lock.release()
1463

    
1464
    if jobs:
1465
      # Re-add jobs to workerpool
1466
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1467
                    len(jobs), job_id)
1468
      self._enqueue_fn(jobs)
1469

    
1470

    
1471
def _RequireOpenQueue(fn):
1472
  """Decorator for "public" functions.
1473

1474
  This function should be used for all 'public' functions. That is,
1475
  functions usually called from other classes. Note that this should
1476
  be applied only to methods (not plain functions), since it expects
1477
  that the decorated function is called with a first argument that has
1478
  a '_queue_filelock' argument.
1479

1480
  @warning: Use this decorator only after locking.ssynchronized
1481

1482
  Example::
1483
    @locking.ssynchronized(_LOCK)
1484
    @_RequireOpenQueue
1485
    def Example(self):
1486
      pass
1487

1488
  """
1489
  def wrapper(self, *args, **kwargs):
1490
    # pylint: disable=W0212
1491
    assert self._queue_filelock is not None, "Queue should be open"
1492
    return fn(self, *args, **kwargs)
1493
  return wrapper
1494

    
1495

    
1496
def _RequireNonDrainedQueue(fn):
1497
  """Decorator checking for a non-drained queue.
1498

1499
  To be used with functions submitting new jobs.
1500

1501
  """
1502
  def wrapper(self, *args, **kwargs):
1503
    """Wrapper function.
1504

1505
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1506

1507
    """
1508
    # Ok when sharing the big job queue lock, as the drain file is created when
1509
    # the lock is exclusive.
1510
    # Needs access to protected member, pylint: disable=W0212
1511
    if self._drained:
1512
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1513

    
1514
    if not self._accepting_jobs:
1515
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1516

    
1517
    return fn(self, *args, **kwargs)
1518
  return wrapper
1519

    
1520

    
1521
class JobQueue(object):
1522
  """Queue used to manage the jobs.
1523

1524
  """
1525
  def __init__(self, context):
1526
    """Constructor for JobQueue.
1527

1528
    The constructor will initialize the job queue object and then
1529
    start loading the current jobs from disk, either for starting them
1530
    (if they were queued) or for aborting them (if they were already
1531
    running).
1532

1533
    @type context: GanetiContext
1534
    @param context: the context object for access to the configuration
1535
        data and other ganeti objects
1536

1537
    """
1538
    self.context = context
1539
    self._memcache = weakref.WeakValueDictionary()
1540
    self._my_hostname = netutils.Hostname.GetSysName()
1541

    
1542
    # The Big JobQueue lock. If a code block or method acquires it in shared
1543
    # mode, it must guarantee concurrency with all the code acquiring it in
1544
    # shared mode, including itself. In order not to acquire it at all,
1545
    # concurrency must be guaranteed with all code acquiring it in shared mode
1546
    # and all code acquiring it exclusively.
1547
    self._lock = locking.SharedLock("JobQueue")
1548

    
1549
    self.acquire = self._lock.acquire
1550
    self.release = self._lock.release
1551

    
1552
    # Accept jobs by default
1553
    self._accepting_jobs = True
1554

    
1555
    # Initialize the queue, and acquire the filelock.
1556
    # This ensures no other process is working on the job queue.
1557
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1558

    
1559
    # Read serial file
1560
    self._last_serial = jstore.ReadSerial()
1561
    assert self._last_serial is not None, ("Serial file was modified between"
1562
                                           " check in jstore and here")
1563

    
1564
    # Get initial list of nodes
1565
    self._nodes = dict((n.name, n.primary_ip)
1566
                       for n in self.context.cfg.GetAllNodesInfo().values()
1567
                       if n.master_candidate)
1568

    
1569
    # Remove master node
1570
    self._nodes.pop(self._my_hostname, None)
1571

    
1572
    # TODO: Check consistency across nodes
1573

    
1574
    self._queue_size = None
1575
    self._UpdateQueueSizeUnlocked()
1576
    assert ht.TInt(self._queue_size)
1577
    self._drained = jstore.CheckDrainFlag()
1578

    
1579
    # Job dependencies
1580
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1581
                                        self._EnqueueJobs)
1582
    self.context.glm.AddToLockMonitor(self.depmgr)
1583

    
1584
    # Setup worker pool
1585
    self._wpool = _JobQueueWorkerPool(self)
1586
    try:
1587
      self._InspectQueue()
1588
    except:
1589
      self._wpool.TerminateWorkers()
1590
      raise
1591

    
1592
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
                                                        content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

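  # Worked example for the majority check above (illustrative only): with
  # five non-master candidates, two successes and three failures give
  # (2 + 1) < 3, i.e. False, so the "more than half" error is not logged;
  # with one success and four failures, (1 + 1) < 4 holds and it is logged.
  #
  #   success, failed = ["node2", "node3"], ["node4", "node5", "node6"]
  #   assert not ((len(success) + 1) < len(failed))
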
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TPositiveInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

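  # Illustrative sketch of the serial arithmetic above: if the last serial
  # written was 42 and three new serials are requested, the serial file is
  # rewritten with "45" and the job IDs formatted from 43, 44 and 45 are
  # returned (jstore.FormatJobID() turns each integer into a job ID).
  #
  #   last_serial, count = 42, 3
  #   serial = last_serial + count                      # 45
  #   new_ids = range(last_serial + 1, serial + 1)      # [43, 44, 45]
  #   assert len(new_ids) == count
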
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

  @staticmethod
  def _GetJobIDsUnlocked(sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(pathutils.QUEUE_DIR):
      m = constants.JOB_FILE_RE.match(filename)
      if m:
        jlist.append(int(m.group(1)))
    if sort:
      jlist.sort()
    return jlist

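  # Illustrative sketch of the directory scan above, with hypothetical file
  # names: only entries matching constants.JOB_FILE_RE (e.g. "job-17")
  # contribute their numeric part; everything else in the queue directory is
  # ignored. The startswith() test stands in for the real regular expression.
  #
  #   listing = ["job-3", "job-17", "serial", "archive", "job-5"]
  #   ids = sorted(int(name[len("job-"):])
  #                for name in listing if name.startswith("job-"))
  #   # ids == [3, 5, 17]
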
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @type writable: bool or None
    @param writable: Whether the returned job object should be writable; if
        C{None}, jobs loaded from the live queue are writable and archived
        jobs are read-only
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, True)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, False))

    raw_data = None
    writable_default = None

    for (fn, writable_default) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = writable_default

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of an error while reading the job file, None is returned and
    the exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

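  # Illustrative usage sketch (not part of this module): a caller holding a
  # queue instance would submit a job along these lines, where ``queue`` and
  # ``op`` are hypothetical placeholders for a JobQueue and an opcode.
  #
  #   job_id = queue.SubmitJob([op])
  #   # later: track it via WaitForJobChanges(job_id, ...) or QueryJobs(...)
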
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple
    @return: C{(True, <resolved dependencies>)} on success, or
        C{(False, <error message>)} if a relative job ID could not be resolved

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

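  # Worked example for the relative-ID resolution above (illustrative only):
  # with previously submitted job IDs [7, 8] and a current batch [9, 10],
  # the second entry of the batch (job_idx == 1) resolves the relative
  # dependency -1 to 9 and -2 to 8, exactly as resolve_fn does below.
  #
  #   previous_job_ids, job_ids, job_idx = [7, 8], [9, 10], 1
  #   assert (previous_job_ids + job_ids[:job_idx])[-1] == 9
  #   assert (previous_job_ids + job_ids[:job_idx])[-2] == 8
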
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

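  # Illustrative polling sketch for the method above (hypothetical caller
  # code, not part of this module): clients call WaitForJobChanges()
  # repeatedly and treat the special constants.JOB_NOTCHANGED value as
  # "timeout, ask again"; ``queue`` and ``job_id`` are placeholders.
  #
  #   prev_info, prev_serial = None, -1
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, timeout=10.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue
  #     (prev_info, log_entries) = result
  #     # ... consume log_entries, update prev_serial, stop once finalized
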
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum amount of time (in seconds) to spend archiving
    @rtype: tuple
    @return: a tuple of (number of archived jobs, number of jobs not yet
        inspected because the timeout was reached)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

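  # Worked example for the age check above (illustrative only): timestamps
  # are (seconds, microseconds) tuples, so only the seconds part is compared.
  # With now = 10000 and age = 3600, a job whose end timestamp is (5000, 0)
  # satisfies 10000 - 5000 > 3600 and is queued for archival, while one that
  # ended at (8000, 0) is kept.
  #
  #   now, age = 10000, 3600
  #   assert now - (5000, 0)[0] > age
  #   assert not (now - (8000, 0)[0] > age)
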
  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

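  # Illustrative sketch of a query2-style filter as accepted above (the exact
  # operator spelling follows the qlang module and is an assumption here):
  # select the "id" and "status" fields of all jobs that are still queued,
  # with ``queue`` standing in for a JobQueue instance.
  #
  #   fields = ["id", "status"]
  #   qfilter = ["=", "status", constants.JOB_STATUS_QUEUED]
  #   response = queue.QueryJobs(fields, qfilter)
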
  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None
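
  # Illustrative shutdown sequence for the two methods above (hypothetical
  # caller code, e.g. a master daemon exit path): keep calling
  # PrepareShutdown() until no jobs are running, then call Shutdown();
  # ``queue`` is a placeholder for a JobQueue instance.
  #
  #   while queue.PrepareShutdown():
  #     time.sleep(1)            # jobs still running, check again shortly
  #   queue.Shutdown()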