root / lib / jqueue.py @ 42d49574
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster


JOBQUEUE_THREADS = 25

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
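# Example (illustrative, not from the original source): utils.SplitTime
# breaks a float timestamp into whole seconds and microseconds, so a call
# made when time.time() returns 1234567890.123456 yields roughly
# (1234567890, 123456).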


def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)


class _SimpleJobQuery:
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
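  # Example (illustrative, not from the original source):
  # _SimpleJobQuery(["id", "status"])(job) returns the old-style result row
  # for that single job, e.g. [42, "running"], reusing the compiled field
  # list across repeated calls on different jobs.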


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable", "archived"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an error,
        the job status will be the same
      - otherwise, the last opcode with a status of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued or successful,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.OP_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
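  # Example (illustrative, not from the original source): for opcode statuses
  # [SUCCESS, RUNNING, QUEUED] the loop above reports JOB_STATUS_RUNNING;
  # only when every opcode succeeded does the job become JOB_STATUS_SUCCESS,
  # and a single errored or canceled opcode finalizes the whole job with the
  # corresponding status.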

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
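  # Example (illustrative, not from the original source): with log serials
  # 1..5 already recorded, GetLogEntries(3) returns only the entries with
  # serials 4 and 5, while GetLogEntries(None) returns all of them.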

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks the job as canceled or canceling, if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
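  # Example (illustrative, not from the original source): Feedback("copying
  # disk 0") logs the message with the default constants.ELOG_MESSAGE type,
  # while Feedback(log_type, log_msg) sets the type explicitly; either way
  # the entry is stored as a (log_serial, (seconds, microseconds), log_type,
  # log_msg) tuple.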

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
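# Example (illustrative, not from the original source):
# _EncodeOpError(errors.OpPrereqError("missing disk")) encodes the Ganeti
# exception as-is, while _EncodeOpError(ValueError("boom")) is first wrapped
# in errors.OpExecError, so only Ganeti errors end up serialized into the
# job file.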


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
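  # Example (illustrative, assuming a strategy that yields 1.0 and then
  # None): Peek() returns 1.0 without consuming it, the following Next()
  # also returns 1.0 and clears the cached value, and a later Peek()/Next()
  # sees None, which the callers below treat as a blocking lock acquire.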


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status == constants.OP_STATUS_WAITING:
          # Couldn't get locks in time
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))
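# Example (illustrative, not from the original source): a worker whose
# _JobProcessor call returns DEFER re-queues the job at its current priority
# via workerpool.DeferTask; WAITDEP leaves re-scheduling to the dependency
# manager, and FINISHED wakes up any jobs waiting on this one through
# NotifyWaiters.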


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Returns the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: int
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TJobId(job.id)
    assert ht.TJobId(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))
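  # Example (illustrative, not from the original source): if job 123 depends
  # on job 122 with dep_status == [constants.JOB_STATUS_SUCCESS] and job 122
  # is still running, CheckAndRegister returns (WAIT, ...) and job 123 is
  # re-queued once job 122 finalizes; if job 122 ended in error instead, the
  # result is (WRONGSTATUS, ...) and the depending opcode fails.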

  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
    @type job_id: int
    @param job_id: Job ID

    """
    assert ht.TJobId(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()

      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper
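# Example (illustrative, not from the original source): with the drain flag
# set, a submission method wrapped by this decorator raises
# errors.JobQueueDrainError instead of accepting the job, and once the
# master daemon stops accepting jobs (_accepting_jobs is False) it raises
# errors.JobQueueError, so callers see an immediate refusal rather than a
# job that never runs.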
1534

    
1535

    
1536
class JobQueue(object):
1537
  """Queue used to manage the jobs.
1538

1539
  """
1540
  def __init__(self, context):
1541
    """Constructor for JobQueue.
1542

1543
    The constructor will initialize the job queue object and then
1544
    start loading the current jobs from disk, either for starting them
1545
    (if they were queue) or for aborting them (if they were already
1546
    running).
1547

1548
    @type context: GanetiContext
1549
    @param context: the context object for access to the configuration
1550
        data and other ganeti objects
1551

1552
    """
1553
    self.context = context
1554
    self._memcache = weakref.WeakValueDictionary()
1555
    self._my_hostname = netutils.Hostname.GetSysName()
1556

    
1557
    # The Big JobQueue lock. If a code block or method acquires it in shared
1558
    # mode safe it must guarantee concurrency with all the code acquiring it in
1559
    # shared mode, including itself. In order not to acquire it at all
1560
    # concurrency must be guaranteed with all code acquiring it in shared mode
1561
    # and all code acquiring it exclusively.
1562
    self._lock = locking.SharedLock("JobQueue")
1563

    
1564
    self.acquire = self._lock.acquire
1565
    self.release = self._lock.release
1566

    
1567
    # Accept jobs by default
1568
    self._accepting_jobs = True
1569

    
1570
    # Initialize the queue, and acquire the filelock.
1571
    # This ensures no other process is working on the job queue.
1572
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1573

    
1574
    # Read serial file
1575
    self._last_serial = jstore.ReadSerial()
1576
    assert self._last_serial is not None, ("Serial file was modified between"
1577
                                           " check in jstore and here")
1578

    
1579
    # Get initial list of nodes
1580
    self._nodes = dict((n.name, n.primary_ip)
1581
                       for n in self.context.cfg.GetAllNodesInfo().values()
1582
                       if n.master_candidate)
1583

    
1584
    # Remove master node
1585
    self._nodes.pop(self._my_hostname, None)
1586

    
1587
    # TODO: Check consistency across nodes
1588

    
1589
    self._queue_size = None
1590
    self._UpdateQueueSizeUnlocked()
1591
    assert ht.TInt(self._queue_size)
1592
    self._drained = jstore.CheckDrainFlag()
1593

    
1594
    # Job dependencies
1595
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1596
                                        self._EnqueueJobs)
1597
    self.context.glm.AddToLockMonitor(self.depmgr)
1598

    
1599
    # Setup worker pool
1600
    self._wpool = _JobQueueWorkerPool(self)
1601
    try:
1602
      self._InspectQueue()
1603
    except:
1604
      self._wpool.TerminateWorkers()
1605
      raise
1606

    
1607
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

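  # For illustration, the recovery logic in _InspectQueue above maps job
  # statuses found after a master restart as follows:
  #   JOB_STATUS_QUEUED    -> re-enqueued unchanged
  #   JOB_STATUS_WAITING   -> unfinished opcodes reset to OP_STATUS_QUEUED,
  #                           then re-enqueued
  #   JOB_STATUS_RUNNING,
  #   JOB_STATUS_CANCELING -> unfinished opcodes marked OP_STATUS_ERROR
  #                           ("Unclean master daemon shutdown") and the job
  #                           is finalized
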
  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

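  # For illustration: _CheckRpcResult counts the master itself as an implicit
  # success. With five other master candidates of which four fail, the check
  # is (1 success + 1 master) < 4 failures, so "More than half of the nodes
  # failed" is logged; with only two failures out of five it is not.
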
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

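  # For illustration, the replication path used by _UpdateJobQueueFile above:
  # the file is first written locally with masterd ownership, then (when
  # replicate=True) pushed to every other master candidate via _CallJqUpdate;
  # _CheckRpcResult only logs failures, so a partially failed replication does
  # not abort or roll back the local update.
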
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TPositiveInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

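  # For illustration: assuming self._last_serial is 42, a call like
  #   self._NewSerialsUnlocked(3)
  # writes "45\n" to the (replicated) serial file and returns the formatted
  # job IDs for serials 43, 44 and 45; the in-memory counter is only bumped
  # after the file write has succeeded.
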
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)

  @staticmethod
  def _DetermineJobDirectories(archived):
    """Returns the directories that contain job files.

    @type archived: bool
    @param archived: whether to include the archive subdirectories as well

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

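  # For illustration (the exact bucket name comes from
  # jstore.GetArchiveDirectory, which is not shown in this module): live jobs
  # are stored as QUEUE_DIR/job-<id>, while archived jobs are moved to
  # JOB_QUEUE_ARCHIVE_DIR/<bucket>/job-<id>; this is why
  # _DetermineJobDirectories(archived=True) adds every visible subdirectory
  # of the archive directory to the scan list.
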
  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @type writable: bool or None
    @param writable: whether the restored job should be writable; if C{None},
        it defaults to whether the job was found in the live (non-archived)
        queue
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

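  # For illustration: _LoadJobFromDisk tries the live queue file first and,
  # only when try_archived is set, falls back to the archived path. A missing
  # file (ENOENT) silently moves on to the next candidate, any other I/O
  # error propagates, and unparseable content is reported as
  # errors.JobFileCorrupted.
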
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of an error while reading the job, None is returned and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

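  # For illustration: SetDrainFlag(True) persists the drain flag through
  # jstore and caches it in self._drained; while the flag is set, entry
  # points decorated with @_RequireNonDrainedQueue (such as SubmitJob below)
  # are expected to refuse new submissions until the flag is cleared again
  # with SetDrainFlag(False).
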
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

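  # For illustration: by the time _SubmitJobUnlocked runs, every opcode must
  # already carry a valid submit priority and only absolute job IDs in its
  # "depends" attribute (relative IDs are resolved earlier, see
  # _ResolveJobDependencies below); otherwise the whole job is rejected with
  # errors.GenericError before anything is written to disk.
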
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

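  # For illustration (the names here are placeholders, not taken from this
  # module): a caller could submit two dependent jobs in a single call, e.g.
  #   queue.SubmitManyJobs([[op_create_instance], [op_start_instance]])
  # where the second job's opcode lists a (dep_job_id, dep_status) pair with
  # the relative ID -1; that ID is resolved to the first job's newly
  # allocated ID before the job is stored (see _ResolveJobDependencies
  # below).
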
  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

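  # For illustration: with previously submitted IDs [10, 11] and new IDs
  # [12, 13, 14], resolving dependencies for the job at index 2 uses the
  # list [10, 11, 12, 13], so a relative dependency of -1 maps to job 13 and
  # -3 maps to job 11; an out-of-range value such as -5 raises IndexError in
  # resolve_fn and makes _ResolveJobDependencies return an error instead.
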
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        # The for/else clause only runs if the inner loop was not left via
        # "break", i.e. all dependencies were resolved and the job can
        # actually be submitted
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

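  # For illustration: each job becomes a single-element task tuple for the
  # worker pool and is queued with its own priority as computed by
  # job.CalcPriority(), e.g. three jobs are passed as
  #   AddManyTasks([(job1,), (job2,), (job3,)], priority=[p1, p2, p3])
  # with the priority list kept in the same order as the task list.
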
  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    assert not job.writable, "Got writable job" # pylint: disable=E1101

    if job:
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

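  # For illustration: a client would typically call WaitForJobChanges in a
  # loop, passing back the job info and log serial it received last time;
  # when nothing changed within the timeout the call returns
  # constants.JOB_NOTCHANGED and the client simply asks again, otherwise it
  # gets the fresh field values plus any log entries newer than
  # prev_log_serial.
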
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"
    assert not job.archived, "Can't cancel archived job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive an already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time (in seconds) to spend
        archiving jobs

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

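  # For illustration: with age=86400 a job whose end timestamp (or, failing
  # that, start or received timestamp) lies more than one day in the past is
  # queued for archival, ten jobs at a time; with age=-1 every job qualifies
  # regardless of age, and jobs that are not yet finalized are skipped by
  # _ArchiveJobsUnlocked anyway.
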
  def _Query(self, fields, qfilter):
    """Prepares a job query and loads the jobs to be queried.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter
    @rtype: tuple
    @return: the query object, the list of (job_id, job) pairs and whether
        all jobs were requested

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list, one element per job, each element being a list with
        the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

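  # For illustration: a graceful master daemon shutdown would first call
  # PrepareShutdown() repeatedly (it keeps returning True while jobs are
  # still running and only disables the worker pool once), and only call
  # Shutdown() below after PrepareShutdown() has returned False, so that no
  # job is interrupted mid-execution.
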
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None