Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ f8a4adfa

History | View | Annotate | Download (69.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import re
35
import time
36
import weakref
37
import threading
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import runtime
57
from ganeti import netutils
58
from ganeti import compat
59
from ganeti import ht
60

    
61

    
62
JOBQUEUE_THREADS = 25
63
JOBS_PER_ARCHIVE_DIRECTORY = 10000
64

    
65
# member lock names to be passed to @ssynchronized decorator
66
_LOCK = "_lock"
67
_QUEUE = "_queue"
68

    
69

    
70
class CancelJob(Exception):
71
  """Special exception to cancel a job.
72

73
  """
74

    
75

    
76
def TimeStampNow():
77
  """Returns the current timestamp.
78

79
  @rtype: tuple
80
  @return: the current time in the (seconds, microseconds) format
81

82
  """
83
  return utils.SplitTime(time.time())
84

    
85

    
86
class _QueuedOpCode(object):
87
  """Encapsulates an opcode object.
88

89
  @ivar log: holds the execution log and consists of tuples
90
  of the form C{(log_serial, timestamp, level, message)}
91
  @ivar input: the OpCode we encapsulate
92
  @ivar status: the current status
93
  @ivar result: the result of the LU execution
94
  @ivar start_timestamp: timestamp for the start of the execution
95
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
96
  @ivar stop_timestamp: timestamp for the end of the execution
97

98
  """
99
  __slots__ = ["input", "status", "result", "log", "priority",
100
               "start_timestamp", "exec_timestamp", "end_timestamp",
101
               "__weakref__"]
102

    
103
  def __init__(self, op):
104
    """Constructor for the _QuededOpCode.
105

106
    @type op: L{opcodes.OpCode}
107
    @param op: the opcode we encapsulate
108

109
    """
110
    self.input = op
111
    self.status = constants.OP_STATUS_QUEUED
112
    self.result = None
113
    self.log = []
114
    self.start_timestamp = None
115
    self.exec_timestamp = None
116
    self.end_timestamp = None
117

    
118
    # Get initial priority (it might change during the lifetime of this opcode)
119
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
120

    
121
  @classmethod
122
  def Restore(cls, state):
123
    """Restore the _QueuedOpCode from the serialized form.
124

125
    @type state: dict
126
    @param state: the serialized state
127
    @rtype: _QueuedOpCode
128
    @return: a new _QueuedOpCode instance
129

130
    """
131
    obj = _QueuedOpCode.__new__(cls)
132
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
133
    obj.status = state["status"]
134
    obj.result = state["result"]
135
    obj.log = state["log"]
136
    obj.start_timestamp = state.get("start_timestamp", None)
137
    obj.exec_timestamp = state.get("exec_timestamp", None)
138
    obj.end_timestamp = state.get("end_timestamp", None)
139
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
140
    return obj
141

    
142
  def Serialize(self):
143
    """Serializes this _QueuedOpCode.
144

145
    @rtype: dict
146
    @return: the dictionary holding the serialized state
147

148
    """
149
    return {
150
      "input": self.input.__getstate__(),
151
      "status": self.status,
152
      "result": self.result,
153
      "log": self.log,
154
      "start_timestamp": self.start_timestamp,
155
      "exec_timestamp": self.exec_timestamp,
156
      "end_timestamp": self.end_timestamp,
157
      "priority": self.priority,
158
      }
159

    
160

    
161
class _QueuedJob(object):
162
  """In-memory job representation.
163

164
  This is what we use to track the user-submitted jobs. Locking must
165
  be taken care of by users of this class.
166

167
  @type queue: L{JobQueue}
168
  @ivar queue: the parent queue
169
  @ivar id: the job ID
170
  @type ops: list
171
  @ivar ops: the list of _QueuedOpCode that constitute the job
172
  @type log_serial: int
173
  @ivar log_serial: holds the index for the next log entry
174
  @ivar received_timestamp: the timestamp for when the job was received
175
  @ivar start_timestmap: the timestamp for start of execution
176
  @ivar end_timestamp: the timestamp for end of execution
177
  @ivar writable: Whether the job is allowed to be modified
178

179
  """
180
  # pylint: disable-msg=W0212
181
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
182
               "received_timestamp", "start_timestamp", "end_timestamp",
183
               "__weakref__", "processor_lock", "writable"]
184

    
185
  def __init__(self, queue, job_id, ops, writable):
186
    """Constructor for the _QueuedJob.
187

188
    @type queue: L{JobQueue}
189
    @param queue: our parent queue
190
    @type job_id: job_id
191
    @param job_id: our job id
192
    @type ops: list
193
    @param ops: the list of opcodes we hold, which will be encapsulated
194
        in _QueuedOpCodes
195
    @type writable: bool
196
    @param writable: Whether job can be modified
197

198
    """
199
    if not ops:
200
      raise errors.GenericError("A job needs at least one opcode")
201

    
202
    self.queue = queue
203
    self.id = job_id
204
    self.ops = [_QueuedOpCode(op) for op in ops]
205
    self.log_serial = 0
206
    self.received_timestamp = TimeStampNow()
207
    self.start_timestamp = None
208
    self.end_timestamp = None
209

    
210
    self._InitInMemory(self, writable)
211

    
212
  @staticmethod
213
  def _InitInMemory(obj, writable):
214
    """Initializes in-memory variables.
215

216
    """
217
    obj.writable = writable
218
    obj.ops_iter = None
219
    obj.cur_opctx = None
220

    
221
    # Read-only jobs are not processed and therefore don't need a lock
222
    if writable:
223
      obj.processor_lock = threading.Lock()
224
    else:
225
      obj.processor_lock = None
226

    
227
  def __repr__(self):
228
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
229
              "id=%s" % self.id,
230
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
231

    
232
    return "<%s at %#x>" % (" ".join(status), id(self))
233

    
234
  @classmethod
235
  def Restore(cls, queue, state, writable):
236
    """Restore a _QueuedJob from serialized state:
237

238
    @type queue: L{JobQueue}
239
    @param queue: to which queue the restored job belongs
240
    @type state: dict
241
    @param state: the serialized state
242
    @type writable: bool
243
    @param writable: Whether job can be modified
244
    @rtype: _JobQueue
245
    @return: the restored _JobQueue instance
246

247
    """
248
    obj = _QueuedJob.__new__(cls)
249
    obj.queue = queue
250
    obj.id = state["id"]
251
    obj.received_timestamp = state.get("received_timestamp", None)
252
    obj.start_timestamp = state.get("start_timestamp", None)
253
    obj.end_timestamp = state.get("end_timestamp", None)
254

    
255
    obj.ops = []
256
    obj.log_serial = 0
257
    for op_state in state["ops"]:
258
      op = _QueuedOpCode.Restore(op_state)
259
      for log_entry in op.log:
260
        obj.log_serial = max(obj.log_serial, log_entry[0])
261
      obj.ops.append(op)
262

    
263
    cls._InitInMemory(obj, writable)
264

    
265
    return obj
266

    
267
  def Serialize(self):
268
    """Serialize the _JobQueue instance.
269

270
    @rtype: dict
271
    @return: the serialized state
272

273
    """
274
    return {
275
      "id": self.id,
276
      "ops": [op.Serialize() for op in self.ops],
277
      "start_timestamp": self.start_timestamp,
278
      "end_timestamp": self.end_timestamp,
279
      "received_timestamp": self.received_timestamp,
280
      }
281

    
282
  def CalcStatus(self):
283
    """Compute the status of this job.
284

285
    This function iterates over all the _QueuedOpCodes in the job and
286
    based on their status, computes the job status.
287

288
    The algorithm is:
289
      - if we find a cancelled, or finished with error, the job
290
        status will be the same
291
      - otherwise, the last opcode with the status one of:
292
          - waitlock
293
          - canceling
294
          - running
295

296
        will determine the job status
297

298
      - otherwise, it means either all opcodes are queued, or success,
299
        and the job status will be the same
300

301
    @return: the job status
302

303
    """
304
    status = constants.JOB_STATUS_QUEUED
305

    
306
    all_success = True
307
    for op in self.ops:
308
      if op.status == constants.OP_STATUS_SUCCESS:
309
        continue
310

    
311
      all_success = False
312

    
313
      if op.status == constants.OP_STATUS_QUEUED:
314
        pass
315
      elif op.status == constants.OP_STATUS_WAITLOCK:
316
        status = constants.JOB_STATUS_WAITLOCK
317
      elif op.status == constants.OP_STATUS_RUNNING:
318
        status = constants.JOB_STATUS_RUNNING
319
      elif op.status == constants.OP_STATUS_CANCELING:
320
        status = constants.JOB_STATUS_CANCELING
321
        break
322
      elif op.status == constants.OP_STATUS_ERROR:
323
        status = constants.JOB_STATUS_ERROR
324
        # The whole job fails if one opcode failed
325
        break
326
      elif op.status == constants.OP_STATUS_CANCELED:
327
        status = constants.OP_STATUS_CANCELED
328
        break
329

    
330
    if all_success:
331
      status = constants.JOB_STATUS_SUCCESS
332

    
333
    return status
334

    
335
  def CalcPriority(self):
336
    """Gets the current priority for this job.
337

338
    Only unfinished opcodes are considered. When all are done, the default
339
    priority is used.
340

341
    @rtype: int
342

343
    """
344
    priorities = [op.priority for op in self.ops
345
                  if op.status not in constants.OPS_FINALIZED]
346

    
347
    if not priorities:
