lib/jqueue.py @ e4e59de8

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import time
35
import weakref
36
import threading
37
import itertools
38

    
39
try:
40
  # pylint: disable=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import runtime
57
from ganeti import netutils
58
from ganeti import compat
59
from ganeti import ht
60
from ganeti import query
61
from ganeti import qlang
62
from ganeti import pathutils
63
from ganeti import vcluster
64

    
65

    
66
JOBQUEUE_THREADS = 25
67

    
68
# member lock names to be passed to @ssynchronized decorator
69
_LOCK = "_lock"
70
_QUEUE = "_queue"
71

    
72

    
73
class CancelJob(Exception):
74
  """Special exception to cancel a job.
75

76
  """
77

    
78

    
79
class QueueShutdown(Exception):
80
  """Special exception to abort a job when the job queue is shutting down.
81

82
  """
83

    
84

    
85
def TimeStampNow():
86
  """Returns the current timestamp.
87

88
  @rtype: tuple
89
  @return: the current time in the (seconds, microseconds) format
90

91
  """
92
  return utils.SplitTime(time.time())
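# Illustrative aside, not part of the original module: TimeStampNow() simply
# wraps utils.SplitTime(), returning (seconds, microseconds). The concrete
# value below is made up for illustration:
#
#   utils.SplitTime(1234567890.123456)   # -> (1234567890, 123456), roughly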
93

    
94

    
95
def _CallJqUpdate(runner, names, file_name, content):
96
  """Updates job queue file after virtualizing filename.
97

98
  """
99
  virt_file_name = vcluster.MakeVirtualPath(file_name)
100
  return runner.call_jobqueue_update(names, virt_file_name, content)
101

    
102

    
103
class _SimpleJobQuery:
104
  """Wrapper for job queries.
105

106
  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
107

108
  """
109
  def __init__(self, fields):
110
    """Initializes this class.
111

112
    """
113
    self._query = query.Query(query.JOB_FIELDS, fields)
114

    
115
  def __call__(self, job):
116
    """Executes a job query using cached field list.
117

118
    """
119
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
120

    
121

    
122
class _QueuedOpCode(object):
123
  """Encapsulates an opcode object.
124

125
  @ivar log: holds the execution log and consists of tuples
126
  of the form C{(log_serial, timestamp, level, message)}
127
  @ivar input: the OpCode we encapsulate
128
  @ivar status: the current status
129
  @ivar result: the result of the LU execution
130
  @ivar start_timestamp: timestamp for the start of the execution
131
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
132
  @ivar stop_timestamp: timestamp for the end of the execution
133

134
  """
135
  __slots__ = ["input", "status", "result", "log", "priority",
136
               "start_timestamp", "exec_timestamp", "end_timestamp",
137
               "__weakref__"]
138

    
139
  def __init__(self, op):
140
    """Initializes instances of this class.
141

142
    @type op: L{opcodes.OpCode}
143
    @param op: the opcode we encapsulate
144

145
    """
146
    self.input = op
147
    self.status = constants.OP_STATUS_QUEUED
148
    self.result = None
149
    self.log = []
150
    self.start_timestamp = None
151
    self.exec_timestamp = None
152
    self.end_timestamp = None
153

    
154
    # Get initial priority (it might change during the lifetime of this opcode)
155
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
156

    
157
  @classmethod
158
  def Restore(cls, state):
159
    """Restore the _QueuedOpCode from the serialized form.
160

161
    @type state: dict
162
    @param state: the serialized state
163
    @rtype: _QueuedOpCode
164
    @return: a new _QueuedOpCode instance
165

166
    """
167
    obj = _QueuedOpCode.__new__(cls)
168
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
169
    obj.status = state["status"]
170
    obj.result = state["result"]
171
    obj.log = state["log"]
172
    obj.start_timestamp = state.get("start_timestamp", None)
173
    obj.exec_timestamp = state.get("exec_timestamp", None)
174
    obj.end_timestamp = state.get("end_timestamp", None)
175
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
176
    return obj
177

    
178
  def Serialize(self):
179
    """Serializes this _QueuedOpCode.
180

181
    @rtype: dict
182
    @return: the dictionary holding the serialized state
183

184
    """
185
    return {
186
      "input": self.input.__getstate__(),
187
      "status": self.status,
188
      "result": self.result,
189
      "log": self.log,
190
      "start_timestamp": self.start_timestamp,
191
      "exec_timestamp": self.exec_timestamp,
192
      "end_timestamp": self.end_timestamp,
193
      "priority": self.priority,
194
      }
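  # Illustrative aside, not part of the original module: Serialize() and
  # Restore() are inverses, so an opcode's state survives a round trip
  # through the serializer. Sketch only; "op" stands for any opcodes.OpCode
  # instance:
  #
  #   qop = _QueuedOpCode(op)
  #   state = serializer.LoadJson(serializer.DumpJson(qop.Serialize()))
  #   restored = _QueuedOpCode.Restore(state)
  #   assert restored.status == qop.status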
195

    
196

    
197
class _QueuedJob(object):
198
  """In-memory job representation.
199

200
  This is what we use to track the user-submitted jobs. Locking must
201
  be taken care of by users of this class.
202

203
  @type queue: L{JobQueue}
204
  @ivar queue: the parent queue
205
  @ivar id: the job ID
206
  @type ops: list
207
  @ivar ops: the list of _QueuedOpCode that constitute the job
208
  @type log_serial: int
209
  @ivar log_serial: holds the index for the next log entry
210
  @ivar received_timestamp: the timestamp for when the job was received
211
  @ivar start_timestamp: the timestamp for start of execution
212
  @ivar end_timestamp: the timestamp for end of execution
213
  @ivar writable: Whether the job is allowed to be modified
214

215
  """
216
  # pylint: disable=W0212
217
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
218
               "received_timestamp", "start_timestamp", "end_timestamp",
219
               "__weakref__", "processor_lock", "writable", "archived"]
220

    
221
  def __init__(self, queue, job_id, ops, writable):
222
    """Constructor for the _QueuedJob.
223

224
    @type queue: L{JobQueue}
225
    @param queue: our parent queue
226
    @type job_id: int
227
    @param job_id: our job id
228
    @type ops: list
229
    @param ops: the list of opcodes we hold, which will be encapsulated
230
        in _QueuedOpCodes
231
    @type writable: bool
232
    @param writable: Whether job can be modified
233

234
    """
235
    if not ops:
236
      raise errors.GenericError("A job needs at least one opcode")
237

    
238
    self.queue = queue
239
    self.id = int(job_id)
240
    self.ops = [_QueuedOpCode(op) for op in ops]
241
    self.log_serial = 0
242
    self.received_timestamp = TimeStampNow()
243
    self.start_timestamp = None
244
    self.end_timestamp = None
245
    self.archived = False
246

    
247
    self._InitInMemory(self, writable)
248

    
249
    assert not self.archived, "New jobs can not be marked as archived"
250

    
251
  @staticmethod
252
  def _InitInMemory(obj, writable):
253
    """Initializes in-memory variables.
254

255
    """
256
    obj.writable = writable
257
    obj.ops_iter = None
258
    obj.cur_opctx = None
259

    
260
    # Read-only jobs are not processed and therefore don't need a lock
261
    if writable:
262
      obj.processor_lock = threading.Lock()
263
    else:
264
      obj.processor_lock = None
265

    
266
  def __repr__(self):
267
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
268
              "id=%s" % self.id,
269
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
270

    
271
    return "<%s at %#x>" % (" ".join(status), id(self))
272

    
273
  @classmethod
274
  def Restore(cls, queue, state, writable, archived):
275
    """Restore a _QueuedJob from serialized state:
276

277
    @type queue: L{JobQueue}
278
    @param queue: to which queue the restored job belongs
279
    @type state: dict
280
    @param state: the serialized state
281
    @type writable: bool
282
    @param writable: Whether job can be modified
283
    @type archived: bool
284
    @param archived: Whether job was already archived
285
    @rtype: _QueuedJob
286
    @return: the restored _QueuedJob instance
287

288
    """
289
    obj = _QueuedJob.__new__(cls)
290
    obj.queue = queue
291
    obj.id = int(state["id"])
292
    obj.received_timestamp = state.get("received_timestamp", None)
293
    obj.start_timestamp = state.get("start_timestamp", None)
294
    obj.end_timestamp = state.get("end_timestamp", None)
295
    obj.archived = archived
296

    
297
    obj.ops = []
298
    obj.log_serial = 0
299
    for op_state in state["ops"]:
300
      op = _QueuedOpCode.Restore(op_state)
301
      for log_entry in op.log:
302
        obj.log_serial = max(obj.log_serial, log_entry[0])
303
      obj.ops.append(op)
304

    
305
    cls._InitInMemory(obj, writable)
306

    
307
    return obj
308

    
309
  def Serialize(self):
310
    """Serialize the _JobQueue instance.
311

312
    @rtype: dict
313
    @return: the serialized state
314

315
    """
316
    return {
317
      "id": self.id,
318
      "ops": [op.Serialize() for op in self.ops],
319
      "start_timestamp": self.start_timestamp,
320
      "end_timestamp": self.end_timestamp,
321
      "received_timestamp": self.received_timestamp,
322
      }
323

    
324
  def CalcStatus(self):
325
    """Compute the status of this job.
326

327
    This function iterates over all the _QueuedOpCodes in the job and
328
    based on their status, computes the job status.
329

330
    The algorithm is:
331
      - if we find a cancelled opcode, or one finished with error, the job
332
        status will be the same
333
      - otherwise, the last opcode whose status is one of:
334
          - waiting
335
          - canceling
336
          - running
337

338
        will determine the job status
339

340
      - otherwise, it means either all opcodes are queued or all succeeded,
341
        and the job status will be the same
342

343
    @return: the job status
344

345
    """
346
    status = constants.JOB_STATUS_QUEUED
347

    
348
    all_success = True
349
    for op in self.ops:
350
      if op.status == constants.OP_STATUS_SUCCESS:
351
        continue
352

    
353
      all_success = False
354

    
355
      if op.status == constants.OP_STATUS_QUEUED:
356
        pass
357
      elif op.status == constants.OP_STATUS_WAITING:
358
        status = constants.JOB_STATUS_WAITING
359
      elif op.status == constants.OP_STATUS_RUNNING:
360
        status = constants.JOB_STATUS_RUNNING
361
      elif op.status == constants.OP_STATUS_CANCELING:
362
        status = constants.JOB_STATUS_CANCELING
363
        break
364
      elif op.status == constants.OP_STATUS_ERROR:
365
        status = constants.JOB_STATUS_ERROR
366
        # The whole job fails if one opcode failed
367
        break
368
      elif op.status == constants.OP_STATUS_CANCELED:
369
        status = constants.OP_STATUS_CANCELED
370
        break
371

    
372
    if all_success:
373
      status = constants.JOB_STATUS_SUCCESS
374

    
375
    return status
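  # Illustrative aside, not part of the original module: with opcode statuses
  # (success, running, queued) the loop above reports JOB_STATUS_RUNNING;
  # with (success, success, success) it reports JOB_STATUS_SUCCESS; a single
  # error or canceled opcode short-circuits the loop and the whole job takes
  # that status.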
376

    
377
  def CalcPriority(self):
378
    """Gets the current priority for this job.
379

380
    Only unfinished opcodes are considered. When all are done, the default
381
    priority is used.
382

383
    @rtype: int
384

385
    """
386
    priorities = [op.priority for op in self.ops
387
                  if op.status not in constants.OPS_FINALIZED]
388

    
389
    if not priorities:
390
      # All opcodes are done, assume default priority
391
      return constants.OP_PRIO_DEFAULT
392

    
393
    return min(priorities)
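  # Illustrative aside, not part of the original module: priorities are
  # numeric and lower values are more urgent, so min() picks the most urgent
  # unfinished opcode. Hypothetical example: pending priorities [0, -10, 10]
  # give a job priority of -10; once all opcodes are finalized, the default
  # priority is reported instead.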
394

    
395
  def GetLogEntries(self, newer_than):
396
    """Selectively returns the log entries.
397

398
    @type newer_than: None or int
399
    @param newer_than: if this is None, return all log entries,
400
        otherwise return only the log entries with serial higher
401
        than this value
402
    @rtype: list
403
    @return: the list of the log entries selected
404

405
    """
406
    if newer_than is None:
407
      serial = -1
408
    else:
409
      serial = newer_than
410

    
411
    entries = []
412
    for op in self.ops:
413
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
414

    
415
    return entries
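  # Illustrative aside, not part of the original module: every log entry is a
  # (serial, timestamp, type, message) tuple and "newer_than" filters on the
  # serial, so a client that has already seen entries up to serial 3 would
  # call:
  #
  #   job.GetLogEntries(3)     # only entries with serial 4 and higher
  #   job.GetLogEntries(None)  # everything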
416

    
417
  def GetInfo(self, fields):
418
    """Returns information about a job.
419

420
    @type fields: list
421
    @param fields: names of fields to return
422
    @rtype: list
423
    @return: list with one element for each field
424
    @raise errors.OpExecError: when an invalid field
425
        has been passed
426

427
    """
428
    return _SimpleJobQuery(fields)(self)
429

    
430
  def MarkUnfinishedOps(self, status, result):
431
    """Mark unfinished opcodes with a given status and result.
432

433
    This is a utility function for marking all running or waiting to
434
    be run opcodes with a given status. Opcodes which are already
435
    finalized are not changed.
436

437
    @param status: a given opcode status
438
    @param result: the opcode result
439

440
    """
441
    not_marked = True
442
    for op in self.ops:
443
      if op.status in constants.OPS_FINALIZED:
444
        assert not_marked, "Finalized opcodes found after non-finalized ones"
445
        continue
446
      op.status = status
447
      op.result = result
448
      not_marked = False
449

    
450
  def Finalize(self):
451
    """Marks the job as finalized.
452

453
    """
454
    self.end_timestamp = TimeStampNow()
455

    
456
  def Cancel(self):
457
    """Marks job as canceled/-ing if possible.
458

459
    @rtype: tuple; (bool, string)
460
    @return: Boolean describing whether job was successfully canceled or marked
461
      as canceling and a text message
462

463
    """
464
    status = self.CalcStatus()
465

    
466
    if status == constants.JOB_STATUS_QUEUED:
467
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
468
                             "Job canceled by request")
469
      self.Finalize()
470
      return (True, "Job %s canceled" % self.id)
471

    
472
    elif status == constants.JOB_STATUS_WAITING:
473
      # The worker will notice the new status and cancel the job
474
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
475
      return (True, "Job %s will be canceled" % self.id)
476

    
477
    else:
478
      logging.debug("Job %s is no longer waiting in the queue", self.id)
479
      return (False, "Job %s is no longer waiting in the queue" % self.id)
480

    
481

    
482
class _OpExecCallbacks(mcpu.OpExecCbBase):
483
  def __init__(self, queue, job, op):
484
    """Initializes this class.
485

486
    @type queue: L{JobQueue}
487
    @param queue: Job queue
488
    @type job: L{_QueuedJob}
489
    @param job: Job object
490
    @type op: L{_QueuedOpCode}
491
    @param op: OpCode
492

493
    """
494
    assert queue, "Queue is missing"
495
    assert job, "Job is missing"
496
    assert op, "Opcode is missing"
497

    
498
    self._queue = queue
499
    self._job = job
500
    self._op = op
501

    
502
  def _CheckCancel(self):
503
    """Raises an exception to cancel the job if asked to.
504

505
    """
506
    # Cancel here if we were asked to
507
    if self._op.status == constants.OP_STATUS_CANCELING:
508
      logging.debug("Canceling opcode")
509
      raise CancelJob()
510

    
511
    # See if queue is shutting down
512
    if not self._queue.AcceptingJobsUnlocked():
513
      logging.debug("Queue is shutting down")
514
      raise QueueShutdown()
515

    
516
  @locking.ssynchronized(_QUEUE, shared=1)
517
  def NotifyStart(self):
518
    """Mark the opcode as running, not lock-waiting.
519

520
    This is called from the mcpu code as a notifier function, when the LU is
521
    finally about to start the Exec() method. Of course, to have end-user
522
    visible results, the opcode must be initially (before calling into
523
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
524

525
    """
526
    assert self._op in self._job.ops
527
    assert self._op.status in (constants.OP_STATUS_WAITING,
528
                               constants.OP_STATUS_CANCELING)
529

    
530
    # Cancel here if we were asked to
531
    self._CheckCancel()
532

    
533
    logging.debug("Opcode is now running")
534

    
535
    self._op.status = constants.OP_STATUS_RUNNING
536
    self._op.exec_timestamp = TimeStampNow()
537

    
538
    # And finally replicate the job status
539
    self._queue.UpdateJobUnlocked(self._job)
540

    
541
  @locking.ssynchronized(_QUEUE, shared=1)
542
  def _AppendFeedback(self, timestamp, log_type, log_msg):
543
    """Internal feedback append function, with locks
544

545
    """
546
    self._job.log_serial += 1
547
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
548
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
549

    
550
  def Feedback(self, *args):
551
    """Append a log entry.
552

553
    """
554
    assert len(args) < 3
555

    
556
    if len(args) == 1:
557
      log_type = constants.ELOG_MESSAGE
558
      log_msg = args[0]
559
    else:
560
      (log_type, log_msg) = args
561

    
562
    # The time is split to make serialization easier and not lose
563
    # precision.
564
    timestamp = utils.SplitTime(time.time())
565
    self._AppendFeedback(timestamp, log_type, log_msg)
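  # Illustrative aside, not part of the original module: callers can pass
  # either a bare message or an explicit (type, message) pair; the messages
  # below are made up:
  #
  #   cbs.Feedback("copying disk data")                          # ELOG_MESSAGE
  #   cbs.Feedback(constants.ELOG_MESSAGE, "copying disk data")  # same effect
  #
  # where "cbs" stands for the _OpExecCallbacks instance passed to ExecOpCode.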
566

    
567
  def CurrentPriority(self):
568
    """Returns current priority for opcode.
569

570
    """
571
    assert self._op.status in (constants.OP_STATUS_WAITING,
572
                               constants.OP_STATUS_CANCELING)
573

    
574
    # Cancel here if we were asked to
575
    self._CheckCancel()
576

    
577
    return self._op.priority
578

    
579
  def SubmitManyJobs(self, jobs):
580
    """Submits jobs for processing.
581

582
    See L{JobQueue.SubmitManyJobs}.
583

584
    """
585
    # Locking is done in job queue
586
    return self._queue.SubmitManyJobs(jobs)
587

    
588

    
589
class _JobChangesChecker(object):
590
  def __init__(self, fields, prev_job_info, prev_log_serial):
591
    """Initializes this class.
592

593
    @type fields: list of strings
594
    @param fields: Fields requested by LUXI client
595
    @type prev_job_info: string
596
    @param prev_job_info: previous job info, as passed by the LUXI client
597
    @type prev_log_serial: string
598
    @param prev_log_serial: previous job serial, as passed by the LUXI client
599

600
    """
601
    self._squery = _SimpleJobQuery(fields)
602
    self._prev_job_info = prev_job_info
603
    self._prev_log_serial = prev_log_serial
604

    
605
  def __call__(self, job):
606
    """Checks whether job has changed.
607

608
    @type job: L{_QueuedJob}
609
    @param job: Job object
610

611
    """
612
    assert not job.writable, "Expected read-only job"
613

    
614
    status = job.CalcStatus()
615
    job_info = self._squery(job)
616
    log_entries = job.GetLogEntries(self._prev_log_serial)
617

    
618
    # Serializing and deserializing data can cause type changes (e.g. from
619
    # tuple to list) or precision loss. We're doing it here so that we get
620
    # the same modifications as the data received from the client. Without
621
    # this, the comparison afterwards might fail without the data being
622
    # significantly different.
623
    # TODO: we just deserialized from disk, investigate how to make sure that
624
    # the job info and log entries are compatible to avoid this further step.
625
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
626
    # efficient, though floats will be tricky
627
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
628
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
629

    
630
    # Don't even try to wait if the job is no longer running, there will be
631
    # no changes.
632
    if (status not in (constants.JOB_STATUS_QUEUED,
633
                       constants.JOB_STATUS_RUNNING,
634
                       constants.JOB_STATUS_WAITING) or
635
        job_info != self._prev_job_info or
636
        (log_entries and self._prev_log_serial != log_entries[0][0])):
637
      logging.debug("Job %s changed", job.id)
638
      return (job_info, log_entries)
639

    
640
    return None
641

    
642

    
643
class _JobFileChangesWaiter(object):
644
  def __init__(self, filename):
645
    """Initializes this class.
646

647
    @type filename: string
648
    @param filename: Path to job file
649
    @raises errors.InotifyError: if the notifier cannot be setup
650

651
    """
652
    self._wm = pyinotify.WatchManager()
653
    self._inotify_handler = \
654
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
655
    self._notifier = \
656
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
657
    try:
658
      self._inotify_handler.enable()
659
    except Exception:
660
      # pyinotify doesn't close file descriptors automatically
661
      self._notifier.stop()
662
      raise
663

    
664
  def _OnInotify(self, notifier_enabled):
665
    """Callback for inotify.
666

667
    """
668
    if not notifier_enabled:
669
      self._inotify_handler.enable()
670

    
671
  def Wait(self, timeout):
672
    """Waits for the job file to change.
673

674
    @type timeout: float
675
    @param timeout: Timeout in seconds
676
    @return: Whether there have been events
677

678
    """
679
    assert timeout >= 0
680
    have_events = self._notifier.check_events(timeout * 1000)
681
    if have_events:
682
      self._notifier.read_events()
683
    self._notifier.process_events()
684
    return have_events
685

    
686
  def Close(self):
687
    """Closes underlying notifier and its file descriptor.
688

689
    """
690
    self._notifier.stop()
691

    
692

    
693
class _JobChangesWaiter(object):
694
  def __init__(self, filename):
695
    """Initializes this class.
696

697
    @type filename: string
698
    @param filename: Path to job file
699

700
    """
701
    self._filewaiter = None
702
    self._filename = filename
703

    
704
  def Wait(self, timeout):
705
    """Waits for a job to change.
706

707
    @type timeout: float
708
    @param timeout: Timeout in seconds
709
    @return: Whether there have been events
710

711
    """
712
    if self._filewaiter:
713
      return self._filewaiter.Wait(timeout)
714

    
715
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
716
    # If this point is reached, return immediately and let caller check the job
717
    # file again in case there were changes since the last check. This avoids a
718
    # race condition.
719
    self._filewaiter = _JobFileChangesWaiter(self._filename)
720

    
721
    return True
722

    
723
  def Close(self):
724
    """Closes underlying waiter.
725

726
    """
727
    if self._filewaiter:
728
      self._filewaiter.Close()
729

    
730

    
731
class _WaitForJobChangesHelper(object):
732
  """Helper class using inotify to wait for changes in a job file.
733

734
  This class takes a previous job status and serial, and alerts the client when
735
  the current job status has changed.
736

737
  """
738
  @staticmethod
739
  def _CheckForChanges(counter, job_load_fn, check_fn):
740
    if counter.next() > 0:
741
      # If this isn't the first check, the job is given some more time to change
742
      # again. This gives better performance for jobs generating many
743
      # changes/messages.
744
      time.sleep(0.1)
745

    
746
    job = job_load_fn()
747
    if not job:
748
      raise errors.JobLost()
749

    
750
    result = check_fn(job)
751
    if result is None:
752
      raise utils.RetryAgain()
753

    
754
    return result
755

    
756
  def __call__(self, filename, job_load_fn,
757
               fields, prev_job_info, prev_log_serial, timeout):
758
    """Waits for changes on a job.
759

760
    @type filename: string
761
    @param filename: File on which to wait for changes
762
    @type job_load_fn: callable
763
    @param job_load_fn: Function to load job
764
    @type fields: list of strings
765
    @param fields: Which fields to check for changes
766
    @type prev_job_info: list or None
767
    @param prev_job_info: Last job information returned
768
    @type prev_log_serial: int
769
    @param prev_log_serial: Last job message serial number
770
    @type timeout: float
771
    @param timeout: maximum time to wait in seconds
772

773
    """
774
    counter = itertools.count()
775
    try:
776
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
777
      waiter = _JobChangesWaiter(filename)
778
      try:
779
        return utils.Retry(compat.partial(self._CheckForChanges,
780
                                          counter, job_load_fn, check_fn),
781
                           utils.RETRY_REMAINING_TIME, timeout,
782
                           wait_fn=waiter.Wait)
783
      finally:
784
        waiter.Close()
785
    except (errors.InotifyError, errors.JobLost):
786
      return None
787
    except utils.RetryTimeout:
788
      return constants.JOB_NOTCHANGED
789

    
790

    
791
def _EncodeOpError(err):
792
  """Encodes an error which occurred while processing an opcode.
793

794
  """
795
  if isinstance(err, errors.GenericError):
796
    to_encode = err
797
  else:
798
    to_encode = errors.OpExecError(str(err))
799

    
800
  return errors.EncodeException(to_encode)
801

    
802

    
803
class _TimeoutStrategyWrapper:
804
  def __init__(self, fn):
805
    """Initializes this class.
806

807
    """
808
    self._fn = fn
809
    self._next = None
810

    
811
  def _Advance(self):
812
    """Gets the next timeout if necessary.
813

814
    """
815
    if self._next is None:
816
      self._next = self._fn()
817

    
818
  def Peek(self):
819
    """Returns the next timeout.
820

821
    """
822
    self._Advance()
823
    return self._next
824

    
825
  def Next(self):
826
    """Returns the current timeout and advances the internal state.
827

828
    """
829
    self._Advance()
830
    result = self._next
831
    self._next = None
832
    return result
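  # Illustrative aside, not part of the original module: Peek() returns the
  # upcoming timeout without consuming it, Next() consumes it. With a
  # hypothetical strategy yielding 1.0 and then 2.5:
  #
  #   wrapper = _TimeoutStrategyWrapper(strategy.NextAttempt)
  #   wrapper.Peek()  # 1.0
  #   wrapper.Next()  # 1.0, now consumed
  #   wrapper.Next()  # 2.5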
833

    
834

    
835
class _OpExecContext:
836
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
837
    """Initializes this class.
838

839
    """
840
    self.op = op
841
    self.index = index
842
    self.log_prefix = log_prefix
843
    self.summary = op.input.Summary()
844

    
845
    # Create local copy to modify
846
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
847
      self.jobdeps = op.input.depends[:]
848
    else:
849
      self.jobdeps = None
850

    
851
    self._timeout_strategy_factory = timeout_strategy_factory
852
    self._ResetTimeoutStrategy()
853

    
854
  def _ResetTimeoutStrategy(self):
855
    """Creates a new timeout strategy.
856

857
    """
858
    self._timeout_strategy = \
859
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
860

    
861
  def CheckPriorityIncrease(self):
862
    """Checks whether priority can and should be increased.
863

864
    Called when locks couldn't be acquired.
865

866
    """
867
    op = self.op
868

    
869
    # Exhausted all retries and next round should not use blocking acquire
870
    # for locks?
871
    if (self._timeout_strategy.Peek() is None and
872
        op.priority > constants.OP_PRIO_HIGHEST):
873
      logging.debug("Increasing priority")
874
      op.priority -= 1
875
      self._ResetTimeoutStrategy()
876
      return True
877

    
878
    return False
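  # Illustrative aside, not part of the original module: priorities become
  # more urgent as the number decreases, so every time the timeout strategy
  # is exhausted the opcode moves one step from its current priority towards
  # constants.OP_PRIO_HIGHEST and gets a fresh set of lock timeouts.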
879

    
880
  def GetNextLockTimeout(self):
881
    """Returns the next lock acquire timeout.
882

883
    """
884
    return self._timeout_strategy.Next()
885

    
886

    
887
class _JobProcessor(object):
888
  (DEFER,
889
   WAITDEP,
890
   FINISHED) = range(1, 4)
891

    
892
  def __init__(self, queue, opexec_fn, job,
893
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
894
    """Initializes this class.
895

896
    """
897
    self.queue = queue
898
    self.opexec_fn = opexec_fn
899
    self.job = job
900
    self._timeout_strategy_factory = _timeout_strategy_factory
901

    
902
  @staticmethod
903
  def _FindNextOpcode(job, timeout_strategy_factory):
904
    """Locates the next opcode to run.
905

906
    @type job: L{_QueuedJob}
907
    @param job: Job object
908
    @param timeout_strategy_factory: Callable to create new timeout strategy
909

910
    """
911
    # Create some sort of a cache to speed up locating next opcode for future
912
    # lookups
913
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
914
    # pending and one for processed ops.
915
    if job.ops_iter is None:
916
      job.ops_iter = enumerate(job.ops)
917

    
918
    # Find next opcode to run
919
    while True:
920
      try:
921
        (idx, op) = job.ops_iter.next()
922
      except StopIteration:
923
        raise errors.ProgrammerError("Called for a finished job")
924

    
925
      if op.status == constants.OP_STATUS_RUNNING:
926
        # Found an opcode already marked as running
927
        raise errors.ProgrammerError("Called for job marked as running")
928

    
929
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
930
                             timeout_strategy_factory)
931

    
932
      if op.status not in constants.OPS_FINALIZED:
933
        return opctx
934

    
935
      # This is a job that was partially completed before master daemon
936
      # shutdown, so it can be expected that some opcodes are already
937
      # completed successfully (if any did error out, then the whole job
938
      # should have been aborted and not resubmitted for processing).
939
      logging.info("%s: opcode %s already processed, skipping",
940
                   opctx.log_prefix, opctx.summary)
941

    
942
  @staticmethod
943
  def _MarkWaitlock(job, op):
944
    """Marks an opcode as waiting for locks.
945

946
    The job's start timestamp is also set if necessary.
947

948
    @type job: L{_QueuedJob}
949
    @param job: Job object
950
    @type op: L{_QueuedOpCode}
951
    @param op: Opcode object
952

953
    """
954
    assert op in job.ops
955
    assert op.status in (constants.OP_STATUS_QUEUED,
956
                         constants.OP_STATUS_WAITING)
957

    
958
    update = False
959

    
960
    op.result = None
961

    
962
    if op.status == constants.OP_STATUS_QUEUED:
963
      op.status = constants.OP_STATUS_WAITING
964
      update = True
965

    
966
    if op.start_timestamp is None:
967
      op.start_timestamp = TimeStampNow()
968
      update = True
969

    
970
    if job.start_timestamp is None:
971
      job.start_timestamp = op.start_timestamp
972
      update = True
973

    
974
    assert op.status == constants.OP_STATUS_WAITING
975

    
976
    return update
977

    
978
  @staticmethod
979
  def _CheckDependencies(queue, job, opctx):
980
    """Checks if an opcode has dependencies and if so, processes them.
981

982
    @type queue: L{JobQueue}
983
    @param queue: Queue object
984
    @type job: L{_QueuedJob}
985
    @param job: Job object
986
    @type opctx: L{_OpExecContext}
987
    @param opctx: Opcode execution context
988
    @rtype: bool
989
    @return: Whether opcode will be re-scheduled by dependency tracker
990

991
    """
992
    op = opctx.op
993

    
994
    result = False
995

    
996
    while opctx.jobdeps:
997
      (dep_job_id, dep_status) = opctx.jobdeps[0]
998

    
999
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1000
                                                          dep_status)
1001
      assert ht.TNonEmptyString(depmsg), "No dependency message"
1002

    
1003
      logging.info("%s: %s", opctx.log_prefix, depmsg)
1004

    
1005
      if depresult == _JobDependencyManager.CONTINUE:
1006
        # Remove dependency and continue
1007
        opctx.jobdeps.pop(0)
1008

    
1009
      elif depresult == _JobDependencyManager.WAIT:
1010
        # Need to wait for notification, dependency tracker will re-add job
1011
        # to workerpool
1012
        result = True
1013
        break
1014

    
1015
      elif depresult == _JobDependencyManager.CANCEL:
1016
        # Job was cancelled, cancel this job as well
1017
        job.Cancel()
1018
        assert op.status == constants.OP_STATUS_CANCELING
1019
        break
1020

    
1021
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1022
                         _JobDependencyManager.ERROR):
1023
        # Job failed or there was an error, this job must fail
1024
        op.status = constants.OP_STATUS_ERROR
1025
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1026
        break
1027

    
1028
      else:
1029
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1030
                                     depresult)
1031

    
1032
    return result
1033

    
1034
  def _ExecOpCodeUnlocked(self, opctx):
1035
    """Processes one opcode and returns the result.
1036

1037
    """
1038
    op = opctx.op
1039

    
1040
    assert op.status == constants.OP_STATUS_WAITING
1041

    
1042
    timeout = opctx.GetNextLockTimeout()
1043

    
1044
    try:
1045
      # Make sure not to hold queue lock while calling ExecOpCode
1046
      result = self.opexec_fn(op.input,
1047
                              _OpExecCallbacks(self.queue, self.job, op),
1048
                              timeout=timeout)
1049
    except mcpu.LockAcquireTimeout:
1050
      assert timeout is not None, "Received timeout for blocking acquire"
1051
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1052

    
1053
      assert op.status in (constants.OP_STATUS_WAITING,
1054
                           constants.OP_STATUS_CANCELING)
1055

    
1056
      # Was job cancelled while we were waiting for the lock?
1057
      if op.status == constants.OP_STATUS_CANCELING:
1058
        return (constants.OP_STATUS_CANCELING, None)
1059

    
1060
      # Queue is shutting down, return to queued
1061
      if not self.queue.AcceptingJobsUnlocked():
1062
        return (constants.OP_STATUS_QUEUED, None)
1063

    
1064
      # Stay in waitlock while trying to re-acquire lock
1065
      return (constants.OP_STATUS_WAITING, None)
1066
    except CancelJob:
1067
      logging.exception("%s: Canceling job", opctx.log_prefix)
1068
      assert op.status == constants.OP_STATUS_CANCELING
1069
      return (constants.OP_STATUS_CANCELING, None)
1070

    
1071
    except QueueShutdown:
1072
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1073

    
1074
      assert op.status == constants.OP_STATUS_WAITING
1075

    
1076
      # Job hadn't been started yet, so it should return to the queue
1077
      return (constants.OP_STATUS_QUEUED, None)
1078

    
1079
    except Exception, err: # pylint: disable=W0703
1080
      logging.exception("%s: Caught exception in %s",
1081
                        opctx.log_prefix, opctx.summary)
1082
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1083
    else:
1084
      logging.debug("%s: %s successful",
1085
                    opctx.log_prefix, opctx.summary)
1086
      return (constants.OP_STATUS_SUCCESS, result)
1087

    
1088
  def __call__(self, _nextop_fn=None):
1089
    """Continues execution of a job.
1090

1091
    @param _nextop_fn: Callback function for tests
1092
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1093
      be deferred and C{WAITDEP} if the dependency manager
1094
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1095

1096
    """
1097
    queue = self.queue
1098
    job = self.job
1099

    
1100
    logging.debug("Processing job %s", job.id)
1101

    
1102
    queue.acquire(shared=1)
1103
    try:
1104
      opcount = len(job.ops)
1105

    
1106
      assert job.writable, "Expected writable job"
1107

    
1108
      # Don't do anything for finalized jobs
1109
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1110
        return self.FINISHED
1111

    
1112
      # Is a previous opcode still pending?
1113
      if job.cur_opctx:
1114
        opctx = job.cur_opctx
1115
        job.cur_opctx = None
1116
      else:
1117
        if __debug__ and _nextop_fn:
1118
          _nextop_fn()
1119
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1120

    
1121
      op = opctx.op
1122

    
1123
      # Consistency check
1124
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1125
                                     constants.OP_STATUS_CANCELING)
1126
                        for i in job.ops[opctx.index + 1:])
1127

    
1128
      assert op.status in (constants.OP_STATUS_QUEUED,
1129
                           constants.OP_STATUS_WAITING,
1130
                           constants.OP_STATUS_CANCELING)
1131

    
1132
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1133
              op.priority >= constants.OP_PRIO_HIGHEST)
1134

    
1135
      waitjob = None
1136

    
1137
      if op.status != constants.OP_STATUS_CANCELING:
1138
        assert op.status in (constants.OP_STATUS_QUEUED,
1139
                             constants.OP_STATUS_WAITING)
1140

    
1141
        # Prepare to start opcode
1142
        if self._MarkWaitlock(job, op):
1143
          # Write to disk
1144
          queue.UpdateJobUnlocked(job)
1145

    
1146
        assert op.status == constants.OP_STATUS_WAITING
1147
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1148
        assert job.start_timestamp and op.start_timestamp
1149
        assert waitjob is None
1150

    
1151
        # Check if waiting for a job is necessary
1152
        waitjob = self._CheckDependencies(queue, job, opctx)
1153

    
1154
        assert op.status in (constants.OP_STATUS_WAITING,
1155
                             constants.OP_STATUS_CANCELING,
1156
                             constants.OP_STATUS_ERROR)
1157

    
1158
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1159
                                         constants.OP_STATUS_ERROR)):
1160
          logging.info("%s: opcode %s waiting for locks",
1161
                       opctx.log_prefix, opctx.summary)
1162

    
1163
          assert not opctx.jobdeps, "Not all dependencies were removed"
1164

    
1165
          queue.release()
1166
          try:
1167
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1168
          finally:
1169
            queue.acquire(shared=1)
1170

    
1171
          op.status = op_status
1172
          op.result = op_result
1173

    
1174
          assert not waitjob
1175

    
1176
        if op.status in (constants.OP_STATUS_WAITING,
1177
                         constants.OP_STATUS_QUEUED):
1178
          # waiting: Couldn't get locks in time
1179
          # queued: Queue is shutting down
1180
          assert not op.end_timestamp
1181
        else:
1182
          # Finalize opcode
1183
          op.end_timestamp = TimeStampNow()
1184

    
1185
          if op.status == constants.OP_STATUS_CANCELING:
1186
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1187
                                  for i in job.ops[opctx.index:])
1188
          else:
1189
            assert op.status in constants.OPS_FINALIZED
1190

    
1191
      if op.status == constants.OP_STATUS_QUEUED:
1192
        # Queue is shutting down
1193
        assert not waitjob
1194

    
1195
        finalize = False
1196

    
1197
        # Reset context
1198
        job.cur_opctx = None
1199

    
1200
        # In no case must the status be finalized here
1201
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1202

    
1203
      elif op.status == constants.OP_STATUS_WAITING or waitjob:
1204
        finalize = False
1205

    
1206
        if not waitjob and opctx.CheckPriorityIncrease():
1207
          # Priority was changed, need to update on-disk file
1208
          queue.UpdateJobUnlocked(job)
1209

    
1210
        # Keep around for another round
1211
        job.cur_opctx = opctx
1212

    
1213
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1214
                op.priority >= constants.OP_PRIO_HIGHEST)
1215

    
1216
        # In no case must the status be finalized here
1217
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1218

    
1219
      else:
1220
        # Ensure all opcodes so far have been successful
1221
        assert (opctx.index == 0 or
1222
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1223
                           for i in job.ops[:opctx.index]))
1224

    
1225
        # Reset context
1226
        job.cur_opctx = None
1227

    
1228
        if op.status == constants.OP_STATUS_SUCCESS:
1229
          finalize = False
1230

    
1231
        elif op.status == constants.OP_STATUS_ERROR:
1232
          # Ensure failed opcode has an exception as its result
1233
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1234

    
1235
          to_encode = errors.OpExecError("Preceding opcode failed")
1236
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1237
                                _EncodeOpError(to_encode))
1238
          finalize = True
1239

    
1240
          # Consistency check
1241
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1242
                            errors.GetEncodedError(i.result)
1243
                            for i in job.ops[opctx.index:])
1244

    
1245
        elif op.status == constants.OP_STATUS_CANCELING:
1246
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1247
                                "Job canceled by request")
1248
          finalize = True
1249

    
1250
        else:
1251
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1252

    
1253
        if opctx.index == (opcount - 1):
1254
          # Finalize on last opcode
1255
          finalize = True
1256

    
1257
        if finalize:
1258
          # All opcodes have been run, finalize job
1259
          job.Finalize()
1260

    
1261
        # Write to disk. If the job status is final, this is the final write
1262
        # allowed. Once the file has been written, it can be archived anytime.
1263
        queue.UpdateJobUnlocked(job)
1264

    
1265
        assert not waitjob
1266

    
1267
        if finalize:
1268
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1269
          return self.FINISHED
1270

    
1271
      assert not waitjob or queue.depmgr.JobWaiting(job)
1272

    
1273
      if waitjob:
1274
        return self.WAITDEP
1275
      else:
1276
        return self.DEFER
1277
    finally:
1278
      assert job.writable, "Job became read-only while being processed"
1279
      queue.release()
1280

    
1281

    
1282
def _EvaluateJobProcessorResult(depmgr, job, result):
1283
  """Looks at a result from L{_JobProcessor} for a job.
1284

1285
  To be used in a L{_JobQueueWorker}.
1286

1287
  """
1288
  if result == _JobProcessor.FINISHED:
1289
    # Notify waiting jobs
1290
    depmgr.NotifyWaiters(job.id)
1291

    
1292
  elif result == _JobProcessor.DEFER:
1293
    # Schedule again
1294
    raise workerpool.DeferTask(priority=job.CalcPriority())
1295

    
1296
  elif result == _JobProcessor.WAITDEP:
1297
    # No-op, dependency manager will re-schedule
1298
    pass
1299

    
1300
  else:
1301
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1302
                                 (result, ))
1303

    
1304

    
1305
class _JobQueueWorker(workerpool.BaseWorker):
1306
  """The actual job workers.
1307

1308
  """
1309
  def RunTask(self, job): # pylint: disable=W0221
1310
    """Job executor.
1311

1312
    @type job: L{_QueuedJob}
1313
    @param job: the job to be processed
1314

1315
    """
1316
    assert job.writable, "Expected writable job"
1317

    
1318
    # Ensure only one worker is active on a single job. If a job registers for
1319
    # a dependency job, and the other job notifies before the first worker is
1320
    # done, the job can end up in the tasklist more than once.
1321
    job.processor_lock.acquire()
1322
    try:
1323
      return self._RunTaskInner(job)
1324
    finally:
1325
      job.processor_lock.release()
1326

    
1327
  def _RunTaskInner(self, job):
1328
    """Executes a job.
1329

1330
    Must be called with per-job lock acquired.
1331

1332
    """
1333
    queue = job.queue
1334
    assert queue == self.pool.queue
1335

    
1336
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1337
    setname_fn(None)
1338

    
1339
    proc = mcpu.Processor(queue.context, job.id)
1340

    
1341
    # Create wrapper for setting thread name
1342
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1343
                                    proc.ExecOpCode)
1344

    
1345
    _EvaluateJobProcessorResult(queue.depmgr, job,
1346
                                _JobProcessor(queue, wrap_execop_fn, job)())
1347

    
1348
  @staticmethod
1349
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1350
    """Updates the worker thread name to include a short summary of the opcode.
1351

1352
    @param setname_fn: Callable setting worker thread name
1353
    @param execop_fn: Callable for executing opcode (usually
1354
                      L{mcpu.Processor.ExecOpCode})
1355

1356
    """
1357
    setname_fn(op)
1358
    try:
1359
      return execop_fn(op, *args, **kwargs)
1360
    finally:
1361
      setname_fn(None)
1362

    
1363
  @staticmethod
1364
  def _GetWorkerName(job, op):
1365
    """Sets the worker thread name.
1366

1367
    @type job: L{_QueuedJob}
1368
    @type op: L{opcodes.OpCode}
1369

1370
    """
1371
    parts = ["Job%s" % job.id]
1372

    
1373
    if op:
1374
      parts.append(op.TinySummary())
1375

    
1376
    return "/".join(parts)
1377

    
1378

    
1379
class _JobQueueWorkerPool(workerpool.WorkerPool):
1380
  """Simple class implementing a job-processing workerpool.
1381

1382
  """
1383
  def __init__(self, queue):
1384
    super(_JobQueueWorkerPool, self).__init__("Jq",
1385
                                              JOBQUEUE_THREADS,
1386
                                              _JobQueueWorker)
1387
    self.queue = queue
1388

    
1389

    
1390
class _JobDependencyManager:
1391
  """Keeps track of job dependencies.
1392

1393
  """
1394
  (WAIT,
1395
   ERROR,
1396
   CANCEL,
1397
   CONTINUE,
1398
   WRONGSTATUS) = range(1, 6)
1399

    
1400
  def __init__(self, getstatus_fn, enqueue_fn):
1401
    """Initializes this class.
1402

1403
    """
1404
    self._getstatus_fn = getstatus_fn
1405
    self._enqueue_fn = enqueue_fn
1406

    
1407
    self._waiters = {}
1408
    self._lock = locking.SharedLock("JobDepMgr")
1409

    
1410
  @locking.ssynchronized(_LOCK, shared=1)
1411
  def GetLockInfo(self, requested): # pylint: disable=W0613
1412
    """Retrieves information about waiting jobs.
1413

1414
    @type requested: set
1415
    @param requested: Requested information, see C{query.LQ_*}
1416

1417
    """
1418
    # No need to sort here, that's being done by the lock manager and query
1419
    # library. There are no priorities for notifying jobs, hence all show up as
1420
    # one item under "pending".
1421
    return [("job/%s" % job_id, None, None,
1422
             [("job", [job.id for job in waiters])])
1423
            for job_id, waiters in self._waiters.items()
1424
            if waiters]
1425

    
1426
  @locking.ssynchronized(_LOCK, shared=1)
1427
  def JobWaiting(self, job):
1428
    """Checks if a job is waiting.
1429

1430
    """
1431
    return compat.any(job in jobs
1432
                      for jobs in self._waiters.values())
1433

    
1434
  @locking.ssynchronized(_LOCK)
1435
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1436
    """Checks if a dependency job has the requested status.
1437

1438
    If the other job is not yet in a finalized status, the calling job will be
1439
    notified (re-added to the workerpool) at a later point.
1440

1441
    @type job: L{_QueuedJob}
1442
    @param job: Job object
1443
    @type dep_job_id: int
1444
    @param dep_job_id: ID of dependency job
1445
    @type dep_status: list
1446
    @param dep_status: Required status
1447

1448
    """
1449
    assert ht.TJobId(job.id)
1450
    assert ht.TJobId(dep_job_id)
1451
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1452

    
1453
    if job.id == dep_job_id:
1454
      return (self.ERROR, "Job can't depend on itself")
1455

    
1456
    # Get status of dependency job
1457
    try:
1458
      status = self._getstatus_fn(dep_job_id)
1459
    except errors.JobLost, err:
1460
      return (self.ERROR, "Dependency error: %s" % err)
1461

    
1462
    assert status in constants.JOB_STATUS_ALL
1463

    
1464
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1465

    
1466
    if status not in constants.JOBS_FINALIZED:
1467
      # Register for notification and wait for job to finish
1468
      job_id_waiters.add(job)
1469
      return (self.WAIT,
1470
              "Need to wait for job %s, wanted status '%s'" %
1471
              (dep_job_id, dep_status))
1472

    
1473
    # Remove from waiters list
1474
    if job in job_id_waiters:
1475
      job_id_waiters.remove(job)
1476

    
1477
    if (status == constants.JOB_STATUS_CANCELED and
1478
        constants.JOB_STATUS_CANCELED not in dep_status):
1479
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1480

    
1481
    elif not dep_status or status in dep_status:
1482
      return (self.CONTINUE,
1483
              "Dependency job %s finished with status '%s'" %
1484
              (dep_job_id, status))
1485

    
1486
    else:
1487
      return (self.WRONGSTATUS,
1488
              "Dependency job %s finished with status '%s',"
1489
              " not one of '%s' as required" %
1490
              (dep_job_id, status, utils.CommaJoin(dep_status)))
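  # Illustrative aside, not part of the original module: the (dep_job_id,
  # dep_status) pairs checked here come from an opcode's "depends" attribute.
  # A hypothetical opcode that must wait for job 1234 to succeed would carry:
  #
  #   op.depends = [(1234, [constants.JOB_STATUS_SUCCESS])]
  #
  # An empty status list accepts any finalized result, except that a
  # cancelled dependency cancels the waiting job as well (see above).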
1491

    
1492
  def _RemoveEmptyWaitersUnlocked(self):
1493
    """Remove all jobs without actual waiters.
1494

1495
    """
1496
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1497
                   if not waiters]:
1498
      del self._waiters[job_id]
1499

    
1500
  def NotifyWaiters(self, job_id):
1501
    """Notifies all jobs waiting for a certain job ID.
1502

1503
    @attention: Do not call until L{CheckAndRegister} returned a status other
1504
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
1505
    @type job_id: int
1506
    @param job_id: Job ID
1507

1508
    """
1509
    assert ht.TJobId(job_id)
1510

    
1511
    self._lock.acquire()
1512
    try:
1513
      self._RemoveEmptyWaitersUnlocked()
1514

    
1515
      jobs = self._waiters.pop(job_id, None)
1516
    finally:
1517
      self._lock.release()
1518

    
1519
    if jobs:
1520
      # Re-add jobs to workerpool
1521
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1522
                    len(jobs), job_id)
1523
      self._enqueue_fn(jobs)
1524

    
1525

    
1526
def _RequireOpenQueue(fn):
1527
  """Decorator for "public" functions.
1528

1529
  This function should be used for all 'public' functions. That is,
1530
  functions usually called from other classes. Note that this should
1531
  be applied only to methods (not plain functions), since it expects
1532
  that the decorated function is called with a first argument that has
1533
  a '_queue_filelock' argument.
1534

1535
  @warning: Use this decorator only after locking.ssynchronized
1536

1537
  Example::
1538
    @locking.ssynchronized(_LOCK)
1539
    @_RequireOpenQueue
1540
    def Example(self):
1541
      pass
1542

1543
  """
1544
  def wrapper(self, *args, **kwargs):
1545
    # pylint: disable=W0212
1546
    assert self._queue_filelock is not None, "Queue should be open"
1547
    return fn(self, *args, **kwargs)
1548
  return wrapper
1549

    
1550

    
1551
def _RequireNonDrainedQueue(fn):
1552
  """Decorator checking for a non-drained queue.
1553

1554
  To be used with functions submitting new jobs.
1555

1556
  """
1557
  def wrapper(self, *args, **kwargs):
1558
    """Wrapper function.
1559

1560
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1561

1562
    """
1563
    # Ok when sharing the big job queue lock, as the drain file is created when
1564
    # the lock is exclusive.
1565
    # Needs access to protected member, pylint: disable=W0212
1566
    if self._drained:
1567
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1568

    
1569
    if not self._accepting_jobs:
1570
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1571

    
1572
    return fn(self, *args, **kwargs)
1573
  return wrapper
1574

    
1575

    
1576
class JobQueue(object):
1577
  """Queue used to manage the jobs.
1578

1579
  """
1580
  def __init__(self, context):
1581
    """Constructor for JobQueue.
1582

1583
    The constructor will initialize the job queue object and then
1584
    start loading the current jobs from disk, either for starting them
1585
    (if they were queued) or for aborting them (if they were already
1586
    running).
1587

1588
    @type context: GanetiContext
1589
    @param context: the context object for access to the configuration
1590
        data and other ganeti objects
1591

1592
    """
1593
    self.context = context
1594
    self._memcache = weakref.WeakValueDictionary()
1595
    self._my_hostname = netutils.Hostname.GetSysName()
1596

    
1597
    # The Big JobQueue lock. If a code block or method acquires it in shared
1598
    # mode, it must guarantee concurrency with all the code acquiring it in
1599
    # shared mode, including itself. In order not to acquire it at all,
1600
    # concurrency must be guaranteed with all code acquiring it in shared mode
1601
    # and all code acquiring it exclusively.
1602
    self._lock = locking.SharedLock("JobQueue")
1603

    
1604
    self.acquire = self._lock.acquire
1605
    self.release = self._lock.release
1606

    
1607
    # Accept jobs by default
1608
    self._accepting_jobs = True
1609

    
1610
    # Initialize the queue, and acquire the filelock.
1611
    # This ensures no other process is working on the job queue.
1612
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1613

    
1614
    # Read serial file
1615
    self._last_serial = jstore.ReadSerial()
1616
    assert self._last_serial is not None, ("Serial file was modified between"
1617
                                           " check in jstore and here")
1618

    
1619
    # Get initial list of nodes
1620
    self._nodes = dict((n.name, n.primary_ip)
1621
                       for n in self.context.cfg.GetAllNodesInfo().values()
1622
                       if n.master_candidate)
1623

    
1624
    # Remove master node
1625
    self._nodes.pop(self._my_hostname, None)
1626

    
1627
    # TODO: Check consistency across nodes
1628

    
1629
    self._queue_size = None
1630
    self._UpdateQueueSizeUnlocked()
1631
    assert ht.TInt(self._queue_size)
1632
    self._drained = jstore.CheckDrainFlag()
1633

    
1634
    # Job dependencies
1635
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1636
                                        self._EnqueueJobs)
1637
    self.context.glm.AddToLockMonitor(self.depmgr)
1638

    
1639
    # Setup worker pool
1640
    self._wpool = _JobQueueWorkerPool(self)
1641
    try:
1642
      self._InspectQueue()
1643
    except:
1644
      self._wpool.TerminateWorkers()
1645
      raise
1646

    
1647
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

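  # Worked example for _CheckRpcResult's majority check: with five non-master
  # candidates in self._nodes, four failures and one success give
  # len(success) + 1 == 2, which is smaller than len(failed) == 4, so the
  # "more than half failed" error is logged.
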
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename the given files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TPositiveInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

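  # Worked example for _NewSerialsUnlocked: with self._last_serial == 42 and
  # count == 3, the serial file is rewritten to contain "45" and the IDs
  # produced by jstore.FormatJobID for 43, 44 and 45 are returned.
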
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

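  # Illustration for the two path helpers above: a live job lives directly in
  # the queue directory (e.g. "<QUEUE_DIR>/job-4711"), while its archived copy
  # sits in a subdirectory of JOB_QUEUE_ARCHIVE_DIR chosen by
  # jstore.GetArchiveDirectory (the exact bucketing scheme is defined there).
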
  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist

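  # Note on _GetJobIDsUnlocked: job files are named "job-<id>", so
  # constants.JOB_FILE_RE extracts the numeric part and e.g. a file called
  # "job-4711" contributes the integer 4711 to the returned list.
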
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

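  # Worked example for _ResolveJobDependencies: given
  #   deps = [(-1, [constants.JOB_STATUS_SUCCESS]), (1234, [])]
  # and a resolve_fn mapping -1 to the previously submitted job ID 1233, the
  # method returns (True, [(1233, [constants.JOB_STATUS_SUCCESS]),
  # (1234, [])]); if resolve_fn raises IndexError instead,
  # (False, <error message>) is returned.
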
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        # The "else" clause of the inner loop is only reached if no dependency
        # error caused a "break" above; only then is the job submitted
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

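  # Illustration of the result shape produced above: for every submitted job
  # description, "results" receives either (True, job_id) on success or
  # (False, error_message) when validation or dependency resolution fails,
  # while "added_jobs" only collects the successfully created _QueuedJob
  # objects.
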
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the soft limit, in seconds, on how long to spend archiving
    @rtype: tuple of (int, int)
    @return: a tuple of the number of archived jobs and the number of jobs
        that were not considered before the timeout expired

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

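  # Example of the age semantics above: AutoArchiveJobs(age=3600, timeout=30)
  # archives finalized jobs whose end (or start, or received) timestamp is
  # older than one hour, stopping after roughly 30 seconds, while age=-1
  # archives every finalized job regardless of age.
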
  def _Query(self, fields, qfilter):
    """Builds the query object and loads the jobs it applies to.

    @return: tuple of (query object, list of (job id, job) tuples,
        whether all jobs were requested)

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

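  # Typical shutdown sequence (a sketch, assuming the caller polls): invoke
  # PrepareShutdown() repeatedly until it returns False (no more running
  # jobs), then call Shutdown() to terminate the workers and release the
  # queue file lock.
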
  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool

    """
    return self._accepting_jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None