lib/jqueue.py @ 95a74ef3
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


class QueueShutdown(Exception):
  """Special exception to abort a job when the job queue is shutting down.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
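
# Example (illustrative, values assumed): utils.SplitTime splits a float
# timestamp into whole seconds and microseconds, so a call made at Unix
# time 1234567890.123456 would return (1234567890, 123456).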


class _SimpleJobQuery:
  """Wrapper for job queries.

  The instance keeps the list of fields cached; this is useful e.g. in
  L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using the cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
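
  # Usage sketch (illustrative; "job" stands for any loaded _QueuedJob):
  #
  #   squery = _SimpleJobQuery(["id", "status"])
  #   (job_id, job_status) = squery(job)  # one entry per requested field
  #
  # Building the query once and calling it per job avoids re-parsing the
  # field list, which is exactly how _JobChangesChecker uses it below.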


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }
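
  # Round-trip sketch (illustrative): Serialize() and Restore() are meant
  # to be inverses, so opcode state written to the job file on disk can be
  # reloaded unchanged:
  #
  #   state = qop.Serialize()            # plain, JSON-serializable dict
  #   clone = _QueuedOpCode.Restore(state)
  #   # clone.status == qop.status, clone.priority == qop.priority, ...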


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self, writable)

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and,
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an error,
        the job status will be the same
      - otherwise, the last opcode with a status of:
          - waiting
          - canceling
          - running

        will determine the job status

      - otherwise, all opcodes are either queued or successful, and the job
        status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.OP_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
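
  # Illustrative walk-through (not part of the upstream code): for opcode
  # statuses [success, running, queued] the loop above leaves the job in
  # JOB_STATUS_RUNNING; for [success, error, queued] it breaks out early
  # with JOB_STATUS_ERROR, since one failed opcode fails the whole job;
  # only an all-success list yields JOB_STATUS_SUCCESS.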

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
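
  # Note (inferred from the code): numerically smaller values mean *higher*
  # priority, in the style of Unix nice levels; CheckPriorityIncrease below
  # raises an opcode's priority by decrementing op.priority. min() therefore
  # selects the most urgent unfinished opcode.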

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting-to-be-run
    opcodes with a given status. Opcodes which are already finalised are not
    changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)
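
  # Summary of the branches above: a still-queued job is canceled and
  # finalized immediately; a job waiting for locks is only flagged
  # OP_STATUS_CANCELING and the worker cancels it at its next checkpoint
  # (see _OpExecCallbacks._CheckCancel); a running or finalized job can no
  # longer be canceled.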


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # See if queue is shutting down
    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
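
  # Call forms (restating the assert in Feedback above; "cbs" stands for a
  # hypothetical _OpExecCallbacks instance):
  #
  #   cbs.Feedback("copying disk 0")                          # type defaults
  #   cbs.Feedback(constants.ELOG_MESSAGE, "copying disk 0")  # explicit type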

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
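
    # Return contract (restating the handlers above): the caller receives
    # the (job_info, log_entries) pair from _JobChangesChecker on success,
    # None if the job file disappeared or inotify failed, and
    # constants.JOB_NOTCHANGED when the timeout expired without a change.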


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
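
  # Behaviour sketch (illustrative, Python 2 iterator protocol): Peek() is
  # idempotent while Next() consumes the pending value:
  #
  #   w = _TimeoutStrategyWrapper(iter([1.0, 2.0]).next)
  #   w.Peek()  # -> 1.0
  #   w.Peek()  # -> 1.0 (unchanged)
  #   w.Next()  # -> 1.0 (and advances)
  #   w.Next()  # -> 2.0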


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITING

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Queue is shutting down, return to queued
      if not self.queue.AcceptingJobsUnlocked():
        return (constants.OP_STATUS_QUEUED, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)

    except QueueShutdown:
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)

      assert op.status == constants.OP_STATUS_WAITING

      # Job hadn't been started yet, so it should return to the queue
      return (constants.OP_STATUS_QUEUED, None)

    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)
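
    # Result contract (restating the handlers above): no ordinary exception
    # escapes this method; it always returns an (opcode status, result)
    # pair. Lock timeouts map to WAITING (retry later), cancellation to
    # CANCELING, queue shutdown to QUEUED (resumed after restart), any other
    # exception to (ERROR, encoded exception) and normal completion to
    # (SUCCESS, result).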

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_QUEUED):
          # waiting: Couldn't get locks in time
          # queued: Queue is shutting down
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        # Queue is shutting down
        assert not waitjob

        finalize = False

        # Reset context
        job.cur_opctx = None

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

      elif op.status == constants.OP_STATUS_WAITING or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return self.FINISHED

      assert not waitjob or queue.depmgr.JobWaiting(job)

      if waitjob:
        return self.WAITDEP
      else:
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


def _EvaluateJobProcessorResult(depmgr, job, result):
  """Looks at a result from L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Notify waiting jobs
    depmgr.NotifyWaiters(job.id)

  elif result == _JobProcessor.DEFER:
    # Schedule again
    raise workerpool.DeferTask(priority=job.CalcPriority())

  elif result == _JobProcessor.WAITDEP:
    # No-op, dependency manager will re-schedule
    pass

  else:
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
                                 (result, ))


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    _EvaluateJobProcessorResult(queue.depmgr, job,
                                _JobProcessor(queue, wrap_execop_fn, job)())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Sets the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class _JobDependencyManager:
  """Keeps track of job dependencies.

  """
  (WAIT,
   ERROR,
   CANCEL,
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

  def __init__(self, getstatus_fn, enqueue_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about waiting jobs.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for notifying jobs, hence all show up as
    # one item under "pending".
    return [("job/%s" % job_id, None, None,
             [("job", [job.id for job in waiters])])
            for job_id, waiters in self._waiters.items()
            if waiters]

  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

    """
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

    If the other job is not yet in a finalized status, the calling job will be
    notified (re-added to the workerpool) at a later point.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type dep_job_id: string
    @param dep_job_id: ID of dependency job
    @type dep_status: list
    @param dep_status: Required status

    """
    assert ht.TString(job.id)
    assert ht.TString(dep_job_id)
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)

    if job.id == dep_job_id:
      return (self.ERROR, "Job can't depend on itself")

    # Get status of dependency job
    try:
      status = self._getstatus_fn(dep_job_id)
    except errors.JobLost, err:
      return (self.ERROR, "Dependency error: %s" % err)

    assert status in constants.JOB_STATUS_ALL

    job_id_waiters = self._waiters.setdefault(dep_job_id, set())

    if status not in constants.JOBS_FINALIZED:
      # Register for notification and wait for job to finish
      job_id_waiters.add(job)
      return (self.WAIT,
              "Need to wait for job %s, wanted status '%s'" %
              (dep_job_id, dep_status))

    # Remove from waiters list
    if job in job_id_waiters:
      job_id_waiters.remove(job)

    if (status == constants.JOB_STATUS_CANCELED and
        constants.JOB_STATUS_CANCELED not in dep_status):
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)

    elif not dep_status or status in dep_status:
      return (self.CONTINUE,
              "Dependency job %s finished with status '%s'" %
              (dep_job_id, status))

    else:
      return (self.WRONGSTATUS,
              "Dependency job %s finished with status '%s',"
              " not one of '%s' as required" %
              (dep_job_id, status, utils.CommaJoin(dep_status)))
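
    # Illustrative example (job IDs assumed): an opcode submitted with
    # depends=[("123", [constants.JOB_STATUS_SUCCESS])] leads to a
    # CheckAndRegister(job, "123", [constants.JOB_STATUS_SUCCESS]) call per
    # attempt; the answer is WAIT while job 123 is unfinished, CONTINUE once
    # it succeeded, and WRONGSTATUS if it e.g. failed instead.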

  def _RemoveEmptyWaitersUnlocked(self):
    """Remove all jobs without actual waiters.

    """
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
                   if not waiters]:
      del self._waiters[job_id]

  def NotifyWaiters(self, job_id):
    """Notifies all jobs waiting for a certain job ID.

    @attention: Do not call until L{CheckAndRegister} returned a status other
      than C{WAITDEP} for C{job_id}, or behaviour is undefined
    @type job_id: string
    @param job_id: Job ID

    """
    assert ht.TString(job_id)

    self._lock.acquire()
    try:
      self._RemoveEmptyWaitersUnlocked()

      jobs = self._waiters.pop(job_id, None)
    finally:
      self._lock.release()

    if jobs:
      # Re-add jobs to workerpool
      logging.debug("Re-adding %s jobs which were waiting for job %s",
                    len(jobs), job_id)
      self._enqueue_fn(jobs)


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


def _RequireNonDrainedQueue(fn):
  """Decorator checking for a non-drained queue.

  To be used with functions submitting new jobs.

  """
  def wrapper(self, *args, **kwargs):
    """Wrapper function.

    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    # Needs access to protected member, pylint: disable=W0212
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if not self._accepting_jobs:
      raise errors.JobQueueError("Job queue is shutting down, refusing job")

    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared
    # mode and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")
1586

    
1587
    self.acquire = self._lock.acquire
1588
    self.release = self._lock.release
1589

    
1590
    # Accept jobs by default
1591
    self._accepting_jobs = True
1592

    
1593
    # Initialize the queue, and acquire the filelock.
1594
    # This ensures no other process is working on the job queue.
1595
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1596

    
1597
    # Read serial file
1598
    self._last_serial = jstore.ReadSerial()
1599
    assert self._last_serial is not None, ("Serial file was modified between"
1600
                                           " check in jstore and here")
1601

    
1602
    # Get initial list of nodes
1603
    self._nodes = dict((n.name, n.primary_ip)
1604
                       for n in self.context.cfg.GetAllNodesInfo().values()
1605
                       if n.master_candidate)
1606

    
1607
    # Remove master node
1608
    self._nodes.pop(self._my_hostname, None)
1609

    
1610
    # TODO: Check consistency across nodes
1611

    
1612
    self._queue_size = None
1613
    self._UpdateQueueSizeUnlocked()
1614
    assert ht.TInt(self._queue_size)
1615
    self._drained = jstore.CheckDrainFlag()
1616

    
1617
    # Job dependencies
1618
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1619
                                        self._EnqueueJobs)
1620
    self.context.glm.AddToLockMonitor(self.depmgr)
1621

    
1622
    # Setup worker pool
1623
    self._wpool = _JobQueueWorkerPool(self)
1624
    try:
1625
      self._InspectQueue()
1626
    except:
1627
      self._wpool.TerminateWorkers()
1628
      raise
1629

    
1630
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
                                                        content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

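  # Illustration of the majority check above (hypothetical numbers): with
  # five non-master nodes of which four fail, len(success) + 1 == 2 is
  # smaller than len(failed) == 4, so the "more than half" error is logged.
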
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

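  # For example (illustrative values): if self._nodes is
  # {"node2": "192.0.2.2", "node3": "192.0.2.3"}, this returns
  # (["node2", "node3"], ["192.0.2.2", "192.0.2.3"]); the ordering is
  # arbitrary, but names and addresses stay paired.
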
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

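  # With JOBS_PER_ARCHIVE_DIRECTORY = 10000 (see top of module), jobs
  # 0..9999 are archived under directory "0", job 12345 under "1", etc.:
  #   _GetArchiveDirectory("12345")  => "1"
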
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of formatted job identifiers

    """
    assert ht.TPositiveInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

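  # A sketch of the serial bookkeeping above, with illustrative values:
  # assuming self._last_serial == 10, _NewSerialsUnlocked(3) writes "13\n"
  # to the serial file, advances self._last_serial to 13 and returns
  # ["11", "12", "13"].
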
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

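  # Illustrative result: _GetJobPath("4711") is the file "job-4711"
  # inside constants.QUEUE_DIR.
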
  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

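  # Combining the two helpers above (illustrative): job 12345 is archived
  # as "1/job-12345" inside constants.JOB_QUEUE_ARCHIVE_DIR.
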
  @staticmethod
  def _GetJobIDsUnlocked(sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = constants.JOB_FILE_RE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

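  # Note that utils.NiceSort orders the IDs numerically rather than
  # lexicographically; e.g. (illustrative) files "job-1", "job-2" and
  # "job-10" yield ["1", "2", "10"] instead of ["1", "10", "2"].
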
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @type writable: bool or None
    @param writable: Whether the returned job should be writable; if C{None},
        the default for the path the job was loaded from is used
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, True)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, False))

    raw_data = None
    writable_default = None

    for (fn, writable_default) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = writable_default

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple
    @return: On success, a tuple of (True, resolved dependencies); on
        failure, a tuple of (False, error message)

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

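  # For example (illustrative): with a resolve_fn mapping -1 to job "42",
  # deps [(-1, want), ("7", want)] resolve to
  # (True, [("42", want), ("7", want)]), while an unresolvable relative ID
  # yields (False, <error message>).
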
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

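  # resolve_fn above indexes the combined ID list from the end
  # (illustrative values): with previous_job_ids == ["10"] and
  # job_ids == ["11", "12"], a relative ID of -1 in the second job
  # (job_idx == 1) resolves to "11".
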
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: string
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    if not isinstance(job_id, basestring):
      job_id = self._FormatJobID(job_id)

    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      # The None check must come first; SafeLoadJobFromDisk returns None on
      # error and was asked for a read-only job
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        If the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.
    @rtype: tuple
    @return: (success, message) pair

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't checked above whether the renames succeeded or failed,
    # we update the cached queue size from the filesystem. Once the TODO above
    # is addressed, the number of actually archived jobs can be used instead.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum amount of time to spend archiving, in seconds
    @rtype: tuple
    @return: a tuple of (number of archived jobs, number of jobs not
        considered before the timeout was reached)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

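  # Timestamp selection above, with illustrative values: a job received at
  # t=100, started at t=110 and finished at t=130 is aged by its end
  # timestamp, so with age == 25 it becomes eligible once now > 155.
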
  def _Query(self, fields, qfilter):
    """Returns a query object and the jobs it should operate on.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter
    @rtype: tuple
    @return: (query object, list of (job ID, job) pairs, whether all
        jobs were listed)

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)

  def QueryJobs(self, fields, qfilter):
    """Returns a list of jobs in queue.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter

    """
    (qobj, ctx, sort_by_name) = self._Query(fields, qfilter)

    return query.GetQueryResponse(qobj, ctx, sort_by_name=sort_by_name)

  def OldStyleQueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    qfilter = qlang.MakeSimpleFilter("id", job_ids)

    (qobj, ctx, sort_by_name) = self._Query(fields, qfilter)

    return qobj.OldStyleQuery(ctx, sort_by_name=sort_by_name)

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown}
    can be called without interfering with any job. Queued and unfinished jobs
    will be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

  def AcceptingJobsUnlocked(self):
    """Returns whether jobs are accepted.

    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
    queue is shutting down.

    @rtype: bool

    """
    return self._accepting_jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None