348
      # All opcodes are done, assume default priority
349
      return constants.OP_PRIO_DEFAULT
350

    
351
    return min(priorities)
352

    
353
  def GetLogEntries(self, newer_than):
354
    """Selectively returns the log entries.
355

356
    @type newer_than: None or int
357
    @param newer_than: if this is None, return all log entries,
358
        otherwise return only the log entries with serial higher
359
        than this value
360
    @rtype: list
361
    @return: the list of the log entries selected
362

363
    """
364
    if newer_than is None:
365
      serial = -1
366
    else:
367
      serial = newer_than
368

    
369
    entries = []
370
    for op in self.ops:
371
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
372

    
373
    return entries
374

    
375
  def GetInfo(self, fields):
376
    """Returns information about a job.
377

378
    @type fields: list
379
    @param fields: names of fields to return
380
    @rtype: list
381
    @return: list with one element for each field
382
    @raise errors.OpExecError: when an invalid field
383
        has been passed
384

385
    """
386
    row = []
387
    for fname in fields:
388
      if fname == "id":
389
        row.append(self.id)
390
      elif fname == "status":
391
        row.append(self.CalcStatus())
392
      elif fname == "priority":
393
        row.append(self.CalcPriority())
394
      elif fname == "ops":
395
        row.append([op.input.__getstate__() for op in self.ops])
396
      elif fname == "opresult":
397
        row.append([op.result for op in self.ops])
398
      elif fname == "opstatus":
399
        row.append([op.status for op in self.ops])
400
      elif fname == "oplog":
401
        row.append([op.log for op in self.ops])
402
      elif fname == "opstart":
403
        row.append([op.start_timestamp for op in self.ops])
404
      elif fname == "opexec":
405
        row.append([op.exec_timestamp for op in self.ops])
406
      elif fname == "opend":
407
        row.append([op.end_timestamp for op in self.ops])
408
      elif fname == "oppriority":
409
        row.append([op.priority for op in self.ops])
410
      elif fname == "received_ts":
411
        row.append(self.received_timestamp)
412
      elif fname == "start_ts":
413
        row.append(self.start_timestamp)
414
      elif fname == "end_ts":
415
        row.append(self.end_timestamp)
416
      elif fname == "summary":
417
        row.append([op.input.Summary() for op in self.ops])
418
      else:
419
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
420
    return row
421

    
422
  def MarkUnfinishedOps(self, status, result):
423
    """Mark unfinished opcodes with a given status and result.
424

425
    This is an utility function for marking all running or waiting to
426
    be run opcodes with a given status. Opcodes which are already
427
    finalised are not changed.
428

429
    @param status: a given opcode status
430
    @param result: the opcode result
431

432
    """
433
    not_marked = True
434
    for op in self.ops:
435
      if op.status in constants.OPS_FINALIZED:
436
        assert not_marked, "Finalized opcodes found after non-finalized ones"
437
        continue
438
      op.status = status
439
      op.result = result
440
      not_marked = False
441

    
442
  def Finalize(self):
443
    """Marks the job as finalized.
444

445
    """
446
    self.end_timestamp = TimeStampNow()
447

    
448
  def Cancel(self):
449
    """Marks job as canceled/-ing if possible.
450

451
    @rtype: tuple; (bool, string)
452
    @return: Boolean describing whether job was successfully canceled or marked
453
      as canceling and a text message
454

455
    """
456
    status = self.CalcStatus()
457

    
458
    if status == constants.JOB_STATUS_QUEUED:
459
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
460
                             "Job canceled by request")
461
      self.Finalize()
462
      return (True, "Job %s canceled" % self.id)
463

    
464
    elif status == constants.JOB_STATUS_WAITLOCK:
465
      # The worker will notice the new status and cancel the job
466
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
467
      return (True, "Job %s will be canceled" % self.id)
468

    
469
    else:
470
      logging.debug("Job %s is no longer waiting in the queue", self.id)
471
      return (False, "Job %s is no longer waiting in the queue" % self.id)
472

    
473

    
474
class _OpExecCallbacks(mcpu.OpExecCbBase):
475
  def __init__(self, queue, job, op):
476
    """Initializes this class.
477

478
    @type queue: L{JobQueue}
479
    @param queue: Job queue
480
    @type job: L{_QueuedJob}
481
    @param job: Job object
482
    @type op: L{_QueuedOpCode}
483
    @param op: OpCode
484

485
    """
486
    assert queue, "Queue is missing"
487
    assert job, "Job is missing"
488
    assert op, "Opcode is missing"
489

    
490
    self._queue = queue
491
    self._job = job
492
    self._op = op
493

    
494
  def _CheckCancel(self):
495
    """Raises an exception to cancel the job if asked to.
496

497
    """
498
    # Cancel here if we were asked to
499
    if self._op.status == constants.OP_STATUS_CANCELING:
500
      logging.debug("Canceling opcode")
501
      raise CancelJob()
502

    
503
  @locking.ssynchronized(_QUEUE, shared=1)
504
  def NotifyStart(self):
505
    """Mark the opcode as running, not lock-waiting.
506

507
    This is called from the mcpu code as a notifier function, when the LU is
508
    finally about to start the Exec() method. Of course, to have end-user
509
    visible results, the opcode must be initially (before calling into
510
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
511

512
    """
513
    assert self._op in self._job.ops
514
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
515
                               constants.OP_STATUS_CANCELING)
516

    
517
    # Cancel here if we were asked to
518
    self._CheckCancel()
519

    
520
    logging.debug("Opcode is now running")
521

    
522
    self._op.status = constants.OP_STATUS_RUNNING
523
    self._op.exec_timestamp = TimeStampNow()
524

    
525
    # And finally replicate the job status
526
    self._queue.UpdateJobUnlocked(self._job)
527

    
528
  @locking.ssynchronized(_QUEUE, shared=1)
529
  def _AppendFeedback(self, timestamp, log_type, log_msg):
530
    """Internal feedback append function, with locks
531

532
    """
533
    self._job.log_serial += 1
534
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
535
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
536

    
537
  def Feedback(self, *args):
538
    """Append a log entry.
539

540
    """
541
    assert len(args) < 3
542

    
543
    if len(args) == 1:
544
      log_type = constants.ELOG_MESSAGE
545
      log_msg = args[0]
546
    else:
547
      (log_type, log_msg) = args
548

    
549
    # The time is split to make serialization easier and not lose
550
    # precision.
551
    timestamp = utils.SplitTime(time.time())
552
    self._AppendFeedback(timestamp, log_type, log_msg)
553

    
554
  def CheckCancel(self):
555
    """Check whether job has been cancelled.
556

557
    """
558
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
559
                               constants.OP_STATUS_CANCELING)
560

    
561
    # Cancel here if we were asked to
562
    self._CheckCancel()
563

    
564
  def SubmitManyJobs(self, jobs):
565
    """Submits jobs for processing.
566

567
    See L{JobQueue.SubmitManyJobs}.
568

569
    """
570
    # Locking is done in job queue
571
    return self._queue.SubmitManyJobs(jobs)
572

    
573

    
574
class _JobChangesChecker(object):
575
  def __init__(self, fields, prev_job_info, prev_log_serial):
576
    """Initializes this class.
577

578
    @type fields: list of strings
579
    @param fields: Fields requested by LUXI client
580
    @type prev_job_info: string
581
    @param prev_job_info: previous job info, as passed by the LUXI client
582
    @type prev_log_serial: string
583
    @param prev_log_serial: previous job serial, as passed by the LUXI client
584

585
    """
586
    self._fields = fields
587
    self._prev_job_info = prev_job_info
588
    self._prev_log_serial = prev_log_serial
589

    
590
  def __call__(self, job):
591
    """Checks whether job has changed.
592

593
    @type job: L{_QueuedJob}
594
    @param job: Job object
595

596
    """
597
    assert not job.writable, "Expected read-only job"
598

    
599
    status = job.CalcStatus()
600
    job_info = job.GetInfo(self._fields)
601
    log_entries = job.GetLogEntries(self._prev_log_serial)
602

    
603
    # Serializing and deserializing data can cause type changes (e.g. from
604
    # tuple to list) or precision loss. We're doing it here so that we get
605
    # the same modifications as the data received from the client. Without
606
    # this, the comparison afterwards might fail without the data being
607
    # significantly different.
608
    # TODO: we just deserialized from disk, investigate how to make sure that
609
    # the job info and log entries are compatible to avoid this further step.
610
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
611
    # efficient, though floats will be tricky
612
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
613
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
614

    
615
    # Don't even try to wait if the job is no longer running, there will be
616
    # no changes.
617
    if (status not in (constants.JOB_STATUS_QUEUED,
618
                       constants.JOB_STATUS_RUNNING,
619
                       constants.JOB_STATUS_WAITLOCK) or
620
        job_info != self._prev_job_info or
621
        (log_entries and self._prev_log_serial != log_entries[0][0])):
622
      logging.debug("Job %s changed", job.id)
623
      return (job_info, log_entries)
624

    
625
    return None
626

    
627

    
628
class _JobFileChangesWaiter(object):
629
  def __init__(self, filename):
630
    """Initializes this class.
631

632
    @type filename: string
633
    @param filename: Path to job file
634
    @raises errors.InotifyError: if the notifier cannot be setup
635

636
    """
637
    self._wm = pyinotify.WatchManager()
638
    self._inotify_handler = \
639
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
640
    self._notifier = \
641
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
642
    try:
643
      self._inotify_handler.enable()
