root / lib / jqueue.py @ aebd0e4e


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import time
35
import weakref
36
import threading
37
import itertools
38

    
39
try:
40
  # pylint: disable=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import runtime
57
from ganeti import netutils
58
from ganeti import compat
59
from ganeti import ht
60
from ganeti import query
61
from ganeti import qlang
62
from ganeti import pathutils
63
from ganeti import vcluster
64

    
65

    
66
JOBQUEUE_THREADS = 25
67

    
68
# member lock names to be passed to @ssynchronized decorator
69
_LOCK = "_lock"
70
_QUEUE = "_queue"
71

    
72

    
73
class CancelJob(Exception):
74
  """Special exception to cancel a job.
75

76
  """
77

    
78

    
79
def TimeStampNow():
80
  """Returns the current timestamp.
81

82
  @rtype: tuple
83
  @return: the current time in the (seconds, microseconds) format
84

85
  """
86
  return utils.SplitTime(time.time())
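# Illustrative example (added annotation, not part of the original module):
# the split format keeps sub-second precision through JSON serialization, e.g.
#
#   ts = TimeStampNow()           # e.g. (1330089056, 392416)
#   (seconds, microseconds) = ts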
87

    
88

    
89
def _CallJqUpdate(runner, names, file_name, content):
90
  """Updates job queue file after virtualizing filename.
91

92
  """
93
  virt_file_name = vcluster.MakeVirtualPath(file_name)
94
  return runner.call_jobqueue_update(names, virt_file_name, content)
95

    
96

    
97
class _SimpleJobQuery:
98
  """Wrapper for job queries.
99

100
  Instances keep the field list cached, useful e.g. in L{_JobChangesChecker}.
101

102
  """
103
  def __init__(self, fields):
104
    """Initializes this class.
105

106
    """
107
    self._query = query.Query(query.JOB_FIELDS, fields)
108

    
109
  def __call__(self, job):
110
    """Executes a job query using cached field list.
111

112
    """
113
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
114

    
115

    
116
class _QueuedOpCode(object):
117
  """Encapsulates an opcode object.
118

119
  @ivar log: holds the execution log and consists of tuples
120
  of the form C{(log_serial, timestamp, level, message)}
121
  @ivar input: the OpCode we encapsulate
122
  @ivar status: the current status
123
  @ivar result: the result of the LU execution
124
  @ivar start_timestamp: timestamp for the start of the execution
125
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
126
  @ivar end_timestamp: timestamp for the end of the execution
127

128
  """
129
  __slots__ = ["input", "status", "result", "log", "priority",
130
               "start_timestamp", "exec_timestamp", "end_timestamp",
131
               "__weakref__"]
132

    
133
  def __init__(self, op):
134
    """Initializes instances of this class.
135

136
    @type op: L{opcodes.OpCode}
137
    @param op: the opcode we encapsulate
138

139
    """
140
    self.input = op
141
    self.status = constants.OP_STATUS_QUEUED
142
    self.result = None
143
    self.log = []
144
    self.start_timestamp = None
145
    self.exec_timestamp = None
146
    self.end_timestamp = None
147

    
148
    # Get initial priority (it might change during the lifetime of this opcode)
149
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
150

    
151
  @classmethod
152
  def Restore(cls, state):
153
    """Restore the _QueuedOpCode from the serialized form.
154

155
    @type state: dict
156
    @param state: the serialized state
157
    @rtype: _QueuedOpCode
158
    @return: a new _QueuedOpCode instance
159

160
    """
161
    obj = _QueuedOpCode.__new__(cls)
162
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
163
    obj.status = state["status"]
164
    obj.result = state["result"]
165
    obj.log = state["log"]
166
    obj.start_timestamp = state.get("start_timestamp", None)
167
    obj.exec_timestamp = state.get("exec_timestamp", None)
168
    obj.end_timestamp = state.get("end_timestamp", None)
169
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
170
    return obj
171

    
172
  def Serialize(self):
173
    """Serializes this _QueuedOpCode.
174

175
    @rtype: dict
176
    @return: the dictionary holding the serialized state
177

178
    """
179
    return {
180
      "input": self.input.__getstate__(),
181
      "status": self.status,
182
      "result": self.result,
183
      "log": self.log,
184
      "start_timestamp": self.start_timestamp,
185
      "exec_timestamp": self.exec_timestamp,
186
      "end_timestamp": self.end_timestamp,
187
      "priority": self.priority,
188
      }
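  # Illustrative note (added annotation, not in the original file): Serialize()
  # and Restore() are inverses, so a sketch of the on-disk round trip is:
  #
  #   state = qop.Serialize()
  #   copy = _QueuedOpCode.Restore(state)  # same status, result, log, priority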
189

    
190

    
191
class _QueuedJob(object):
192
  """In-memory job representation.
193

194
  This is what we use to track the user-submitted jobs. Locking must
195
  be taken care of by users of this class.
196

197
  @type queue: L{JobQueue}
198
  @ivar queue: the parent queue
199
  @ivar id: the job ID
200
  @type ops: list
201
  @ivar ops: the list of _QueuedOpCode that constitute the job
202
  @type log_serial: int
203
  @ivar log_serial: holds the index for the next log entry
204
  @ivar received_timestamp: the timestamp for when the job was received
205
  @ivar start_timestamp: the timestamp for start of execution
206
  @ivar end_timestamp: the timestamp for end of execution
207
  @ivar writable: Whether the job is allowed to be modified
208

209
  """
210
  # pylint: disable=W0212
211
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
212
               "received_timestamp", "start_timestamp", "end_timestamp",
213
               "__weakref__", "processor_lock", "writable", "archived"]
214

    
215
  def __init__(self, queue, job_id, ops, writable):
216
    """Constructor for the _QueuedJob.
217

218
    @type queue: L{JobQueue}
219
    @param queue: our parent queue
220
    @type job_id: int
221
    @param job_id: our job id
222
    @type ops: list
223
    @param ops: the list of opcodes we hold, which will be encapsulated
224
        in _QueuedOpCodes
225
    @type writable: bool
226
    @param writable: Whether job can be modified
227

228
    """
229
    if not ops:
230
      raise errors.GenericError("A job needs at least one opcode")
231

    
232
    self.queue = queue
233
    self.id = int(job_id)
234
    self.ops = [_QueuedOpCode(op) for op in ops]
235
    self.log_serial = 0
236
    self.received_timestamp = TimeStampNow()
237
    self.start_timestamp = None
238
    self.end_timestamp = None
239
    self.archived = False
240

    
241
    self._InitInMemory(self, writable)
242

    
243
    assert not self.archived, "New jobs can not be marked as archived"
244

    
245
  @staticmethod
246
  def _InitInMemory(obj, writable):
247
    """Initializes in-memory variables.
248

249
    """
250
    obj.writable = writable
251
    obj.ops_iter = None
252
    obj.cur_opctx = None
253

    
254
    # Read-only jobs are not processed and therefore don't need a lock
255
    if writable:
256
      obj.processor_lock = threading.Lock()
257
    else:
258
      obj.processor_lock = None
259

    
260
  def __repr__(self):
261
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
262
              "id=%s" % self.id,
263
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
264

    
265
    return "<%s at %#x>" % (" ".join(status), id(self))
266

    
267
  @classmethod
268
  def Restore(cls, queue, state, writable, archived):
269
    """Restore a _QueuedJob from serialized state:
270

271
    @type queue: L{JobQueue}
272
    @param queue: to which queue the restored job belongs
273
    @type state: dict
274
    @param state: the serialized state
275
    @type writable: bool
276
    @param writable: Whether job can be modified
277
    @type archived: bool
278
    @param archived: Whether job was already archived
279
    @rtype: _QueuedJob
280
    @return: the restored _QueuedJob instance
281

282
    """
283
    obj = _QueuedJob.__new__(cls)
284
    obj.queue = queue
285
    obj.id = int(state["id"])
286
    obj.received_timestamp = state.get("received_timestamp", None)
287
    obj.start_timestamp = state.get("start_timestamp", None)
288
    obj.end_timestamp = state.get("end_timestamp", None)
289
    obj.archived = archived
290

    
291
    obj.ops = []
292
    obj.log_serial = 0
293
    for op_state in state["ops"]:
294
      op = _QueuedOpCode.Restore(op_state)
295
      for log_entry in op.log:
296
        obj.log_serial = max(obj.log_serial, log_entry[0])
297
      obj.ops.append(op)
298

    
299
    cls._InitInMemory(obj, writable)
300

    
301
    return obj
302

    
303
  def Serialize(self):
304
    """Serialize the _QueuedJob instance.
305

306
    @rtype: dict
307
    @return: the serialized state
308

309
    """
310
    return {
311
      "id": self.id,
312
      "ops": [op.Serialize() for op in self.ops],
313
      "start_timestamp": self.start_timestamp,
314
      "end_timestamp": self.end_timestamp,
315
      "received_timestamp": self.received_timestamp,
316
      }
317

    
318
  def CalcStatus(self):
319
    """Compute the status of this job.
320

321
    This function iterates over all the _QueuedOpCodes in the job and
322
    based on their status, computes the job status.
323

324
    The algorithm is:
325
      - if we find a cancelled or finished-with-error opcode, the job
326
        status will be the same
327
      - otherwise, the last opcode with the status one of:
328
          - waitlock
329
          - canceling
330
          - running
331

332
        will determine the job status
333

334
      - otherwise, it means either all opcodes are queued, or success,
335
        and the job status will be the same
336

337
    @return: the job status
338

339
    """
340
    status = constants.JOB_STATUS_QUEUED
341

    
342
    all_success = True
343
    for op in self.ops:
344
      if op.status == constants.OP_STATUS_SUCCESS:
345
        continue
346

    
347
      all_success = False
348

    
349
      if op.status == constants.OP_STATUS_QUEUED:
350
        pass
351
      elif op.status == constants.OP_STATUS_WAITING:
352
        status = constants.JOB_STATUS_WAITING
353
      elif op.status == constants.OP_STATUS_RUNNING:
354
        status = constants.JOB_STATUS_RUNNING
355
      elif op.status == constants.OP_STATUS_CANCELING:
356
        status = constants.JOB_STATUS_CANCELING
357
        break
358
      elif op.status == constants.OP_STATUS_ERROR:
359
        status = constants.JOB_STATUS_ERROR
360
        # The whole job fails if one opcode failed
361
        break
362
      elif op.status == constants.OP_STATUS_CANCELED:
363
        status = constants.JOB_STATUS_CANCELED
364
        break
365

    
366
    if all_success:
367
      status = constants.JOB_STATUS_SUCCESS
368

    
369
    return status
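  # Illustrative example (added annotation): for opcode statuses
  # (success, running, queued) CalcStatus() returns JOB_STATUS_RUNNING; one
  # opcode in OP_STATUS_ERROR makes the job report JOB_STATUS_ERROR, and
  # JOB_STATUS_SUCCESS is only returned once every opcode has succeeded.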
370

    
371
  def CalcPriority(self):
372
    """Gets the current priority for this job.
373

374
    Only unfinished opcodes are considered. When all are done, the default
375
    priority is used.
376

377
    @rtype: int
378

379
    """
380
    priorities = [op.priority for op in self.ops
381
                  if op.status not in constants.OPS_FINALIZED]
382

    
383
    if not priorities:
384
      # All opcodes are done, assume default priority
385
      return constants.OP_PRIO_DEFAULT
386

    
387
    return min(priorities)
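  # Illustrative example (added annotation): with unfinished opcode priorities
  # [0, -5, 10] this returns -5; as with nice values, numerically smaller
  # means higher priority, so min() picks the most urgent opcode.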
388

    
389
  def GetLogEntries(self, newer_than):
390
    """Selectively returns the log entries.
391

392
    @type newer_than: None or int
393
    @param newer_than: if this is None, return all log entries,
394
        otherwise return only the log entries with serial higher
395
        than this value
396
    @rtype: list
397
    @return: the list of the log entries selected
398

399
    """
400
    if newer_than is None:
401
      serial = -1
402
    else:
403
      serial = newer_than
404

    
405
    entries = []
406
    for op in self.ops:
407
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
408

    
409
    return entries
410

    
411
  def GetInfo(self, fields):
412
    """Returns information about a job.
413

414
    @type fields: list
415
    @param fields: names of fields to return
416
    @rtype: list
417
    @return: list with one element for each field
418
    @raise errors.OpExecError: when an invalid field
419
        has been passed
420

421
    """
422
    return _SimpleJobQuery(fields)(self)
423

    
424
  def MarkUnfinishedOps(self, status, result):
425
    """Mark unfinished opcodes with a given status and result.
426

427
    This is a utility function for marking all running or waiting to
428
    be run opcodes with a given status. Opcodes which are already
429
    finalized are not changed.
430

431
    @param status: a given opcode status
432
    @param result: the opcode result
433

434
    """
435
    not_marked = True
436
    for op in self.ops:
437
      if op.status in constants.OPS_FINALIZED:
438
        assert not_marked, "Finalized opcodes found after non-finalized ones"
439
        continue
440
      op.status = status
441
      op.result = result
442
      not_marked = False
443

    
444
  def Finalize(self):
445
    """Marks the job as finalized.
446

447
    """
448
    self.end_timestamp = TimeStampNow()
449

    
450
  def Cancel(self):
451
    """Marks the job as canceled or canceling, if possible.
452

453
    @rtype: tuple; (bool, string)
454
    @return: Boolean describing whether job was successfully canceled or marked
455
      as canceling and a text message
456

457
    """
458
    status = self.CalcStatus()
459

    
460
    if status == constants.JOB_STATUS_QUEUED:
461
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
462
                             "Job canceled by request")
463
      self.Finalize()
464
      return (True, "Job %s canceled" % self.id)
465

    
466
    elif status == constants.JOB_STATUS_WAITING:
467
      # The worker will notice the new status and cancel the job
468
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
469
      return (True, "Job %s will be canceled" % self.id)
470

    
471
    else:
472
      logging.debug("Job %s is no longer waiting in the queue", self.id)
473
      return (False, "Job %s is no longer waiting in the queue" % self.id)
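  # Illustrative example (added annotation): callers only see the
  # (success, message) tuple, e.g.
  #
  #   (True, "Job 53 canceled")             # job was still queued
  #   (True, "Job 53 will be canceled")     # job was waiting for locks
  #   (False, "Job 53 is no longer waiting in the queue")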
474

    
475

    
476
class _OpExecCallbacks(mcpu.OpExecCbBase):
477
  def __init__(self, queue, job, op):
478
    """Initializes this class.
479

480
    @type queue: L{JobQueue}
481
    @param queue: Job queue
482
    @type job: L{_QueuedJob}
483
    @param job: Job object
484
    @type op: L{_QueuedOpCode}
485
    @param op: OpCode
486

487
    """
488
    assert queue, "Queue is missing"
489
    assert job, "Job is missing"
490
    assert op, "Opcode is missing"
491

    
492
    self._queue = queue
493
    self._job = job
494
    self._op = op
495

    
496
  def _CheckCancel(self):
497
    """Raises an exception to cancel the job if asked to.
498

499
    """
500
    # Cancel here if we were asked to
501
    if self._op.status == constants.OP_STATUS_CANCELING:
502
      logging.debug("Canceling opcode")
503
      raise CancelJob()
504

    
505
  @locking.ssynchronized(_QUEUE, shared=1)
506
  def NotifyStart(self):
507
    """Mark the opcode as running, not lock-waiting.
508

509
    This is called from the mcpu code as a notifier function, when the LU is
510
    finally about to start the Exec() method. Of course, to have end-user
511
    visible results, the opcode must be initially (before calling into
512
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
513

514
    """
515
    assert self._op in self._job.ops
516
    assert self._op.status in (constants.OP_STATUS_WAITING,
517
                               constants.OP_STATUS_CANCELING)
518

    
519
    # Cancel here if we were asked to
520
    self._CheckCancel()
521

    
522
    logging.debug("Opcode is now running")
523

    
524
    self._op.status = constants.OP_STATUS_RUNNING
525
    self._op.exec_timestamp = TimeStampNow()
526

    
527
    # And finally replicate the job status
528
    self._queue.UpdateJobUnlocked(self._job)
529

    
530
  @locking.ssynchronized(_QUEUE, shared=1)
531
  def _AppendFeedback(self, timestamp, log_type, log_msg):
532
    """Internal feedback append function, with locks
533

534
    """
535
    self._job.log_serial += 1
536
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
537
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
538

    
539
  def Feedback(self, *args):
540
    """Append a log entry.
541

542
    """
543
    assert len(args) < 3
544

    
545
    if len(args) == 1:
546
      log_type = constants.ELOG_MESSAGE
547
      log_msg = args[0]
548
    else:
549
      (log_type, log_msg) = args
550

    
551
    # The time is split to make serialization easier and not lose
552
    # precision.
553
    timestamp = utils.SplitTime(time.time())
554
    self._AppendFeedback(timestamp, log_type, log_msg)
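  # Usage sketch (added annotation; "cbs" stands for an _OpExecCallbacks
  # instance): LUs call this either with a bare message or with an explicit
  # log type, e.g.
  #
  #   cbs.Feedback("copying disk data")
  #   cbs.Feedback(constants.ELOG_MESSAGE, "copying disk data")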
555

    
556
  def CheckCancel(self):
557
    """Check whether job has been cancelled.
558

559
    """
560
    assert self._op.status in (constants.OP_STATUS_WAITING,
561
                               constants.OP_STATUS_CANCELING)
562

    
563
    # Cancel here if we were asked to
564
    self._CheckCancel()
565

    
566
  def SubmitManyJobs(self, jobs):
567
    """Submits jobs for processing.
568

569
    See L{JobQueue.SubmitManyJobs}.
570

571
    """
572
    # Locking is done in job queue
573
    return self._queue.SubmitManyJobs(jobs)
574

    
575

    
576
class _JobChangesChecker(object):
577
  def __init__(self, fields, prev_job_info, prev_log_serial):
578
    """Initializes this class.
579

580
    @type fields: list of strings
581
    @param fields: Fields requested by LUXI client
582
    @type prev_job_info: string
583
    @param prev_job_info: previous job info, as passed by the LUXI client
584
    @type prev_log_serial: string
585
    @param prev_log_serial: previous job serial, as passed by the LUXI client
586

587
    """
588
    self._squery = _SimpleJobQuery(fields)
589
    self._prev_job_info = prev_job_info
590
    self._prev_log_serial = prev_log_serial
591

    
592
  def __call__(self, job):
593
    """Checks whether job has changed.
594

595
    @type job: L{_QueuedJob}
596
    @param job: Job object
597

598
    """
599
    assert not job.writable, "Expected read-only job"
600

    
601
    status = job.CalcStatus()
602
    job_info = self._squery(job)
603
    log_entries = job.GetLogEntries(self._prev_log_serial)
604

    
605
    # Serializing and deserializing data can cause type changes (e.g. from
606
    # tuple to list) or precision loss. We're doing it here so that we get
607
    # the same modifications as the data received from the client. Without
608
    # this, the comparison afterwards might fail without the data being
609
    # significantly different.
610
    # TODO: we just deserialized from disk, investigate how to make sure that
611
    # the job info and log entries are compatible to avoid this further step.
612
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
613
    # efficient, though floats will be tricky
614
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
615
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
616

    
617
    # Don't even try to wait if the job is no longer running, there will be
618
    # no changes.
619
    if (status not in (constants.JOB_STATUS_QUEUED,
620
                       constants.JOB_STATUS_RUNNING,
621
                       constants.JOB_STATUS_WAITING) or
622
        job_info != self._prev_job_info or
623
        (log_entries and self._prev_log_serial != log_entries[0][0])):
624
      logging.debug("Job %s changed", job.id)
625
      return (job_info, log_entries)
626

    
627
    return None
628

    
629

    
630
class _JobFileChangesWaiter(object):
631
  def __init__(self, filename):
632
    """Initializes this class.
633

634
    @type filename: string
635
    @param filename: Path to job file
636
    @raises errors.InotifyError: if the notifier cannot be set up
637

638
    """
639
    self._wm = pyinotify.WatchManager()
640
    self._inotify_handler = \
641
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
642
    self._notifier = \
643
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
644
    try:
645
      self._inotify_handler.enable()
646
    except Exception:
647
      # pyinotify doesn't close file descriptors automatically
648
      self._notifier.stop()
649
      raise
650

    
651
  def _OnInotify(self, notifier_enabled):
652
    """Callback for inotify.
653

654
    """
655
    if not notifier_enabled:
656
      self._inotify_handler.enable()
657

    
658
  def Wait(self, timeout):
659
    """Waits for the job file to change.
660

661
    @type timeout: float
662
    @param timeout: Timeout in seconds
663
    @return: Whether there have been events
664

665
    """
666
    assert timeout >= 0
667
    have_events = self._notifier.check_events(timeout * 1000)
668
    if have_events:
669
      self._notifier.read_events()
670
    self._notifier.process_events()
671
    return have_events
672

    
673
  def Close(self):
674
    """Closes underlying notifier and its file descriptor.
675

676
    """
677
    self._notifier.stop()
678

    
679

    
680
class _JobChangesWaiter(object):
681
  def __init__(self, filename):
682
    """Initializes this class.
683

684
    @type filename: string
685
    @param filename: Path to job file
686

687
    """
688
    self._filewaiter = None
689
    self._filename = filename
690

    
691
  def Wait(self, timeout):
692
    """Waits for a job to change.
693

694
    @type timeout: float
695
    @param timeout: Timeout in seconds
696
    @return: Whether there have been events
697

698
    """
699
    if self._filewaiter:
700
      return self._filewaiter.Wait(timeout)
701

    
702
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
703
    # If this point is reached, return immediately and let caller check the job
704
    # file again in case there were changes since the last check. This avoids a
705
    # race condition.
706
    self._filewaiter = _JobFileChangesWaiter(self._filename)
707

    
708
    return True
709

    
710
  def Close(self):
711
    """Closes underlying waiter.
712

713
    """
714
    if self._filewaiter:
715
      self._filewaiter.Close()
716

    
717

    
718
class _WaitForJobChangesHelper(object):
719
  """Helper class using inotify to wait for changes in a job file.
720

721
  This class takes a previous job status and serial, and alerts the client when
722
  the current job status has changed.
723

724
  """
725
  @staticmethod
726
  def _CheckForChanges(counter, job_load_fn, check_fn):
727
    if counter.next() > 0:
728
      # If this isn't the first check the job is given some more time to change
729
      # again. This gives better performance for jobs generating many
730
      # changes/messages.
731
      time.sleep(0.1)
732

    
733
    job = job_load_fn()
734
    if not job:
735
      raise errors.JobLost()
736

    
737
    result = check_fn(job)
738
    if result is None:
739
      raise utils.RetryAgain()
740

    
741
    return result
742

    
743
  def __call__(self, filename, job_load_fn,
744
               fields, prev_job_info, prev_log_serial, timeout):
745
    """Waits for changes on a job.
746

747
    @type filename: string
748
    @param filename: File on which to wait for changes
749
    @type job_load_fn: callable
750
    @param job_load_fn: Function to load job
751
    @type fields: list of strings
752
    @param fields: Which fields to check for changes
753
    @type prev_job_info: list or None
754
    @param prev_job_info: Last job information returned
755
    @type prev_log_serial: int
756
    @param prev_log_serial: Last job message serial number
757
    @type timeout: float
758
    @param timeout: maximum time to wait in seconds
759

760
    """
761
    counter = itertools.count()
762
    try:
763
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
764
      waiter = _JobChangesWaiter(filename)
765
      try:
766
        return utils.Retry(compat.partial(self._CheckForChanges,
767
                                          counter, job_load_fn, check_fn),
768
                           utils.RETRY_REMAINING_TIME, timeout,
769
                           wait_fn=waiter.Wait)
770
      finally:
771
        waiter.Close()
772
    except (errors.InotifyError, errors.JobLost):
773
      return None
774
    except utils.RetryTimeout:
775
      return constants.JOB_NOTCHANGED
776

    
777

    
778
def _EncodeOpError(err):
779
  """Encodes an error which occurred while processing an opcode.
780

781
  """
782
  if isinstance(err, errors.GenericError):
783
    to_encode = err
784
  else:
785
    to_encode = errors.OpExecError(str(err))
786

    
787
  return errors.EncodeException(to_encode)
788

    
789

    
790
class _TimeoutStrategyWrapper:
791
  def __init__(self, fn):
792
    """Initializes this class.
793

794
    """
795
    self._fn = fn
796
    self._next = None
797

    
798
  def _Advance(self):
799
    """Gets the next timeout if necessary.
800

801
    """
802
    if self._next is None:
803
      self._next = self._fn()
804

    
805
  def Peek(self):
806
    """Returns the next timeout.
807

808
    """
809
    self._Advance()
810
    return self._next
811

    
812
  def Next(self):
813
    """Returns the current timeout and advances the internal state.
814

815
    """
816
    self._Advance()
817
    result = self._next
818
    self._next = None
819
    return result
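  # Illustrative note (added annotation; "timeout_fn" is a placeholder for a
  # callable returning the next timeout):
  #
  #   wrapper = _TimeoutStrategyWrapper(timeout_fn)
  #   wrapper.Peek()   # returns the upcoming timeout without consuming it
  #   wrapper.Next()   # returns the same value and consumes it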
820

    
821

    
822
class _OpExecContext:
823
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
824
    """Initializes this class.
825

826
    """
827
    self.op = op
828
    self.index = index
829
    self.log_prefix = log_prefix
830
    self.summary = op.input.Summary()
831

    
832
    # Create local copy to modify
833
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
834
      self.jobdeps = op.input.depends[:]
835
    else:
836
      self.jobdeps = None
837

    
838
    self._timeout_strategy_factory = timeout_strategy_factory
839
    self._ResetTimeoutStrategy()
840

    
841
  def _ResetTimeoutStrategy(self):
842
    """Creates a new timeout strategy.
843

844
    """
845
    self._timeout_strategy = \
846
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
847

    
848
  def CheckPriorityIncrease(self):
849
    """Checks whether priority can and should be increased.
850

851
    Called when locks couldn't be acquired.
852

853
    """
854
    op = self.op
855

    
856
    # Exhausted all retries and next round should not use blocking acquire
857
    # for locks?
858
    if (self._timeout_strategy.Peek() is None and
859
        op.priority > constants.OP_PRIO_HIGHEST):
860
      logging.debug("Increasing priority")
861
      op.priority -= 1
862
      self._ResetTimeoutStrategy()
863
      return True
864

    
865
    return False
866

    
867
  def GetNextLockTimeout(self):
868
    """Returns the next lock acquire timeout.
869

870
    """
871
    return self._timeout_strategy.Next()
872

    
873

    
874
class _JobProcessor(object):
875
  (DEFER,
876
   WAITDEP,
877
   FINISHED) = range(1, 4)
878

    
879
  def __init__(self, queue, opexec_fn, job,
880
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
881
    """Initializes this class.
882

883
    """
884
    self.queue = queue
885
    self.opexec_fn = opexec_fn
886
    self.job = job
887
    self._timeout_strategy_factory = _timeout_strategy_factory
888

    
889
  @staticmethod
890
  def _FindNextOpcode(job, timeout_strategy_factory):
891
    """Locates the next opcode to run.
892

893
    @type job: L{_QueuedJob}
894
    @param job: Job object
895
    @param timeout_strategy_factory: Callable to create new timeout strategy
896

897
    """
898
    # Create some sort of a cache to speed up locating next opcode for future
899
    # lookups
900
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
901
    # pending and one for processed ops.
902
    if job.ops_iter is None:
903
      job.ops_iter = enumerate(job.ops)
904

    
905
    # Find next opcode to run
906
    while True:
907
      try:
908
        (idx, op) = job.ops_iter.next()
909
      except StopIteration:
910
        raise errors.ProgrammerError("Called for a finished job")
911

    
912
      if op.status == constants.OP_STATUS_RUNNING:
913
        # Found an opcode already marked as running
914
        raise errors.ProgrammerError("Called for job marked as running")
915

    
916
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
917
                             timeout_strategy_factory)
918

    
919
      if op.status not in constants.OPS_FINALIZED:
920
        return opctx
921

    
922
      # This is a job that was partially completed before master daemon
923
      # shutdown, so it can be expected that some opcodes are already
924
      # completed successfully (if any did error out, then the whole job
925
      # should have been aborted and not resubmitted for processing).
926
      logging.info("%s: opcode %s already processed, skipping",
927
                   opctx.log_prefix, opctx.summary)
928

    
929
  @staticmethod
930
  def _MarkWaitlock(job, op):
931
    """Marks an opcode as waiting for locks.
932

933
    The job's start timestamp is also set if necessary.
934

935
    @type job: L{_QueuedJob}
936
    @param job: Job object
937
    @type op: L{_QueuedOpCode}
938
    @param op: Opcode object
939

940
    """
941
    assert op in job.ops
942
    assert op.status in (constants.OP_STATUS_QUEUED,
943
                         constants.OP_STATUS_WAITING)
944

    
945
    update = False
946

    
947
    op.result = None
948

    
949
    if op.status == constants.OP_STATUS_QUEUED:
950
      op.status = constants.OP_STATUS_WAITING
951
      update = True
952

    
953
    if op.start_timestamp is None:
954
      op.start_timestamp = TimeStampNow()
955
      update = True
956

    
957
    if job.start_timestamp is None:
958
      job.start_timestamp = op.start_timestamp
959
      update = True
960

    
961
    assert op.status == constants.OP_STATUS_WAITING
962

    
963
    return update
964

    
965
  @staticmethod
966
  def _CheckDependencies(queue, job, opctx):
967
    """Checks if an opcode has dependencies and if so, processes them.
968

969
    @type queue: L{JobQueue}
970
    @param queue: Queue object
971
    @type job: L{_QueuedJob}
972
    @param job: Job object
973
    @type opctx: L{_OpExecContext}
974
    @param opctx: Opcode execution context
975
    @rtype: bool
976
    @return: Whether opcode will be re-scheduled by dependency tracker
977

978
    """
979
    op = opctx.op
980

    
981
    result = False
982

    
983
    while opctx.jobdeps:
984
      (dep_job_id, dep_status) = opctx.jobdeps[0]
985

    
986
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
987
                                                          dep_status)
988
      assert ht.TNonEmptyString(depmsg), "No dependency message"
989

    
990
      logging.info("%s: %s", opctx.log_prefix, depmsg)
991

    
992
      if depresult == _JobDependencyManager.CONTINUE:
993
        # Remove dependency and continue
994
        opctx.jobdeps.pop(0)
995

    
996
      elif depresult == _JobDependencyManager.WAIT:
997
        # Need to wait for notification, dependency tracker will re-add job
998
        # to workerpool
999
        result = True
1000
        break
1001

    
1002
      elif depresult == _JobDependencyManager.CANCEL:
1003
        # Job was cancelled, cancel this job as well
1004
        job.Cancel()
1005
        assert op.status == constants.OP_STATUS_CANCELING
1006
        break
1007

    
1008
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1009
                         _JobDependencyManager.ERROR):
1010
        # Job failed or there was an error, this job must fail
1011
        op.status = constants.OP_STATUS_ERROR
1012
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1013
        break
1014

    
1015
      else:
1016
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1017
                                     depresult)
1018

    
1019
    return result
1020

    
1021
  def _ExecOpCodeUnlocked(self, opctx):
1022
    """Processes one opcode and returns the result.
1023

1024
    """
1025
    op = opctx.op
1026

    
1027
    assert op.status == constants.OP_STATUS_WAITING
1028

    
1029
    timeout = opctx.GetNextLockTimeout()
1030

    
1031
    try:
1032
      # Make sure not to hold queue lock while calling ExecOpCode
1033
      result = self.opexec_fn(op.input,
1034
                              _OpExecCallbacks(self.queue, self.job, op),
1035
                              timeout=timeout, priority=op.priority)
1036
    except mcpu.LockAcquireTimeout:
1037
      assert timeout is not None, "Received timeout for blocking acquire"
1038
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1039

    
1040
      assert op.status in (constants.OP_STATUS_WAITING,
1041
                           constants.OP_STATUS_CANCELING)
1042

    
1043
      # Was job cancelled while we were waiting for the lock?
1044
      if op.status == constants.OP_STATUS_CANCELING:
1045
        return (constants.OP_STATUS_CANCELING, None)
1046

    
1047
      # Stay in waitlock while trying to re-acquire lock
1048
      return (constants.OP_STATUS_WAITING, None)
1049
    except CancelJob:
1050
      logging.exception("%s: Canceling job", opctx.log_prefix)
1051
      assert op.status == constants.OP_STATUS_CANCELING
1052
      return (constants.OP_STATUS_CANCELING, None)
1053
    except Exception, err: # pylint: disable=W0703
1054
      logging.exception("%s: Caught exception in %s",
1055
                        opctx.log_prefix, opctx.summary)
1056
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1057
    else:
1058
      logging.debug("%s: %s successful",
1059
                    opctx.log_prefix, opctx.summary)
1060
      return (constants.OP_STATUS_SUCCESS, result)
1061

    
1062
  def __call__(self, _nextop_fn=None):
1063
    """Continues execution of a job.
1064

1065
    @param _nextop_fn: Callback function for tests
1066
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1067
      be deferred and C{WAITDEP} if the dependency manager
1068
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1069

1070
    """
1071
    queue = self.queue
1072
    job = self.job
1073

    
1074
    logging.debug("Processing job %s", job.id)
1075

    
1076
    queue.acquire(shared=1)
1077
    try:
1078
      opcount = len(job.ops)
1079

    
1080
      assert job.writable, "Expected writable job"
1081

    
1082
      # Don't do anything for finalized jobs
1083
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1084
        return self.FINISHED
1085

    
1086
      # Is a previous opcode still pending?
1087
      if job.cur_opctx:
1088
        opctx = job.cur_opctx
1089
        job.cur_opctx = None
1090
      else:
1091
        if __debug__ and _nextop_fn:
1092
          _nextop_fn()
1093
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1094

    
1095
      op = opctx.op
1096

    
1097
      # Consistency check
1098
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1099
                                     constants.OP_STATUS_CANCELING)
1100
                        for i in job.ops[opctx.index + 1:])
1101

    
1102
      assert op.status in (constants.OP_STATUS_QUEUED,
1103
                           constants.OP_STATUS_WAITING,
1104
                           constants.OP_STATUS_CANCELING)
1105

    
1106
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1107
              op.priority >= constants.OP_PRIO_HIGHEST)
1108

    
1109
      waitjob = None
1110

    
1111
      if op.status != constants.OP_STATUS_CANCELING:
1112
        assert op.status in (constants.OP_STATUS_QUEUED,
1113
                             constants.OP_STATUS_WAITING)
1114

    
1115
        # Prepare to start opcode
1116
        if self._MarkWaitlock(job, op):
1117
          # Write to disk
1118
          queue.UpdateJobUnlocked(job)
1119

    
1120
        assert op.status == constants.OP_STATUS_WAITING
1121
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1122
        assert job.start_timestamp and op.start_timestamp
1123
        assert waitjob is None
1124

    
1125
        # Check if waiting for a job is necessary
1126
        waitjob = self._CheckDependencies(queue, job, opctx)
1127

    
1128
        assert op.status in (constants.OP_STATUS_WAITING,
1129
                             constants.OP_STATUS_CANCELING,
1130
                             constants.OP_STATUS_ERROR)
1131

    
1132
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1133
                                         constants.OP_STATUS_ERROR)):
1134
          logging.info("%s: opcode %s waiting for locks",
1135
                       opctx.log_prefix, opctx.summary)
1136

    
1137
          assert not opctx.jobdeps, "Not all dependencies were removed"
1138

    
1139
          queue.release()
1140
          try:
1141
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1142
          finally:
1143
            queue.acquire(shared=1)
1144

    
1145
          op.status = op_status
1146
          op.result = op_result
1147

    
1148
          assert not waitjob
1149

    
1150
        if op.status == constants.OP_STATUS_WAITING:
1151
          # Couldn't get locks in time
1152
          assert not op.end_timestamp
1153
        else:
1154
          # Finalize opcode
1155
          op.end_timestamp = TimeStampNow()
1156

    
1157
          if op.status == constants.OP_STATUS_CANCELING:
1158
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1159
                                  for i in job.ops[opctx.index:])
1160
          else:
1161
            assert op.status in constants.OPS_FINALIZED
1162

    
1163
      if op.status == constants.OP_STATUS_WAITING or waitjob:
1164
        finalize = False
1165

    
1166
        if not waitjob and opctx.CheckPriorityIncrease():
1167
          # Priority was changed, need to update on-disk file
1168
          queue.UpdateJobUnlocked(job)
1169

    
1170
        # Keep around for another round
1171
        job.cur_opctx = opctx
1172

    
1173
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1174
                op.priority >= constants.OP_PRIO_HIGHEST)
1175

    
1176
        # In no case must the status be finalized here
1177
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1178

    
1179
      else:
1180
        # Ensure all opcodes so far have been successful
1181
        assert (opctx.index == 0 or
1182
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1183
                           for i in job.ops[:opctx.index]))
1184

    
1185
        # Reset context
1186
        job.cur_opctx = None
1187

    
1188
        if op.status == constants.OP_STATUS_SUCCESS:
1189
          finalize = False
1190

    
1191
        elif op.status == constants.OP_STATUS_ERROR:
1192
          # Ensure failed opcode has an exception as its result
1193
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1194

    
1195
          to_encode = errors.OpExecError("Preceding opcode failed")
1196
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1197
                                _EncodeOpError(to_encode))
1198
          finalize = True
1199

    
1200
          # Consistency check
1201
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1202
                            errors.GetEncodedError(i.result)
1203
                            for i in job.ops[opctx.index:])
1204

    
1205
        elif op.status == constants.OP_STATUS_CANCELING:
1206
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1207
                                "Job canceled by request")
1208
          finalize = True
1209

    
1210
        else:
1211
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1212

    
1213
        if opctx.index == (opcount - 1):
1214
          # Finalize on last opcode
1215
          finalize = True
1216

    
1217
        if finalize:
1218
          # All opcodes have been run, finalize job
1219
          job.Finalize()
1220

    
1221
        # Write to disk. If the job status is final, this is the final write
1222
        # allowed. Once the file has been written, it can be archived anytime.
1223
        queue.UpdateJobUnlocked(job)
1224

    
1225
        assert not waitjob
1226

    
1227
        if finalize:
1228
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1229
          return self.FINISHED
1230

    
1231
      assert not waitjob or queue.depmgr.JobWaiting(job)
1232

    
1233
      if waitjob:
1234
        return self.WAITDEP
1235
      else:
1236
        return self.DEFER
1237
    finally:
1238
      assert job.writable, "Job became read-only while being processed"
1239
      queue.release()
1240

    
1241

    
1242
def _EvaluateJobProcessorResult(depmgr, job, result):
1243
  """Looks at a result from L{_JobProcessor} for a job.
1244

1245
  To be used in a L{_JobQueueWorker}.
1246

1247
  """
1248
  if result == _JobProcessor.FINISHED:
1249
    # Notify waiting jobs
1250
    depmgr.NotifyWaiters(job.id)
1251

    
1252
  elif result == _JobProcessor.DEFER:
1253
    # Schedule again
1254
    raise workerpool.DeferTask(priority=job.CalcPriority())
1255

    
1256
  elif result == _JobProcessor.WAITDEP:
1257
    # No-op, dependency manager will re-schedule
1258
    pass
1259

    
1260
  else:
1261
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1262
                                 (result, ))
1263

    
1264

    
1265
class _JobQueueWorker(workerpool.BaseWorker):
1266
  """The actual job workers.
1267

1268
  """
1269
  def RunTask(self, job): # pylint: disable=W0221
1270
    """Job executor.
1271

1272
    @type job: L{_QueuedJob}
1273
    @param job: the job to be processed
1274

1275
    """
1276
    assert job.writable, "Expected writable job"
1277

    
1278
    # Ensure only one worker is active on a single job. If a job registers for
1279
    # a dependency job, and the other job notifies before the first worker is
1280
    # done, the job can end up in the tasklist more than once.
1281
    job.processor_lock.acquire()
1282
    try:
1283
      return self._RunTaskInner(job)
1284
    finally:
1285
      job.processor_lock.release()
1286

    
1287
  def _RunTaskInner(self, job):
1288
    """Executes a job.
1289

1290
    Must be called with per-job lock acquired.
1291

1292
    """
1293
    queue = job.queue
1294
    assert queue == self.pool.queue
1295

    
1296
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1297
    setname_fn(None)
1298

    
1299
    proc = mcpu.Processor(queue.context, job.id)
1300

    
1301
    # Create wrapper for setting thread name
1302
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1303
                                    proc.ExecOpCode)
1304

    
1305
    _EvaluateJobProcessorResult(queue.depmgr, job,
1306
                                _JobProcessor(queue, wrap_execop_fn, job)())
1307

    
1308
  @staticmethod
1309
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1310
    """Updates the worker thread name to include a short summary of the opcode.
1311

1312
    @param setname_fn: Callable setting worker thread name
1313
    @param execop_fn: Callable for executing opcode (usually
1314
                      L{mcpu.Processor.ExecOpCode})
1315

1316
    """
1317
    setname_fn(op)
1318
    try:
1319
      return execop_fn(op, *args, **kwargs)
1320
    finally:
1321
      setname_fn(None)
1322

    
1323
  @staticmethod
1324
  def _GetWorkerName(job, op):
1325
    """Returns the worker thread name.
1326

1327
    @type job: L{_QueuedJob}
1328
    @type op: L{opcodes.OpCode}
1329

1330
    """
1331
    parts = ["Job%s" % job.id]
1332

    
1333
    if op:
1334
      parts.append(op.TinySummary())
1335

    
1336
    return "/".join(parts)
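  # Illustrative example (added annotation): a worker processing job 1234 is
  # named "Job1234" while idle between opcodes and
  # "Job1234/" + op.TinySummary() while an opcode is executing.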
1337

    
1338

    
1339
class _JobQueueWorkerPool(workerpool.WorkerPool):
1340
  """Simple class implementing a job-processing workerpool.
1341

1342
  """
1343
  def __init__(self, queue):
1344
    super(_JobQueueWorkerPool, self).__init__("Jq",
1345
                                              JOBQUEUE_THREADS,
1346
                                              _JobQueueWorker)
1347
    self.queue = queue
1348

    
1349

    
1350
class _JobDependencyManager:
1351
  """Keeps track of job dependencies.
1352

1353
  """
1354
  (WAIT,
1355
   ERROR,
1356
   CANCEL,
1357
   CONTINUE,
1358
   WRONGSTATUS) = range(1, 6)
1359

    
1360
  def __init__(self, getstatus_fn, enqueue_fn):
1361
    """Initializes this class.
1362

1363
    """
1364
    self._getstatus_fn = getstatus_fn
1365
    self._enqueue_fn = enqueue_fn
1366

    
1367
    self._waiters = {}
1368
    self._lock = locking.SharedLock("JobDepMgr")
1369

    
1370
  @locking.ssynchronized(_LOCK, shared=1)
1371
  def GetLockInfo(self, requested): # pylint: disable=W0613
1372
    """Retrieves information about waiting jobs.
1373

1374
    @type requested: set
1375
    @param requested: Requested information, see C{query.LQ_*}
1376

1377
    """
1378
    # No need to sort here, that's being done by the lock manager and query
1379
    # library. There are no priorities for notifying jobs, hence all show up as
1380
    # one item under "pending".
1381
    return [("job/%s" % job_id, None, None,
1382
             [("job", [job.id for job in waiters])])
1383
            for job_id, waiters in self._waiters.items()
1384
            if waiters]
1385

    
1386
  @locking.ssynchronized(_LOCK, shared=1)
1387
  def JobWaiting(self, job):
1388
    """Checks if a job is waiting.
1389

1390
    """
1391
    return compat.any(job in jobs
1392
                      for jobs in self._waiters.values())
1393

    
1394
  @locking.ssynchronized(_LOCK)
1395
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1396
    """Checks if a dependency job has the requested status.
1397

1398
    If the other job is not yet in a finalized status, the calling job will be
1399
    notified (re-added to the workerpool) at a later point.
1400

1401
    @type job: L{_QueuedJob}
1402
    @param job: Job object
1403
    @type dep_job_id: int
1404
    @param dep_job_id: ID of dependency job
1405
    @type dep_status: list
1406
    @param dep_status: Required status
1407

1408
    """
1409
    assert ht.TJobId(job.id)
1410
    assert ht.TJobId(dep_job_id)
1411
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1412

    
1413
    if job.id == dep_job_id:
1414
      return (self.ERROR, "Job can't depend on itself")
1415

    
1416
    # Get status of dependency job
1417
    try:
1418
      status = self._getstatus_fn(dep_job_id)
1419
    except errors.JobLost, err:
1420
      return (self.ERROR, "Dependency error: %s" % err)
1421

    
1422
    assert status in constants.JOB_STATUS_ALL
1423

    
1424
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1425

    
1426
    if status not in constants.JOBS_FINALIZED:
1427
      # Register for notification and wait for job to finish
1428
      job_id_waiters.add(job)
1429
      return (self.WAIT,
1430
              "Need to wait for job %s, wanted status '%s'" %
1431
              (dep_job_id, dep_status))
1432

    
1433
    # Remove from waiters list
1434
    if job in job_id_waiters:
1435
      job_id_waiters.remove(job)
1436

    
1437
    if (status == constants.JOB_STATUS_CANCELED and
1438
        constants.JOB_STATUS_CANCELED not in dep_status):
1439
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1440

    
1441
    elif not dep_status or status in dep_status:
1442
      return (self.CONTINUE,
1443
              "Dependency job %s finished with status '%s'" %
1444
              (dep_job_id, status))
1445

    
1446
    else:
1447
      return (self.WRONGSTATUS,
1448
              "Dependency job %s finished with status '%s',"
1449
              " not one of '%s' as required" %
1450
              (dep_job_id, status, utils.CommaJoin(dep_status)))
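  # Illustrative example (added annotation; "depmgr" is the queue's dependency
  # manager): callers dispatch on the first element of the returned tuple, e.g.
  #
  #   (result, msg) = depmgr.CheckAndRegister(job, dep_job_id, dep_status)
  #   if result == _JobDependencyManager.WAIT:
  #     pass   # the dependency manager will re-queue the job later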
1451

    
1452
  def _RemoveEmptyWaitersUnlocked(self):
1453
    """Remove all jobs without actual waiters.
1454

1455
    """
1456
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1457
                   if not waiters]:
1458
      del self._waiters[job_id]
1459

    
1460
  def NotifyWaiters(self, job_id):
1461
    """Notifies all jobs waiting for a certain job ID.
1462

1463
    @attention: Do not call until L{CheckAndRegister} returned a status other
1464
      than C{WAIT} for C{job_id}, or behaviour is undefined
1465
    @type job_id: int
1466
    @param job_id: Job ID
1467

1468
    """
1469
    assert ht.TJobId(job_id)
1470

    
1471
    self._lock.acquire()
1472
    try:
1473
      self._RemoveEmptyWaitersUnlocked()
1474

    
1475
      jobs = self._waiters.pop(job_id, None)
1476
    finally:
1477
      self._lock.release()
1478

    
1479
    if jobs:
1480
      # Re-add jobs to workerpool
1481
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1482
                    len(jobs), job_id)
1483
      self._enqueue_fn(jobs)
1484

    
1485

    
1486
def _RequireOpenQueue(fn):
1487
  """Decorator for "public" functions.
1488

1489
  This function should be used for all 'public' functions. That is,
1490
  functions usually called from other classes. Note that this should
1491
  be applied only to methods (not plain functions), since it expects
1492
  that the decorated function is called with a first argument that has
1493
  a '_queue_filelock' attribute.
1494

1495
  @warning: Use this decorator only after locking.ssynchronized
1496

1497
  Example::
1498
    @locking.ssynchronized(_LOCK)
1499
    @_RequireOpenQueue
1500
    def Example(self):
1501
      pass
1502

1503
  """
1504
  def wrapper(self, *args, **kwargs):
1505
    # pylint: disable=W0212
1506
    assert self._queue_filelock is not None, "Queue should be open"
1507
    return fn(self, *args, **kwargs)
1508
  return wrapper
1509

    
1510

    
1511
def _RequireNonDrainedQueue(fn):
1512
  """Decorator checking for a non-drained queue.
1513

1514
  To be used with functions submitting new jobs.
1515

1516
  """
1517
  def wrapper(self, *args, **kwargs):
1518
    """Wrapper function.
1519

1520
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1521

1522
    """
1523
    # Ok when sharing the big job queue lock, as the drain file is created when
1524
    # the lock is exclusive.
1525
    # Needs access to protected member, pylint: disable=W0212
1526
    if self._drained:
1527
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1528

    
1529
    if not self._accepting_jobs:
1530
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1531

    
1532
    return fn(self, *args, **kwargs)
1533
  return wrapper
1534

    
1535

    
1536
class JobQueue(object):
1537
  """Queue used to manage the jobs.
1538

1539
  """
1540
  def __init__(self, context):
1541
    """Constructor for JobQueue.
1542

1543
    The constructor will initialize the job queue object and then
1544
    start loading the current jobs from disk, either for starting them
1545
    (if they were queued) or for aborting them (if they were already
1546
    running).
1547

1548
    @type context: GanetiContext
1549
    @param context: the context object for access to the configuration
1550
        data and other ganeti objects
1551

1552
    """
1553
    self.context = context
1554
    self._memcache = weakref.WeakValueDictionary()
1555
    self._my_hostname = netutils.Hostname.GetSysName()
1556

    
1557
    # The Big JobQueue lock. If a code block or method acquires it in shared
1558
    # mode, it must guarantee concurrency with all the code acquiring it in
1559
    # shared mode, including itself. In order not to acquire it at all,
1560
    # concurrency must be guaranteed with all code acquiring it in shared mode
1561
    # and all code acquiring it exclusively.
1562
    self._lock = locking.SharedLock("JobQueue")
1563

    
1564
    self.acquire = self._lock.acquire
1565
    self.release = self._lock.release
1566

    
1567
    # Accept jobs by default
1568
    self._accepting_jobs = True
1569

    
1570
    # Initialize the queue, and acquire the filelock.
1571
    # This ensures no other process is working on the job queue.
1572
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1573

    
1574
    # Read serial file
1575
    self._last_serial = jstore.ReadSerial()
1576
    assert self._last_serial is not None, ("Serial file was modified between"
1577
                                           " check in jstore and here")
1578

    
1579
    # Get initial list of nodes
1580
    self._nodes = dict((n.name, n.primary_ip)
1581
                       for n in self.context.cfg.GetAllNodesInfo().values()
1582
                       if n.master_candidate)
1583

    
1584
    # Remove master node
1585
    self._nodes.pop(self._my_hostname, None)
1586

    
1587
    # TODO: Check consistency across nodes
1588

    
1589
    self._queue_size = None
1590
    self._UpdateQueueSizeUnlocked()
1591
    assert ht.TInt(self._queue_size)
1592
    self._drained = jstore.CheckDrainFlag()
1593

    
1594
    # Job dependencies
1595
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1596
                                        self._EnqueueJobs)
1597
    self.context.glm.AddToLockMonitor(self.depmgr)
1598

    
1599
    # Setup worker pool
1600
    self._wpool = _JobQueueWorkerPool(self)
1601
    try:
1602
      self._InspectQueue()
1603
    except:
1604
      self._wpool.TerminateWorkers()
1605
      raise
1606

    
1607

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
                             file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we log errors if our RPC calls fail, and especially
    log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename the given files in the local queue
    directory and then replicate these renames to all the other nodes
    we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of int
    @return: a list of job identifiers.

    """
    assert ht.TPositiveInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [jstore.FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
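
  # Illustrative sketch of the serial bookkeeping above (editorial example,
  # not part of the original code): with self._last_serial == 42 and
  # count == 3, the serial file is rewritten to contain "45" and the returned
  # list holds the job IDs formatted from serials 43, 44 and 45, e.g.:
  #
  #   ids = self._NewSerialsUnlocked(3)
  #   assert len(ids) == 3  # IDs for serials 43..45, via jstore.FormatJobID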

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
                          jstore.GetArchiveDirectory(job_id),
                          "job-%s" % job_id)
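
  # Illustrative sketch (editorial example; the exact bucket name is whatever
  # jstore.GetArchiveDirectory returns): live jobs sit directly in
  # pathutils.QUEUE_DIR as "job-<id>", while archived jobs live one level
  # below the archive directory, e.g. for job 2407 the path has the shape
  #
  #   pathutils.JOB_QUEUE_ARCHIVE_DIR + "/" +
  #     jstore.GetArchiveDirectory("2407") + "/job-2407"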

  @staticmethod
  def _DetermineJobDirectories(archived):
    """Build list of directories containing job files.

    @type archived: bool
    @param archived: Whether to include directories for archived jobs
    @rtype: list

    """
    result = [pathutils.QUEUE_DIR]

    if archived:
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
                        utils.ListVisibleFiles(archive_path)))

    return result

  @classmethod
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @type archived: boolean
    @param archived: whether to include job IDs from the archive directories
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []

    for path in cls._DetermineJobDirectories(archived):
      for filename in utils.ListVisibleFiles(path):
        m = constants.JOB_FILE_RE.match(filename)
        if m:
          jlist.append(int(m.group(1)))

    if sort:
      jlist.sort()
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @type job_id: int
    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, False)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, True))

    raw_data = None
    archived = None

    for (fn, archived) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = not archived

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable, archived)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: int
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    for idx, op in enumerate(job.ops):
      # Check priority
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      # Check job dependencies
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results
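
  # Illustrative sketch of the return value (editorial example, derived from
  # _SubmitManyJobsUnlocked below): one (status, data) tuple per submitted
  # job, where data is the new job ID on success or an error message on
  # failure, e.g.:
  #
  #   results = queue.SubmitManyJobs([job1_ops, job2_ops])
  #   # -> [(True, 1234), (False, "Opcode ... has invalid priority ...")]
  #
  # "queue", "job1_ops" and "job2_ops" are hypothetical caller-side names.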

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple; (boolean, string or list)
    @return: If successful (first tuple item), the returned list contains
      resolved job IDs along with the requested status; if not successful,
      the second element is an error message

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)
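
  # Illustrative sketch (editorial example; relative IDs are resolved in
  # _SubmitManyJobsUnlocked below, where -1 refers to the job submitted
  # immediately before in the same request): given a resolver that maps -1 to
  # job 1234, a dependency list such as
  #
  #   [(-1, [constants.JOB_STATUS_SUCCESS])]
  #
  # resolves to (True, [(1234, [constants.JOB_STATUS_SUCCESS])]); an
  # unresolvable relative ID yields (False, "Unable to resolve ...").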

  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: int
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job"
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: int
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
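
  # Illustrative caller-side sketch (editorial example; names other than the
  # method parameters are hypothetical): poll until something changes or the
  # timeout expires.
  #
  #   result = queue.WaitForJobChanges(job_id, ["status"], prev_job_info,
  #                                    prev_log_serial, 10.0)
  #   if result == constants.JOB_NOTCHANGED:
  #     pass  # nothing new within the timeout; the caller may retry
  #   else:
  #     (job_info, log_entries) = result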

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: int
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"
      assert not job.archived, "Can't archive already archived job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, whether we succeeded or failed
    # renaming the files, we update the cached queue size from the filesystem.
    # When we get around to fixing the TODO above, we can use the number of
    # actually archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: int
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @param timeout: the maximum amount of time (in seconds) to spend archiving

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def _Query(self, fields, qfilter):
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, _) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    # backwards compat:
    job_ids = [int(jid) for jid in job_ids]
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, _) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=False)
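
  # Illustrative sketch (editorial example): both query methods accept the
  # same field names; QueryJobs takes a query2 filter, e.g.
  #
  #   queue.QueryJobs(["id", "status"], qlang.MakeSimpleFilter("id", [1234]))
  #   queue.OldStyleQueryJobs([1234], ["id", "status"])
  #
  # "queue" is a hypothetical caller-side name for the JobQueue instance.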

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None
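
  # Illustrative caller-side shutdown sequence (editorial sketch, not the
  # daemon's actual code): stop accepting jobs, wait until no jobs are
  # running, then shut down.
  #
  #   while queue.PrepareShutdown():  # True while jobs are still running
  #     time.sleep(1)
  #   queue.Shutdown()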