644
    except Exception:
645
      # pyinotify doesn't close file descriptors automatically
646
      self._notifier.stop()
647
      raise
648

    
649
  def _OnInotify(self, notifier_enabled):
650
    """Callback for inotify.
651

652
    """
653
    if not notifier_enabled:
654
      self._inotify_handler.enable()
655

    
656
  def Wait(self, timeout):
657
    """Waits for the job file to change.
658

659
    @type timeout: float
660
    @param timeout: Timeout in seconds
661
    @return: Whether there have been events
662

663
    """
664
    assert timeout >= 0
665
    have_events = self._notifier.check_events(timeout * 1000)
666
    if have_events:
667
      self._notifier.read_events()
668
    self._notifier.process_events()
669
    return have_events
670

    
671
  def Close(self):
672
    """Closes underlying notifier and its file descriptor.
673

674
    """
675
    self._notifier.stop()
676

    
677

    
678
class _JobChangesWaiter(object):
679
  def __init__(self, filename):
680
    """Initializes this class.
681

682
    @type filename: string
683
    @param filename: Path to job file
684

685
    """
686
    self._filewaiter = None
687
    self._filename = filename
688

    
689
  def Wait(self, timeout):
690
    """Waits for a job to change.
691

692
    @type timeout: float
693
    @param timeout: Timeout in seconds
694
    @return: Whether there have been events
695

696
    """
697
    if self._filewaiter:
698
      return self._filewaiter.Wait(timeout)
699

    
700
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
701
    # If this point is reached, return immediately and let caller check the job
702
    # file again in case there were changes since the last check. This avoids a
703
    # race condition.
704
    self._filewaiter = _JobFileChangesWaiter(self._filename)
705

    
706
    return True
707

    
708
  def Close(self):
709
    """Closes underlying waiter.
710

711
    """
712
    if self._filewaiter:
713
      self._filewaiter.Close()
714

    
715

    
716
class _WaitForJobChangesHelper(object):
717
  """Helper class using inotify to wait for changes in a job file.
718

719
  This class takes a previous job status and serial, and alerts the client when
720
  the current job status has changed.
721

722
  """
723
  @staticmethod
724
  def _CheckForChanges(job_load_fn, check_fn):
725
    job = job_load_fn()
726
    if not job:
727
      raise errors.JobLost()
728

    
729
    result = check_fn(job)
730
    if result is None:
731
      raise utils.RetryAgain()
732

    
733
    return result
734

    
735
  def __call__(self, filename, job_load_fn,
736
               fields, prev_job_info, prev_log_serial, timeout):
737
    """Waits for changes on a job.
738

739
    @type filename: string
740
    @param filename: File on which to wait for changes
741
    @type job_load_fn: callable
742
    @param job_load_fn: Function to load job
743
    @type fields: list of strings
744
    @param fields: Which fields to check for changes
745
    @type prev_job_info: list or None
746
    @param prev_job_info: Last job information returned
747
    @type prev_log_serial: int
748
    @param prev_log_serial: Last job message serial number
749
    @type timeout: float
750
    @param timeout: maximum time to wait in seconds
751

752
    """
753
    try:
754
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
755
      waiter = _JobChangesWaiter(filename)
756
      try:
757
        return utils.Retry(compat.partial(self._CheckForChanges,
758
                                          job_load_fn, check_fn),
759
                           utils.RETRY_REMAINING_TIME, timeout,
760
                           wait_fn=waiter.Wait)
761
      finally:
762
        waiter.Close()
763
    except (errors.InotifyError, errors.JobLost):
764
      return None
765
    except utils.RetryTimeout:
766
      return constants.JOB_NOTCHANGED
767

    
768

    
769
def _EncodeOpError(err):
770
  """Encodes an error which occurred while processing an opcode.
771

772
  """
773
  if isinstance(err, errors.GenericError):
774
    to_encode = err
775
  else:
776
    to_encode = errors.OpExecError(str(err))
777

    
778
  return errors.EncodeException(to_encode)
779

    
780

    
781
class _TimeoutStrategyWrapper:
782
  def __init__(self, fn):
783
    """Initializes this class.
784

785
    """
786
    self._fn = fn
787
    self._next = None
788

    
789
  def _Advance(self):
790
    """Gets the next timeout if necessary.
791

792
    """
793
    if self._next is None:
794
      self._next = self._fn()
795

    
796
  def Peek(self):
797
    """Returns the next timeout.
798

799
    """
800
    self._Advance()
801
    return self._next
802

    
803
  def Next(self):
804
    """Returns the current timeout and advances the internal state.
805

806
    """
807
    self._Advance()
808
    result = self._next
809
    self._next = None
810
    return result
811

    
812

    
813
class _OpExecContext:
814
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
815
    """Initializes this class.
816

817
    """
818
    self.op = op
819
    self.index = index
820
    self.log_prefix = log_prefix
821
    self.summary = op.input.Summary()
822

    
823
    # Create local copy to modify
824
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
825
      self.jobdeps = op.input.depends[:]
826
    else:
827
      self.jobdeps = None
828

    
829
    self._timeout_strategy_factory = timeout_strategy_factory
830
    self._ResetTimeoutStrategy()
831

    
832
  def _ResetTimeoutStrategy(self):
833
    """Creates a new timeout strategy.
834

835
    """
836
    self._timeout_strategy = \
837
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
838

    
839
  def CheckPriorityIncrease(self):
840
    """Checks whether priority can and should be increased.
841

842
    Called when locks couldn't be acquired.
843

844
    """
845
    op = self.op
846

    
847
    # Exhausted all retries and next round should not use blocking acquire
848
    # for locks?
849
    if (self._timeout_strategy.Peek() is None and
850
        op.priority > constants.OP_PRIO_HIGHEST):
851
      logging.debug("Increasing priority")
852
      op.priority -= 1
853
      self._ResetTimeoutStrategy()
854
      return True
855

    
856
    return False
857

    
858
  def GetNextLockTimeout(self):
859
    """Returns the next lock acquire timeout.
860

861
    """
862
    return self._timeout_strategy.Next()
863

    
864

    
865
class _JobProcessor(object):
866
  def __init__(self, queue, opexec_fn, job,
867
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
868
    """Initializes this class.
869

870
    """
871
    self.queue = queue
872
    self.opexec_fn = opexec_fn
873
    self.job = job
874
    self._timeout_strategy_factory = _timeout_strategy_factory
875

    
876
  @staticmethod
877
  def _FindNextOpcode(job, timeout_strategy_factory):
878
    """Locates the next opcode to run.
879

880
    @type job: L{_QueuedJob}
881
    @param job: Job object
882
    @param timeout_strategy_factory: Callable to create new timeout strategy
883

884
    """
885
    # Create some sort of a cache to speed up locating next opcode for future
886
    # lookups
887
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
888
    # pending and one for processed ops.
889
    if job.ops_iter is None:
890
      job.ops_iter = enumerate(job.ops)
891

    
892
    # Find next opcode to run
893
    while True:
894
      try:
895
        (idx, op) = job.ops_iter.next()
896
      except StopIteration:
897
        raise errors.ProgrammerError("Called for a finished job")
898

    
899
      if op.status == constants.OP_STATUS_RUNNING:
900
        # Found an opcode already marked as running
901
        raise errors.ProgrammerError("Called for job marked as running")
902

    
903
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
904
                             timeout_strategy_factory)
905

    
906
      if op.status not in constants.OPS_FINALIZED:
907
        return opctx
908

    
909
      # This is a job that was partially completed before master daemon
910
      # shutdown, so it can be expected that some opcodes are already
911
      # completed successfully (if any did error out, then the whole job
912
      # should have been aborted and not resubmitted for processing).
913
      logging.info("%s: opcode %s already processed, skipping",
914
                   opctx.log_prefix, opctx.summary)
915

    
916
  @staticmethod
917
  def _MarkWaitlock(job, op):
918
    """Marks an opcode as waiting for locks.
919

920
    The job's start timestamp is also set if necessary.
921

922
    @type job: L{_QueuedJob}
923
    @param job: Job object
924
    @type op: L{_QueuedOpCode}
925
    @param op: Opcode object
926

927
    """
928
    assert op in job.ops
929
    assert op.status in (constants.OP_STATUS_QUEUED,
930
                         constants.OP_STATUS_WAITLOCK)
931

    
932
    update = False
933

    
934
    op.result = None
935

    
936
    if op.status == constants.OP_STATUS_QUEUED:
937
      op.status = constants.OP_STATUS_WAITLOCK
938
      update = True
939

    
940
    if op.start_timestamp is None:
941
      op.start_timestamp = TimeStampNow()
942
      update = True
943

    
944
    if job.start_timestamp is None:
945
      job.start_timestamp = op.start_timestamp
946
      update = True
947

    
948
    assert op.status == constants.OP_STATUS_WAITLOCK
949

    
950
    return update
951

    
952
  @staticmethod
953
  def _CheckDependencies(queue, job, opctx):
954
    """Checks if an opcode has dependencies and if so, processes them.
955

956
    @type queue: L{JobQueue}
957
    @param queue: Queue object
958
    @type job: L{_QueuedJob}
959
    @param job: Job object
960
    @type opctx: L{_OpExecContext}
961
    @param opctx: Opcode execution context
962
    @rtype: bool
963
    @return: Whether opcode will be re-scheduled by dependency tracker
964

965
    """
966
    op = opctx.op
967

    
968
    result = False
969

    
970
    while opctx.jobdeps:
971
      (dep_job_id, dep_status) = opctx.jobdeps[0]
972

    
973
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
974
                                                          dep_status)
975
      assert ht.TNonEmptyString(depmsg), "No dependency message"
976

    
977
      logging.info("%s: %s", opctx.log_prefix, depmsg)
978

    
979
      if depresult == _JobDependencyManager.CONTINUE:
980
        # Remove dependency and continue
981
        opctx.jobdeps.pop(0)
982

    
983
      elif depresult == _JobDependencyManager.WAIT:
984
        # Need to wait for notification, dependency tracker will re-add job
985
        # to workerpool
986
        result = True
987
        break
988

    
989
      elif depresult == _JobDependencyManager.CANCEL:
990
        # Job was cancelled, cancel this job as well
991
        job.Cancel()
992
        assert op.status == constants.OP_STATUS_CANCELING
993
        break
994

    
995
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
996
                         _JobDependencyManager.ERROR):
997
        # Job failed or there was an error, this job must fail
998
        op.status = constants.OP_STATUS_ERROR
999
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1000
        break
1001

    
1002
      else:
1003
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1004
                                     depresult)
1005

    
1006
    return result
1007

    
1008
  def _ExecOpCodeUnlocked(self, opctx):
1009
    """Processes one opcode and returns the result.
1010

1011
    """
1012
    op = opctx.op
1013

    
1014
    assert op.status == constants.OP_STATUS_WAITLOCK
1015

    
1016
    timeout = opctx.GetNextLockTimeout()
1017

    
1018
    try:
1019
      # Make sure not to hold queue lock while calling ExecOpCode
1020
      result = self.opexec_fn(op.input,
1021
                              _OpExecCallbacks(self.queue, self.job, op),
1022
                              timeout=timeout, priority=op.priority)
1023
    except mcpu.LockAcquireTimeout:
1024
      assert timeout is not None, "Received timeout for blocking acquire"
1025
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1026

    
1027
      assert op.status in (constants.OP_STATUS_WAITLOCK,
1028
                           constants.OP_STATUS_CANCELING)
1029

    
1030
      # Was job cancelled while we were waiting for the lock?
1031
      if op.status == constants.OP_STATUS_CANCELING:
1032
        return (constants.OP_STATUS_CANCELING, None)
1033

    
1034
      # Stay in waitlock while trying to re-acquire lock
1035
      return (constants.OP_STATUS_WAITLOCK, None)
1036
    except CancelJob:
1037
      logging.exception("%s: Canceling job", opctx.log_prefix)
1038
      assert op.status == constants.OP_STATUS_CANCELING
1039
      return (constants.OP_STATUS_CANCELING, None)
1040
    except Exception, err: # pylint: disable-msg=W0703
1041
      logging.exception("%s: Caught exception in %s",
1042
                        opctx.log_prefix, opctx.summary)
1043
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1044
    else:
1045
      logging.debug("%s: %s successful",
1046
                    opctx.log_prefix, opctx.summary)
1047
      return (constants.OP_STATUS_SUCCESS, result)
1048

    
1049
  def __call__(self, _nextop_fn=None):
1050
    """Continues execution of a job.
1051

1052
    @param _nextop_fn: Callback function for tests
1053
    @rtype: bool
1054
    @return: True if job is finished, False if processor needs to be called
1055
             again
1056

1057
    """
1058
    queue = self.queue
1059
    job = self.job
1060

    
1061
    logging.debug("Processing job %s", job.id)
1062

    
1063
    queue.acquire(shared=1)
1064
    try:
1065
      opcount = len(job.ops)
1066

    
1067
      assert job.writable, "Expected writable job"
1068

    
1069
      # Don't do anything for finalized jobs
1070
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1071
        return True
1072

    
1073
      # Is a previous opcode still pending?
1074
      if job.cur_opctx:
1075
        opctx = job.cur_opctx
1076
        job.cur_opctx = None
1077
      else:
1078
        if __debug__ and _nextop_fn:
1079
          _nextop_fn()
1080
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1081

    
1082
      op = opctx.op
1083

    
1084
      # Consistency check
1085
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1086
                                     constants.OP_STATUS_CANCELING)
1087
                        for i in job.ops[opctx.index + 1:])
1088

    
1089
      assert op.status in (constants.OP_STATUS_QUEUED,
1090
                           constants.OP_STATUS_WAITLOCK,
1091
                           constants.OP_STATUS_CANCELING)
1092

    
1093
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1094
              op.priority >= constants.OP_PRIO_HIGHEST)
1095

    
1096
      waitjob = None
1097

    
1098
      if op.status != constants.OP_STATUS_CANCELING:
1099
        assert op.status in (constants.OP_STATUS_QUEUED,
1100
                             constants.OP_STATUS_WAITLOCK)
1101

    
1102
        # Prepare to start opcode
1103
        if self._MarkWaitlock(job, op):
1104
          # Write to disk
1105
          queue.UpdateJobUnlocked(job)
1106

    
1107
        assert op.status == constants.OP_STATUS_WAITLOCK
1108
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1109
        assert job.start_timestamp and op.start_timestamp
1110
        assert waitjob is None
1111

    
1112
        # Check if waiting for a job is necessary
1113
        waitjob = self._CheckDependencies(queue, job, opctx)
1114

    
1115
        assert op.status in (constants.OP_STATUS_WAITLOCK,
1116
                             constants.OP_STATUS_CANCELING,
1117
                             constants.OP_STATUS_ERROR)
1118

    
1119
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1120
                                         constants.OP_STATUS_ERROR)):
1121
          logging.info("%s: opcode %s waiting for locks",
1122
                       opctx.log_prefix, opctx.summary)
1123

    
1124
          assert not opctx.jobdeps, "Not all dependencies were removed"
1125

    
1126
          queue.release()
1127
          try:
1128
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1129
          finally:
1130
            queue.acquire(shared=1)
1131

    
1132
          op.status = op_status
1133
          op.result = op_result
1134

    
1135
          assert not waitjob
1136

    
1137
        if op.status == constants.OP_STATUS_WAITLOCK:
1138
          # Couldn't get locks in time
1139
          assert not op.end_timestamp
1140
        else:
1141
          # Finalize opcode
1142
          op.end_timestamp = TimeStampNow()
1143

    
1144
          if op.status == constants.OP_STATUS_CANCELING:
1145
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1146
                                  for i in job.ops[opctx.index:])
1147
          else:
1148
            assert op.status in constants.OPS_FINALIZED
1149

    
1150
      if op.status == constants.OP_STATUS_WAITLOCK or waitjob:
1151
        finalize = False
1152

    
1153
        if not waitjob and opctx.CheckPriorityIncrease():
1154
          # Priority was changed, need to update on-disk file
1155
          queue.UpdateJobUnlocked(job)
1156

    
1157
        # Keep around for another round
1158
        job.cur_opctx = opctx
1159

    
1160
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1161
                op.priority >= constants.OP_PRIO_HIGHEST)
1162

    
1163
        # In no case must the status be finalized here
1164
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1165

    
1166
      else:
1167
        # Ensure all opcodes so far have been successful
1168
        assert (opctx.index == 0 or
1169
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1170
                           for i in job.ops[:opctx.index]))
1171

    
1172
        # Reset context
1173
        job.cur_opctx = None
1174

    
1175
        if op.status == constants.OP_STATUS_SUCCESS:
1176
          finalize = False
1177

    
1178
        elif op.status == constants.OP_STATUS_ERROR:
1179
          # Ensure failed opcode has an exception as its result
1180
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1181

    
1182
          to_encode = errors.OpExecError("Preceding opcode failed")
1183
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1184
                                _EncodeOpError(to_encode))
1185
          finalize = True
1186

    
1187
          # Consistency check
1188
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1189
                            errors.GetEncodedError(i.result)
1190
                            for i in job.ops[opctx.index:])
1191

    
1192
        elif op.status == constants.OP_STATUS_CANCELING:
1193
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1194
                                "Job canceled by request")
1195
          finalize = True
1196

    
1197
        else:
1198
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1199

    
1200
        if opctx.index == (opcount - 1):
1201
          # Finalize on last opcode
1202
          finalize = True
1203

    
1204
        if finalize:
1205
          # All opcodes have been run, finalize job
1206
          job.Finalize()
1207

    
1208
        # Write to disk. If the job status is final, this is the final write
1209
        # allowed. Once the file has been written, it can be archived anytime.
1210
        queue.UpdateJobUnlocked(job)
1211

    
1212
        assert not waitjob
1213

    
1214
        if finalize:
1215
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1216
          # TODO: Check locking
1217
          queue.depmgr.NotifyWaiters(job.id)
1218
          return True
1219

    
1220
      assert not waitjob or queue.depmgr.JobWaiting(job)
1221

    
1222
      return bool(waitjob)
1223
    finally:
1224
      assert job.writable, "Job became read-only while being processed"
1225
      queue.release()
1226

    
1227

    
1228
class _JobQueueWorker(workerpool.BaseWorker):
1229
  """The actual job workers.
1230

1231
  """
1232
  def RunTask(self, job): # pylint: disable-msg=W0221
1233
    """Job executor.
1234

1235
    @type job: L{_QueuedJob}
1236
    @param job: the job to be processed
1237

1238
    """
1239
    assert job.writable, "Expected writable job"
1240

    
1241
    # Ensure only one worker is active on a single job. If a job registers for
1242
    # a dependency job, and the other job notifies before the first worker is
1243
    # done, the job can end up in the tasklist more than once.
1244
    job.processor_lock.acquire()
1245
    try:
1246
      return self._RunTaskInner(job)
1247
    finally:
1248
      job.processor_lock.release()
1249

    
1250
  def _RunTaskInner(self, job):
1251
    """Executes a job.
1252

1253
    Must be called with per-job lock acquired.
1254

1255
    """
1256
    queue = job.queue
1257
    assert queue == self.pool.queue
1258

    
1259
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1260
    setname_fn(None)
1261

    
1262
    proc = mcpu.Processor(queue.context, job.id)
1263

    
1264
    # Create wrapper for setting thread name
1265
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1266
                                    proc.ExecOpCode)
1267

    
1268
    if not _JobProcessor(queue, wrap_execop_fn, job)():
1269
      # Schedule again
1270
      raise workerpool.DeferTask(priority=job.CalcPriority())
1271

    
1272
  @staticmethod
1273
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1274
    """Updates the worker thread name to include a short summary of the opcode.
1275

1276
    @param setname_fn: Callable setting worker thread name
1277
    @param execop_fn: Callable for executing opcode (usually
1278
                      L{mcpu.Processor.ExecOpCode})
1279

1280
    """
1281
    setname_fn(op)
1282
    try:
1283
      return execop_fn(op, *args, **kwargs)
1284
    finally:
1285
      setname_fn(None)
1286

    
1287
  @staticmethod
1288
  def _GetWorkerName(job, op):
1289
    """Sets the worker thread name.
1290

1291
    @type job: L{_QueuedJob}
1292
    @type op: L{opcodes.OpCode}
1293

1294
    """
1295
    parts = ["Job%s" % job.id]
1296

    
1297
    if op:
1298
      parts.append(op.TinySummary())
1299

    
1300
    return "/".join(parts)
1301

    
1302

    
1303
class _JobQueueWorkerPool(workerpool.WorkerPool):
1304
  """Simple class implementing a job-processing workerpool.
1305

1306
  """
1307
  def __init__(self, queue):
1308
    super(_JobQueueWorkerPool, self).__init__("Jq",
1309
                                              JOBQUEUE_THREADS,
1310
                                              _JobQueueWorker)
1311
    self.queue = queue
1312

    
1313

    
1314
class _JobDependencyManager:
1315
  """Keeps track of job dependencies.
1316

1317
  """
1318
  (WAIT,
1319
   ERROR,
1320
   CANCEL,
1321
   CONTINUE,
1322
   WRONGSTATUS) = range(1, 6)
1323

    
1324
  # TODO: Export waiter information to lock monitor
1325

    
1326
  def __init__(self, getstatus_fn, enqueue_fn):
1327
    """Initializes this class.
1328

1329
    """
1330
    self._getstatus_fn = getstatus_fn
1331
    self._enqueue_fn = enqueue_fn
1332

    
1333
    self._waiters = {}
1334
    self._lock = locking.SharedLock("JobDepMgr")
1335

    
1336
  @locking.ssynchronized(_LOCK, shared=1)
1337
  def JobWaiting(self, job):
1338
    """Checks if a job is waiting.
1339

1340
    """
1341
    return compat.any(job in jobs
1342
                      for jobs in self._waiters.values())
1343

    
1344
  @locking.ssynchronized(_LOCK)
1345
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1346
    """Checks if a dependency job has the requested status.
1347

1348
    If the other job is not yet in a finalized status, the calling job will be
1349
    notified (re-added to the workerpool) at a later point.
1350

1351
    @type job: L{_QueuedJob}
1352
    @param job: Job object
1353
    @type dep_job_id: string
1354
    @param dep_job_id: ID of dependency job
1355
    @type dep_status: list
1356
    @param dep_status: Required status
1357

1358
    """
1359
    assert ht.TString(job.id)
1360
    assert ht.TString(dep_job_id)
1361
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1362

    
1363
    if job.id == dep_job_id:
1364
      return (self.ERROR, "Job can't depend on itself")
1365

    
1366
    # Get status of dependency job
1367
    try:
1368
      status = self._getstatus_fn(dep_job_id)
1369
    except errors.JobLost, err:
1370
      return (self.ERROR, "Dependency error: %s" % err)
1371

    
1372
    assert status in constants.JOB_STATUS_ALL
1373

    
1374
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1375

    
1376
    if status not in constants.JOBS_FINALIZED:
1377
      # Register for notification and wait for job to finish
1378
      job_id_waiters.add(job)
1379
      return (self.WAIT,
1380
              "Need to wait for job %s, wanted status '%s'" %
1381
              (dep_job_id, dep_status))
1382

    
1383
    # Remove from waiters list
1384
    if job in job_id_waiters:
1385
      job_id_waiters.remove(job)
1386

    
1387
    if (status == constants.JOB_STATUS_CANCELED and
1388
        constants.JOB_STATUS_CANCELED not in dep_status):
1389
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1390

    
1391
    elif not dep_status or status in dep_status:
1392
      return (self.CONTINUE,
1393
              "Dependency job %s finished with status '%s'" %
1394
              (dep_job_id, status))
1395

    
1396
    else:
1397
      return (self.WRONGSTATUS,
1398
              "Dependency job %s finished with status '%s',"
1399
              " not one of '%s' as required" %
1400
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1401

    
1402
  @locking.ssynchronized(_LOCK)
1403
  def NotifyWaiters(self, job_id):
1404
    """Notifies all jobs waiting for a certain job ID.
1405

1406
    @type job_id: string
1407
    @param job_id: Job ID
1408

1409
    """
1410
    assert ht.TString(job_id)
1411

    
1412
    jobs = self._waiters.pop(job_id, None)
1413
    if jobs:
1414
      # Re-add jobs to workerpool
1415
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1416
                    len(jobs), job_id)
1417
      self._enqueue_fn(jobs)
1418

    
1419
    # Remove all jobs without actual waiters
1420
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1421
                   if not waiters]:
1422
      del self._waiters[job_id]
1423

    
1424

    
1425
def _RequireOpenQueue(fn):
1426
  """Decorator for "public" functions.
1427

1428
  This function should be used for all 'public' functions. That is,
1429
  functions usually called from other classes. Note that this should
1430
  be applied only to methods (not plain functions), since it expects
1431
  that the decorated function is called with a first argument that has
1432
  a '_queue_filelock' argument.
1433

1434
  @warning: Use this decorator only after locking.ssynchronized
1435

1436
  Example::
1437
    @locking.ssynchronized(_LOCK)
1438
    @_RequireOpenQueue
1439
    def Example(self):
1440
      pass
1441

1442
  """
1443
  def wrapper(self, *args, **kwargs):
1444
    # pylint: disable-msg=W0212
1445
    assert self._queue_filelock is not None, "Queue should be open"
1446
    return fn(self, *args, **kwargs)
1447
  return wrapper
1448

    
1449

    
1450
class JobQueue(object):
1451
  """Queue used to manage the jobs.
1452

1453
  @cvar _RE_JOB_FILE: regex matching the valid job file names
1454

1455
  """
1456
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1457

    
1458
  def __init__(self, context):
1459
    """Constructor for JobQueue.
1460

1461
    The constructor will initialize the job queue object and then
1462
    start loading the current jobs from disk, either for starting them
1463
    (if they were queue) or for aborting them (if they were already
1464
    running).
1465

1466
    @type context: GanetiContext
1467
    @param context: the context object for access to the configuration
1468
        data and other ganeti objects
1469

1470
    """
1471
    self.context = context
1472
    self._memcache = weakref.WeakValueDictionary()
1473
    self._my_hostname = netutils.Hostname.GetSysName()
1474

    
1475
    # The Big JobQueue lock. If a code block or method acquires it in shared
1476
    # mode safe it must guarantee concurrency with all the code acquiring it in
1477
    # shared mode, including itself. In order not to acquire it at all
1478
    # concurrency must be guaranteed with all code acquiring it in shared mode
1479
    # and all code acquiring it exclusively.
1480
    self._lock = locking.SharedLock("JobQueue")
1481

    
1482
    self.acquire = self._lock.acquire
1483
    self.release = self._lock.release
1484

    
1485
    # Initialize the queue, and acquire the filelock.
1486
    # This ensures no other process is working on the job queue.
1487
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1488

    
1489
    # Read serial file
1490
    self._last_serial = jstore.ReadSerial()
1491
    assert self._last_serial is not None, ("Serial file was modified between"
1492
                                           " check in jstore and here")
1493

    
1494
    # Get initial list of nodes
1495
    self._nodes = dict((n.name, n.primary_ip)
1496
                       for n in self.context.cfg.GetAllNodesInfo().values()
1497
                       if n.master_candidate)
1498

    
1499
    # Remove master node
1500
    self._nodes.pop(self._my_hostname, None)
1501

    
1502
    # TODO: Check consistency across nodes
1503

    
1504
    self._queue_size = 0
1505
    self._UpdateQueueSizeUnlocked()
1506
    self._drained = jstore.CheckDrainFlag()
1507

    
1508
    # Job dependencies
1509
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1510
                                        self._EnqueueJobs)
1511

    
1512
    # Setup worker pool
1513
    self._wpool = _JobQueueWorkerPool(self)
1514
    try:
1515
      self._InspectQueue()
1516
    except:
1517
      self._wpool.TerminateWorkers()
1518
      raise
1519

    
1520
  @locking.ssynchronized(_LOCK)
1521
  @_RequireOpenQueue
1522
  def _InspectQueue(self):
1523
    """Loads the whole job queue and resumes unfinished jobs.
1524

1525
    This function needs the lock here because WorkerPool.AddTask() may start a
1526
    job while we're still doing our work.
1527

1528
    """
1529
    logging.info("Inspecting job queue")
1530

    
1531
    restartjobs = []
1532

    
1533
    all_job_ids = self._GetJobIDsUnlocked()
1534
    jobs_count = len(all_job_ids)
1535
    lastinfo = time.time()
1536
    for idx, job_id in enumerate(all_job_ids):
1537
      # Give an update every 1000 jobs or 10 seconds
1538
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1539
          idx == (jobs_count - 1)):
1540
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1541
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1542
        lastinfo = time.time()
1543

    
1544
      job = self._LoadJobUnlocked(job_id)
1545

    
1546
      # a failure in loading the job can cause 'None' to be returned
1547
      if job is None:
1548
        continue
1549

    
1550
      status = job.CalcStatus()
1551

    
1552
      if status == constants.JOB_STATUS_QUEUED:
1553
        restartjobs.append(job)
1554

    
1555
      elif status in (constants.JOB_STATUS_RUNNING,
1556
                      constants.JOB_STATUS_WAITLOCK,
1557
                      constants.JOB_STATUS_CANCELING):
1558
        logging.warning("Unfinished job %s found: %s", job.id, job)
1559

    
1560
        if status == constants.JOB_STATUS_WAITLOCK:
1561
          # Restart job
1562
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1563
          restartjobs.append(job)
1564
        else:
1565
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1566
                                "Unclean master daemon shutdown")
1567
          job.Finalize()
1568

    
1569
        self.UpdateJobUnlocked(job)
1570

    
1571
    if restartjobs:
1572
      logging.info("Restarting %s jobs", len(restartjobs))
1573
      self._EnqueueJobs(restartjobs)
1574

    
1575
    logging.info("Job queue inspection finished")
1576

    
1577
  @locking.ssynchronized(_LOCK)
1578
  @_RequireOpenQueue
1579
  def AddNode(self, node):
1580
    """Register a new node with the queue.
1581

1582
    @type node: L{objects.Node}
1583
    @param node: the node object to be added
1584

1585
    """
1586
    node_name = node.name
1587
    assert node_name != self._my_hostname
1588

    
1589
    # Clean queue directory on added node
1590
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1591
    msg = result.fail_msg
1592
    if msg:
1593
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1594
                      node_name, msg)
1595

    
1596
    if not node.master_candidate:
1597
      # remove if existing, ignoring errors
1598
      self._nodes.pop(node_name, None)
1599
      # and skip the replication of the job ids
1600
      return
1601

    
1602
    # Upload the whole queue excluding archived jobs
1603
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1604

    
1605
    # Upload current serial file
1606
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
1607

    
1608
    for file_name in files:
1609
      # Read file content
1610
      content = utils.ReadFile(file_name)
1611

    
1612
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
1613
                                                  [node.primary_ip],
1614
                                                  file_name, content)
1615
      msg = result[node_name].fail_msg
1616
      if msg:
1617
        logging.error("Failed to upload file %s to node %s: %s",
1618
                      file_name, node_name, msg)
1619

    
1620
    self._nodes[node_name] = node.primary_ip
1621

    
1622
  @locking.ssynchronized(_LOCK)
1623
  @_RequireOpenQueue
1624
  def RemoveNode(self, node_name):
1625
    """Callback called when removing nodes from the cluster.
1626

1627
    @type node_name: str
1628
    @param node_name: the name of the node to remove
1629

1630
    """
1631
    self._nodes.pop(node_name, None)
1632

    
1633
  @staticmethod
1634
  def _CheckRpcResult(result, nodes, failmsg):
1635
    """Verifies the status of an RPC call.
1636

1637
    Since we aim to keep consistency should this node (the current
1638
    master) fail, we will log errors if our rpc fail, and especially
1639
    log the case when more than half of the nodes fails.
1640

1641
    @param result: the data as returned from the rpc call
1642
    @type nodes: list
1643
    @param nodes: the list of nodes we made the call to
1644
    @type failmsg: str
1645
    @param failmsg: the identifier to be used for logging
1646

1647
    """
1648
    failed = []
1649
    success = []
1650

    
1651
    for node in nodes:
1652
      msg = result[node].fail_msg
1653
      if msg:
1654
        failed.append(node)
1655
        logging.error("RPC call %s (%s) failed on node %s: %s",
1656
                      result[node].call, failmsg, node, msg)
1657
      else:
1658
        success.append(node)
1659

    
1660
    # +1 for the master node
1661
    if (len(success) + 1) < len(failed):
1662
      # TODO: Handle failing nodes
1663
      logging.error("More than half of the nodes failed")
1664

    
1665
  def _GetNodeIp(self):
1666
    """Helper for returning the node name/ip list.
1667

1668
    @rtype: (list, list)
1669
    @return: a tuple of two lists, the first one with the node
1670
        names and the second one with the node addresses
1671

1672
    """
1673
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1674
    name_list = self._nodes.keys()
1675
    addr_list = [self._nodes[name] for name in name_list]
1676
    return name_list, addr_list
1677

    
1678
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1679
    """Writes a file locally and then replicates it to all nodes.
1680

1681
    This function will replace the contents of a file on the local
1682
    node and then replicate it to all the other nodes we have.
1683

1684
    @type file_name: str
1685
    @param file_name: the path of the file to be replicated
1686
    @type data: str
1687
    @param data: the new contents of the file
1688
    @type replicate: boolean
1689
    @param replicate: whether to spread the changes to the remote nodes
1690

1691
    """
1692
    getents = runtime.GetEnts()
1693
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1694
                    gid=getents.masterd_gid)
1695

    
1696
    if replicate:
1697
      names, addrs = self._GetNodeIp()
1698
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1699
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1700

    
1701
  def _RenameFilesUnlocked(self, rename):
1702
    """Renames a file locally and then replicate the change.
1703

1704
    This function will rename a file in the local queue directory
1705
    and then replicate this rename to all the other nodes we have.
1706

1707
    @type rename: list of (old, new)
1708
    @param rename: List containing tuples mapping old to new names
1709

1710
    """
1711
    # Rename them locally
1712
    for old, new in rename:
1713
      utils.RenameFile(old, new, mkdir=True)
1714

    
1715
    # ... and on all nodes
1716
    names, addrs = self._GetNodeIp()
1717
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1718
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1719

    
1720
  @staticmethod
1721
  def _FormatJobID(job_id):
1722
    """Convert a job ID to string format.
1723

1724
    Currently this just does C{str(job_id)} after performing some
1725
    checks, but if we want to change the job id format this will
1726
    abstract this change.
1727

1728
    @type job_id: int or long
1729
    @param job_id: the numeric job id
1730
    @rtype: str
1731
    @return: the formatted job id
1732

1733
    """
1734
    if not isinstance(job_id, (int, long)):
1735
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1736
    if job_id < 0:
1737
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1738

    
1739
    return str(job_id)
1740

    
1741
  @classmethod
1742
  def _GetArchiveDirectory(cls, job_id):
1743
    """Returns the archive directory for a job.
1744

1745
    @type job_id: str
1746
    @param job_id: Job identifier
1747
    @rtype: str
1748
    @return: Directory name
1749

1750
    """
1751
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1752

    
1753
  def _NewSerialsUnlocked(self, count):
1754
    """Generates a new job identifier.
1755

1756
    Job identifiers are unique during the lifetime of a cluster.
1757

1758
    @type count: integer
1759
    @param count: how many serials to return
1760
    @rtype: str
1761
    @return: a string representing the job identifier.
1762

1763
    """
1764
    assert count > 0
1765
    # New number
1766
    serial = self._last_serial + count
1767

    
1768
    # Write to file
1769
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1770
                             "%s\n" % serial, True)
1771

    
1772
    result = [self._FormatJobID(v)
1773
              for v in range(self._last_serial + 1, serial + 1)]
1774

    
1775
    # Keep it only if we were able to write the file
1776
    self._last_serial = serial
1777

    
1778
    assert len(result) == count
1779

    
1780
    return result
1781

    
1782
  @staticmethod
1783
  def _GetJobPath(job_id):
1784
    """Returns the job file for a given job id.
1785

1786
    @type job_id: str
1787
    @param job_id: the job identifier
1788
    @rtype: str
1789
    @return: the path to the job file
1790

1791
    """
1792
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1793

    
1794
  @classmethod
1795
  def _GetArchivedJobPath(cls, job_id):
1796
    """Returns the archived job file for a give job id.
1797

1798
    @type job_id: str
1799
    @param job_id: the job identifier
1800
    @rtype: str
1801
    @return: the path to the archived job file
1802

1803
    """
1804
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1805
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1806

    
1807
  def _GetJobIDsUnlocked(self, sort=True):
1808
    """Return all known job IDs.
1809

1810
    The method only looks at disk because it's a requirement that all
1811
    jobs are present on disk (so in the _memcache we don't have any
1812
    extra IDs).
1813

1814
    @type sort: boolean
1815
    @param sort: perform sorting on the returned job ids
1816
    @rtype: list
1817
    @return: the list of job IDs
1818

1819
    """
1820
    jlist = []
1821
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1822
      m = self._RE_JOB_FILE.match(filename)
1823
      if m:
1824
        jlist.append(m.group(1))
1825
    if sort:
1826
      jlist = utils.NiceSort(jlist)
1827
    return jlist
1828

    
1829
  def _LoadJobUnlocked(self, job_id):
1830
    """Loads a job from the disk or memory.
1831

1832
    Given a job id, this will return the cached job object if
1833
    existing, or try to load the job from the disk. If loading from
1834
    disk, it will also add the job to the cache.
1835

1836
    @param job_id: the job id
1837
    @rtype: L{_QueuedJob} or None
1838
    @return: either None or the job object
1839

1840
    """
1841
    job = self._memcache.get(job_id, None)
1842
    if job:
1843
      logging.debug("Found job %s in memcache", job_id)
1844
      assert job.writable, "Found read-only job in memcache"
1845
      return job
1846

    
1847
    try:
1848
      job = self._LoadJobFromDisk(job_id, False)
1849
      if job is None:
1850
        return job
1851
    except errors.JobFileCorrupted:
1852
      old_path = self._GetJobPath(job_id)
1853
      new_path = self._GetArchivedJobPath(job_id)
1854
      if old_path == new_path:
1855
        # job already archived (future case)
1856
        logging.exception("Can't parse job %s", job_id)
1857
      else:
1858
        # non-archived case
1859
        logging.exception("Can't parse job %s, will archive.", job_id)
1860
        self._RenameFilesUnlocked([(old_path, new_path)])
1861
      return None
1862

    
1863
    assert job.writable, "Job just loaded is not writable"
1864

    
1865
    self._memcache[job_id] = job
1866
    logging.debug("Added job %s to the cache", job_id)
1867
    return job
1868

    
1869
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1870
    """Load the given job file from disk.
1871

1872
    Given a job file, read, load and restore it in a _QueuedJob format.
1873

1874
    @type job_id: string
1875
    @param job_id: job identifier
1876
    @type try_archived: bool
1877
    @param try_archived: Whether to try loading an archived job
1878
    @rtype: L{_QueuedJob} or None
1879
    @return: either None or the job object
1880

1881
    """
1882
    path_functions = [(self._GetJobPath, True)]
1883

    
1884
    if try_archived:
1885
      path_functions.append((self._GetArchivedJobPath, False))
1886

    
1887
    raw_data = None
1888
    writable_default = None
1889

    
1890
    for (fn, writable_default) in path_functions:
1891
      filepath = fn(job_id)
1892
      logging.debug("Loading job from %s", filepath)
1893
      try:
1894
        raw_data = utils.ReadFile(filepath)
1895
      except EnvironmentError, err:
1896
        if err.errno != errno.ENOENT:
1897
          raise
1898
      else:
1899
        break
1900

    
1901
    if not raw_data:
1902
      return None
1903

    
1904
    if writable is None:
1905
      writable = writable_default
1906

    
1907
    try:
1908
      data = serializer.LoadJson(raw_data)
1909
      job = _QueuedJob.Restore(self, data, writable)
1910
    except Exception, err: # pylint: disable-msg=W0703
1911
      raise errors.JobFileCorrupted(err)
1912

    
1913
    return job
1914

    
1915
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
1916
    """Load the given job file from disk.
1917

1918
    Given a job file, read, load and restore it in a _QueuedJob format.
1919
    In case of error reading the job, it gets returned as None, and the
1920
    exception is logged.
1921

1922
    @type job_id: string
1923
    @param job_id: job identifier
1924
    @type try_archived: bool
1925
    @param try_archived: Whether to try loading an archived job
1926
    @rtype: L{_QueuedJob} or None
1927
    @return: either None or the job object
1928

1929
    """
1930
    try:
1931
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
1932
    except (errors.JobFileCorrupted, EnvironmentError):
1933
      logging.exception("Can't load/parse job %s", job_id)
1934
      return None
1935

    
1936
  def _UpdateQueueSizeUnlocked(self):
1937
    """Update the queue size.
1938

1939
    """
1940
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1941

    
1942
  @locking.ssynchronized(_LOCK)
1943
  @_RequireOpenQueue
1944
  def SetDrainFlag(self, drain_flag):
1945
    """Sets the drain flag for the queue.
1946

1947
    @type drain_flag: boolean
1948
    @param drain_flag: Whether to set or unset the drain flag
1949

1950
    """
1951
    jstore.SetDrainFlag(drain_flag)
1952

    
1953
    self._drained = drain_flag
1954

    
1955
    return True
1956

    
1957
  @_RequireOpenQueue
1958
  def _SubmitJobUnlocked(self, job_id, ops):
1959
    """Create and store a new job.
1960

1961
    This enters the job into our job queue and also puts it on the new
1962
    queue, in order for it to be picked up by the queue processors.
1963

1964
    @type job_id: job ID
1965
    @param job_id: the job ID for the new job
1966
    @type ops: list
1967
    @param ops: The list of OpCodes that will become the new job.
1968
    @rtype: L{_QueuedJob}
1969
    @return: the job object to be queued
1970
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1971
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1972
    @raise errors.GenericError: If an opcode is not valid
1973

1974
    """
1975
    # Ok when sharing the big job queue lock, as the drain file is created when
1976
    # the lock is exclusive.
1977
    if self._drained:
1978
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1979

    
1980
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1981
      raise errors.JobQueueFull()
1982

    
1983
    job = _QueuedJob(self, job_id, ops, True)
1984

    
1985
    # Check priority
1986
    for idx, op in enumerate(job.ops):
1987
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
1988
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1989
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1990
                                  " are %s" % (idx, op.priority, allowed))
1991

    
1992
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
1993
      if not opcodes.TNoRelativeJobDependencies(dependencies):
1994
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
1995
                                  " match %s: %s" %
1996
                                  (idx, opcodes.TNoRelativeJobDependencies,
1997
                                   dependencies))
1998

    
1999
    # Write to disk
2000
    self.UpdateJobUnlocked(job)
2001

    
2002
    self._queue_size += 1
2003

    
2004
    logging.debug("Adding new job %s to the cache", job_id)
2005
    self._memcache[job_id] = job
2006

    
2007
    return job
2008

    
2009
  @locking.ssynchronized(_LOCK)
2010
  @_RequireOpenQueue
2011
  def SubmitJob(self, ops):
2012
    """Create and store a new job.
2013

2014
    @see: L{_SubmitJobUnlocked}
2015

2016
    """
2017
    (job_id, ) = self._NewSerialsUnlocked(1)
2018
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
2019
    return job_id
2020

    
2021
  @locking.ssynchronized(_LOCK)
2022
  @_RequireOpenQueue
2023
  def SubmitManyJobs(self, jobs):
2024
    """Create and store multiple jobs.
2025

2026
    @see: L{_SubmitJobUnlocked}
2027

2028
    """
2029
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
2030

    
2031
    (results, added_jobs) = \
2032
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2033

    
2034
    self._EnqueueJobs(added_jobs)
2035

    
2036
    return results
2037

    
2038
  @staticmethod
2039
  def _FormatSubmitError(msg, ops):
2040
    """Formats errors which occurred while submitting a job.
2041

2042
    """
2043
    return ("%s; opcodes %s" %
2044
            (msg, utils.CommaJoin(op.Summary() for op in ops)))
2045

    
2046
  @staticmethod
2047
  def _ResolveJobDependencies(resolve_fn, deps):
2048
    """Resolves relative job IDs in dependencies.
2049

2050
    @type resolve_fn: callable
2051
    @param resolve_fn: Function to resolve a relative job ID
2052
    @type deps: list
2053
    @param deps: Dependencies
2054
    @rtype: list
2055
    @return: Resolved dependencies
2056

2057
    """
2058
    result = []
2059

    
2060
    for (dep_job_id, dep_status) in deps:
2061
      if ht.TRelativeJobId(dep_job_id):
2062
        assert ht.TInt(dep_job_id) and dep_job_id < 0
2063
        try:
2064
          job_id = resolve_fn(dep_job_id)
2065
        except IndexError:
2066
          # Abort
2067
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2068
      else:
2069
        job_id = dep_job_id
2070

    
2071
      result.append((job_id, dep_status))
2072

    
2073
    return (True, result)
2074

    
2075
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2076
    """Create and store multiple jobs.
2077

2078
    @see: L{_SubmitJobUnlocked}
2079

2080
    """
2081
    results = []
2082
    added_jobs = []
2083

    
2084
    def resolve_fn(job_idx, reljobid):
2085
      assert reljobid < 0
2086
      return (previous_job_ids + job_ids[:job_idx])[reljobid]
2087

    
2088
    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2089
      for op in ops:
2090
        if getattr(op, opcodes.DEPEND_ATTR, None):
2091
          (status, data) = \
2092
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2093
                                         op.depends)
2094
          if not status:
2095
            # Abort resolving dependencies
2096
            assert ht.TNonEmptyString(data), "No error message"
2097
            break
2098
          # Use resolved dependencies
2099
          op.depends = data
2100
      else:
2101
        try:
2102
          job = self._SubmitJobUnlocked(job_id, ops)
2103
        except errors.GenericError, err:
2104
          status = False
2105
          data = self._FormatSubmitError(str(err), ops)
2106
        else:
2107
          status = True
2108
          data = job_id
2109
          added_jobs.append(job)
2110

    
2111
      results.append((status, data))
2112

    
2113
    return (results, added_jobs)
2114

    
2115
  def _EnqueueJobs(self, jobs):
2116
    """Helper function to add jobs to worker pool's queue.
2117

2118
    @type jobs: list
2119
    @param jobs: List of all jobs
2120

2121
    """
2122
    self._wpool.AddManyTasks([(job, ) for job in jobs],
2123
                             priority=[job.CalcPriority() for job in jobs])
2124

    
2125
  def _GetJobStatusForDependencies(self, job_id):
2126
    """Gets the status of a job for dependencies.
2127

2128
    @type job_id: string
2129
    @param job_id: Job ID
2130
    @raise errors.JobLost: If job can't be found
2131

2132
    """
2133
    if not isinstance(job_id, basestring):
2134
      job_id = self._FormatJobID(job_id)
2135

    
2136
    # Not using in-memory cache as doing so would require an exclusive lock
2137

    
2138
    # Try to load from disk
2139
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2140

    
2141
    assert not job.writable, "Got writable job"
2142

    
2143
    if job:
2144
      return job.CalcStatus()
2145

    
2146
    raise errors.JobLost("Job %s not found" % job_id)
2147

    
2148
  @_RequireOpenQueue
2149
  def UpdateJobUnlocked(self, job, replicate=True):
2150
    """Update a job's on disk storage.
2151

2152
    After a job has been modified, this function needs to be called in
2153
    order to write the changes to disk and replicate them to the other
2154
    nodes.
2155

2156
    @type job: L{_QueuedJob}
2157
    @param job: the changed job
2158
    @type replicate: boolean
2159
    @param replicate: whether to replicate the change to remote nodes
2160

2161
    """
2162
    if __debug__:
2163
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2164
      assert (finalized ^ (job.end_timestamp is None))
2165
      assert job.writable, "Can't update read-only job"
2166

    
2167
    filename = self._GetJobPath(job.id)
2168
    data = serializer.DumpJson(job.Serialize(), indent=False)
2169
    logging.debug("Writing job %s to %s", job.id, filename)
2170
    self._UpdateJobQueueFile(filename, data, replicate)
2171

    
2172
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2173
                        timeout):
2174
    """Waits for changes in a job.
2175

2176
    @type job_id: string
2177
    @param job_id: Job identifier
2178
    @type fields: list of strings
2179
    @param fields: Which fields to check for changes
2180
    @type prev_job_info: list or None
2181
    @param prev_job_info: Last job information returned
2182
    @type prev_log_serial: int
2183
    @param prev_log_serial: Last job message serial number
2184
    @type timeout: float
2185
    @param timeout: maximum time to wait in seconds
2186
    @rtype: tuple (job info, log entries)
2187
    @return: a tuple of the job information as required via
2188
        the fields parameter, and the log entries as a list
2189

2190
        if the job has not changed and the timeout has expired,
2191
        we instead return a special value,
2192
        L{constants.JOB_NOTCHANGED}, which should be interpreted
2193
        as such by the clients
2194

2195
    """
2196
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
2197
                             writable=False)
2198

    
2199
    helper = _WaitForJobChangesHelper()
2200

    
2201
    return helper(self._GetJobPath(job_id), load_fn,
2202
                  fields, prev_job_info, prev_log_serial, timeout)
2203

    
2204
  @locking.ssynchronized(_LOCK)
2205
  @_RequireOpenQueue
2206
  def CancelJob(self, job_id):
2207
    """Cancels a job.
2208

2209
    This will only succeed if the job has not started yet.
2210

2211
    @type job_id: string
2212
    @param job_id: job ID of job to be cancelled.
2213

2214
    """
2215
    logging.info("Cancelling job %s", job_id)
2216

    
2217
    job = self._LoadJobUnlocked(job_id)
2218
    if not job:
2219
      logging.debug("Job %s not found", job_id)
2220
      return (False, "Job %s not found" % job_id)
2221

    
2222
    assert job.writable, "Can't cancel read-only job"
2223

    
2224
    (success, msg) = job.Cancel()
2225

    
2226
    if success:
2227
      # If the job was finalized (e.g. cancelled), this is the final write
2228
      # allowed. The job can be archived anytime.
2229
      self.UpdateJobUnlocked(job)
2230

    
2231
    return (success, msg)
2232

    
2233
  @_RequireOpenQueue
2234
  def _ArchiveJobsUnlocked(self, jobs):
2235
    """Archives jobs.
2236

2237
    @type jobs: list of L{_QueuedJob}
2238
    @param jobs: Job objects
2239
    @rtype: int
2240
    @return: Number of archived jobs
2241

2242
    """
2243
    archive_jobs = []
2244
    rename_files = []
2245
    for job in jobs:
2246
      assert job.writable, "Can't archive read-only job"
2247

    
2248
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
2249
        logging.debug("Job %s is not yet done", job.id)
2250
        continue
2251

    
2252
      archive_jobs.append(job)
2253

    
2254
      old = self._GetJobPath(job.id)
2255
      new = self._GetArchivedJobPath(job.id)
2256
      rename_files.append((old, new))
2257

    
2258
    # TODO: What if 1..n files fail to rename?
2259
    self._RenameFilesUnlocked(rename_files)
2260

    
2261
    logging.debug("Successfully archived job(s) %s",
2262
                  utils.CommaJoin(job.id for job in archive_jobs))
2263

    
2264
    # Since we haven't quite checked, above, if we succeeded or failed renaming
2265
    # the files, we update the cached queue size from the filesystem. When we
2266
    # get around to fix the TODO: above, we can use the number of actually
2267
    # archived jobs to fix this.
2268
    self._UpdateQueueSizeUnlocked()
2269
    return len(archive_jobs)
2270

    
2271
  @locking.ssynchronized(_LOCK)
2272
  @_RequireOpenQueue
2273
  def ArchiveJob(self, job_id):
2274
    """Archives a job.
2275

2276
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
2277

2278
    @type job_id: string
2279
    @param job_id: Job ID of job to be archived.
2280
    @rtype: bool
2281
    @return: Whether job was archived
2282

2283
    """
2284
    logging.info("Archiving job %s", job_id)
2285

    
2286
    job = self._LoadJobUnlocked(job_id)
2287
    if not job:
2288
      logging.debug("Job %s not found", job_id)
2289
      return False
2290

    
2291
    return self._ArchiveJobsUnlocked([job]) == 1
2292

    
2293
  @locking.ssynchronized(_LOCK)
2294
  @_RequireOpenQueue
2295
  def AutoArchiveJobs(self, age, timeout):
2296
    """Archives all jobs based on age.
2297

2298
    The method will archive all jobs which are older than the age
2299
    parameter. For jobs that don't have an end timestamp, the start
2300
    timestamp will be considered. The special '-1' age will cause
2301
    archival of all jobs (that are not running or queued).
2302

2303
    @type age: int
2304
    @param age: the minimum age in seconds
2305

2306
    """
2307
    logging.info("Archiving jobs with age more than %s seconds", age)
2308

    
2309
    now = time.time()
2310
    end_time = now + timeout
2311
    archived_count = 0
2312
    last_touched = 0
2313

    
2314
    all_job_ids = self._GetJobIDsUnlocked()
2315
    pending = []
2316
    for idx, job_id in enumerate(all_job_ids):
2317
      last_touched = idx + 1
2318

    
2319
      # Not optimal because jobs could be pending
2320
      # TODO: Measure average duration for job archival and take number of
2321
      # pending jobs into account.
2322
      if time.time() > end_time:
2323
        break
2324

    
2325
      # Returns None if the job failed to load
2326
      job = self._LoadJobUnlocked(job_id)
2327
      if job:
2328
        if job.end_timestamp is None:
2329
          if job.start_timestamp is None:
2330
            job_age = job.received_timestamp
2331
          else:
2332
            job_age = job.start_timestamp
2333
        else:
2334
          job_age = job.end_timestamp
2335

    
2336
        if age == -1 or now - job_age[0] > age:
2337
          pending.append(job)
2338

    
2339
          # Archive 10 jobs at a time
2340
          if len(pending) >= 10:
2341
            archived_count += self._ArchiveJobsUnlocked(pending)
2342
            pending = []
2343

    
2344
    if pending:
2345
      archived_count += self._ArchiveJobsUnlocked(pending)
2346

    
2347
    return (archived_count, len(all_job_ids) - last_touched)
2348

    
2349
  def QueryJobs(self, job_ids, fields):
2350
    """Returns a list of jobs in queue.
2351

2352
    @type job_ids: list
2353
    @param job_ids: sequence of job identifiers or None for all
2354
    @type fields: list
2355
    @param fields: names of fields to return
2356
    @rtype: list
2357
    @return: list one element per job, each element being list with
2358
        the requested fields
2359

2360
    """
2361
    jobs = []
2362
    list_all = False
2363
    if not job_ids:
2364
      # Since files are added to/removed from the queue atomically, there's no
2365
      # risk of getting the job ids in an inconsistent state.
2366
      job_ids = self._GetJobIDsUnlocked()
2367
      list_all = True
2368

    
2369
    for job_id in job_ids:
2370
      job = self.SafeLoadJobFromDisk(job_id, True)
2371
      if job is not None:
2372
        jobs.append(job.GetInfo(fields))
2373
      elif not list_all:
2374
        jobs.append(None)
2375

    
2376
    return jobs
2377

    
2378
  @locking.ssynchronized(_LOCK)
2379
  @_RequireOpenQueue
2380
  def Shutdown(self):
2381
    """Stops the job queue.
2382

2383
    This shutdowns all the worker threads an closes the queue.
2384

2385
    """
2386
    self._wpool.TerminateWorkers()
2387

    
2388
    self._queue_filelock.Close()
2389
    self._queue_filelock = None