lib/jqueue.py @ e71c8147

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import errno
35
import re
36
import time
37
import weakref
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import runtime
57
from ganeti import netutils
58
from ganeti import compat
59

    
60

    
61
JOBQUEUE_THREADS = 25
62
JOBS_PER_ARCHIVE_DIRECTORY = 10000
63

    
64
# member lock names to be passed to @ssynchronized decorator
65
_LOCK = "_lock"
66
_QUEUE = "_queue"
67

    
68

    
69
class CancelJob(Exception):
70
  """Special exception to cancel a job.
71

72
  """
73

    
74

    
75
def TimeStampNow():
76
  """Returns the current timestamp.
77

78
  @rtype: tuple
79
  @return: the current time in the (seconds, microseconds) format
80

81
  """
82
  return utils.SplitTime(time.time())
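# Illustrative note (added, not part of the original module): utils.SplitTime
# is assumed to turn a float timestamp into an (int seconds, int microseconds)
# tuple, so a call would look roughly like:
#   TimeStampNow()  # -> e.g. (1288797522, 463026)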
83

    
84

    
85
class _QueuedOpCode(object):
86
  """Encapsulates an opcode object.
87

88
  @ivar log: holds the execution log and consists of tuples
89
  of the form C{(log_serial, timestamp, level, message)}
90
  @ivar input: the OpCode we encapsulate
91
  @ivar status: the current status
92
  @ivar result: the result of the LU execution
93
  @ivar start_timestamp: timestamp for the start of the execution
94
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
95
  @ivar end_timestamp: timestamp for the end of the execution
96

97
  """
98
  __slots__ = ["input", "status", "result", "log", "priority",
99
               "start_timestamp", "exec_timestamp", "end_timestamp",
100
               "__weakref__"]
101

    
102
  def __init__(self, op):
103
    """Constructor for the _QuededOpCode.
104

105
    @type op: L{opcodes.OpCode}
106
    @param op: the opcode we encapsulate
107

108
    """
109
    self.input = op
110
    self.status = constants.OP_STATUS_QUEUED
111
    self.result = None
112
    self.log = []
113
    self.start_timestamp = None
114
    self.exec_timestamp = None
115
    self.end_timestamp = None
116

    
117
    # Get initial priority (it might change during the lifetime of this opcode)
118
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
119

    
120
  @classmethod
121
  def Restore(cls, state):
122
    """Restore the _QueuedOpCode from the serialized form.
123

124
    @type state: dict
125
    @param state: the serialized state
126
    @rtype: _QueuedOpCode
127
    @return: a new _QueuedOpCode instance
128

129
    """
130
    obj = _QueuedOpCode.__new__(cls)
131
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
132
    obj.status = state["status"]
133
    obj.result = state["result"]
134
    obj.log = state["log"]
135
    obj.start_timestamp = state.get("start_timestamp", None)
136
    obj.exec_timestamp = state.get("exec_timestamp", None)
137
    obj.end_timestamp = state.get("end_timestamp", None)
138
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
139
    return obj
140

    
141
  def Serialize(self):
142
    """Serializes this _QueuedOpCode.
143

144
    @rtype: dict
145
    @return: the dictionary holding the serialized state
146

147
    """
148
    return {
149
      "input": self.input.__getstate__(),
150
      "status": self.status,
151
      "result": self.result,
152
      "log": self.log,
153
      "start_timestamp": self.start_timestamp,
154
      "exec_timestamp": self.exec_timestamp,
155
      "end_timestamp": self.end_timestamp,
156
      "priority": self.priority,
157
      }
158

    
159

    
160
class _QueuedJob(object):
161
  """In-memory job representation.
162

163
  This is what we use to track the user-submitted jobs. Locking must
164
  be taken care of by users of this class.
165

166
  @type queue: L{JobQueue}
167
  @ivar queue: the parent queue
168
  @ivar id: the job ID
169
  @type ops: list
170
  @ivar ops: the list of _QueuedOpCode that constitute the job
171
  @type log_serial: int
172
  @ivar log_serial: holds the index for the next log entry
173
  @ivar received_timestamp: the timestamp for when the job was received
174
  @ivar start_timestamp: the timestamp for start of execution
175
  @ivar end_timestamp: the timestamp for end of execution
176

177
  """
178
  # pylint: disable-msg=W0212
179
  __slots__ = ["queue", "id", "ops", "log_serial",
180
               "received_timestamp", "start_timestamp", "end_timestamp",
181
               "__weakref__"]
182

    
183
  def __init__(self, queue, job_id, ops):
184
    """Constructor for the _QueuedJob.
185

186
    @type queue: L{JobQueue}
187
    @param queue: our parent queue
188
    @type job_id: job_id
189
    @param job_id: our job id
190
    @type ops: list
191
    @param ops: the list of opcodes we hold, which will be encapsulated
192
        in _QueuedOpCodes
193

194
    """
195
    if not ops:
196
      raise errors.GenericError("A job needs at least one opcode")
197

    
198
    self.queue = queue
199
    self.id = job_id
200
    self.ops = [_QueuedOpCode(op) for op in ops]
201
    self.log_serial = 0
202
    self.received_timestamp = TimeStampNow()
203
    self.start_timestamp = None
204
    self.end_timestamp = None
205

    
206
  def __repr__(self):
207
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
208
              "id=%s" % self.id,
209
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
210

    
211
    return "<%s at %#x>" % (" ".join(status), id(self))
212

    
213
  @classmethod
214
  def Restore(cls, queue, state):
215
    """Restore a _QueuedJob from serialized state:
216

217
    @type queue: L{JobQueue}
218
    @param queue: to which queue the restored job belongs
219
    @type state: dict
220
    @param state: the serialized state
221
    @rtype: _QueuedJob
222
    @return: the restored _QueuedJob instance
223

224
    """
225
    obj = _QueuedJob.__new__(cls)
226
    obj.queue = queue
227
    obj.id = state["id"]
228
    obj.received_timestamp = state.get("received_timestamp", None)
229
    obj.start_timestamp = state.get("start_timestamp", None)
230
    obj.end_timestamp = state.get("end_timestamp", None)
231

    
232
    obj.ops = []
233
    obj.log_serial = 0
234
    for op_state in state["ops"]:
235
      op = _QueuedOpCode.Restore(op_state)
236
      for log_entry in op.log:
237
        obj.log_serial = max(obj.log_serial, log_entry[0])
238
      obj.ops.append(op)
239

    
240
    return obj
241

    
242
  def Serialize(self):
243
    """Serialize the _JobQueue instance.
244

245
    @rtype: dict
246
    @return: the serialized state
247

248
    """
249
    return {
250
      "id": self.id,
251
      "ops": [op.Serialize() for op in self.ops],
252
      "start_timestamp": self.start_timestamp,
253
      "end_timestamp": self.end_timestamp,
254
      "received_timestamp": self.received_timestamp,
255
      }
256

    
257
  def CalcStatus(self):
258
    """Compute the status of this job.
259

260
    This function iterates over all the _QueuedOpCodes in the job and
261
    based on their status, computes the job status.
262

263
    The algorithm is:
264
      - if we find a canceled opcode, or one that finished with error, the job
265
        status will be the same
266
      - otherwise, the last opcode with the status one of:
267
          - waitlock
268
          - canceling
269
          - running
270

271
        will determine the job status
272

273
      - otherwise, it means either all opcodes are queued, or success,
274
        and the job status will be the same
275

276
    @return: the job status
277

278
    """
279
    status = constants.JOB_STATUS_QUEUED
280

    
281
    all_success = True
282
    for op in self.ops:
283
      if op.status == constants.OP_STATUS_SUCCESS:
284
        continue
285

    
286
      all_success = False
287

    
288
      if op.status == constants.OP_STATUS_QUEUED:
289
        pass
290
      elif op.status == constants.OP_STATUS_WAITLOCK:
291
        status = constants.JOB_STATUS_WAITLOCK
292
      elif op.status == constants.OP_STATUS_RUNNING:
293
        status = constants.JOB_STATUS_RUNNING
294
      elif op.status == constants.OP_STATUS_CANCELING:
295
        status = constants.JOB_STATUS_CANCELING
296
        break
297
      elif op.status == constants.OP_STATUS_ERROR:
298
        status = constants.JOB_STATUS_ERROR
299
        # The whole job fails if one opcode failed
300
        break
301
      elif op.status == constants.OP_STATUS_CANCELED:
302
        status = constants.OP_STATUS_CANCELED
303
        break
304

    
305
    if all_success:
306
      status = constants.JOB_STATUS_SUCCESS
307

    
308
    return status
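  # Worked example (illustrative, not in the original source): with opcode
  # statuses [success, success, running, queued] the rules above yield
  # JOB_STATUS_RUNNING; with [success, error, queued] the loop stops at the
  # failed opcode and yields JOB_STATUS_ERROR; only when every opcode has
  # succeeded does CalcStatus() return JOB_STATUS_SUCCESS.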
309

    
310
  def CalcPriority(self):
311
    """Gets the current priority for this job.
312

313
    Only unfinished opcodes are considered. When all are done, the default
314
    priority is used.
315

316
    @rtype: int
317

318
    """
319
    priorities = [op.priority for op in self.ops
320
                  if op.status not in constants.OPS_FINALIZED]
321

    
322
    if not priorities:
323
      # All opcodes are done, assume default priority
324
      return constants.OP_PRIO_DEFAULT
325

    
326
    return min(priorities)
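  # Note (added, not in the original source): opcode priorities appear to
  # follow nice-style semantics, where a numerically smaller value means a
  # more important opcode, so min() picks the priority of the most urgent
  # unfinished opcode, e.g. min([0, -10, 19]) == -10.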
327

    
328
  def GetLogEntries(self, newer_than):
329
    """Selectively returns the log entries.
330

331
    @type newer_than: None or int
332
    @param newer_than: if this is None, return all log entries,
333
        otherwise return only the log entries with serial higher
334
        than this value
335
    @rtype: list
336
    @return: the list of the log entries selected
337

338
    """
339
    if newer_than is None:
340
      serial = -1
341
    else:
342
      serial = newer_than
343

    
344
    entries = []
345
    for op in self.ops:
346
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
347

    
348
    return entries
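  # Example (illustrative only): if an opcode's log holds entries with
  # serials 1, 2 and 3, GetLogEntries(newer_than=1) returns only the entries
  # with serials 2 and 3, while GetLogEntries(None) returns all of them.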
349

    
350
  def GetInfo(self, fields):
351
    """Returns information about a job.
352

353
    @type fields: list
354
    @param fields: names of fields to return
355
    @rtype: list
356
    @return: list with one element for each field
357
    @raise errors.OpExecError: when an invalid field
358
        has been passed
359

360
    """
361
    row = []
362
    for fname in fields:
363
      if fname == "id":
364
        row.append(self.id)
365
      elif fname == "status":
366
        row.append(self.CalcStatus())
367
      elif fname == "ops":
368
        row.append([op.input.__getstate__() for op in self.ops])
369
      elif fname == "opresult":
370
        row.append([op.result for op in self.ops])
371
      elif fname == "opstatus":
372
        row.append([op.status for op in self.ops])
373
      elif fname == "oplog":
374
        row.append([op.log for op in self.ops])
375
      elif fname == "opstart":
376
        row.append([op.start_timestamp for op in self.ops])
377
      elif fname == "opexec":
378
        row.append([op.exec_timestamp for op in self.ops])
379
      elif fname == "opend":
380
        row.append([op.end_timestamp for op in self.ops])
381
      elif fname == "received_ts":
382
        row.append(self.received_timestamp)
383
      elif fname == "start_ts":
384
        row.append(self.start_timestamp)
385
      elif fname == "end_ts":
386
        row.append(self.end_timestamp)
387
      elif fname == "summary":
388
        row.append([op.input.Summary() for op in self.ops])
389
      else:
390
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
391
    return row
392

    
393
  def MarkUnfinishedOps(self, status, result):
394
    """Mark unfinished opcodes with a given status and result.
395

396
    This is a utility function for marking all running or waiting to
397
    be run opcodes with a given status. Opcodes which are already
398
    finalised are not changed.
399

400
    @param status: a given opcode status
401
    @param result: the opcode result
402

403
    """
404
    not_marked = True
405
    for op in self.ops:
406
      if op.status in constants.OPS_FINALIZED:
407
        assert not_marked, "Finalized opcodes found after non-finalized ones"
408
        continue
409
      op.status = status
410
      op.result = result
411
      not_marked = False
412

    
413

    
414
class _OpExecCallbacks(mcpu.OpExecCbBase):
415
  def __init__(self, queue, job, op):
416
    """Initializes this class.
417

418
    @type queue: L{JobQueue}
419
    @param queue: Job queue
420
    @type job: L{_QueuedJob}
421
    @param job: Job object
422
    @type op: L{_QueuedOpCode}
423
    @param op: OpCode
424

425
    """
426
    assert queue, "Queue is missing"
427
    assert job, "Job is missing"
428
    assert op, "Opcode is missing"
429

    
430
    self._queue = queue
431
    self._job = job
432
    self._op = op
433

    
434
  def _CheckCancel(self):
435
    """Raises an exception to cancel the job if asked to.
436

437
    """
438
    # Cancel here if we were asked to
439
    if self._op.status == constants.OP_STATUS_CANCELING:
440
      logging.debug("Canceling opcode")
441
      raise CancelJob()
442

    
443
  @locking.ssynchronized(_QUEUE, shared=1)
444
  def NotifyStart(self):
445
    """Mark the opcode as running, not lock-waiting.
446

447
    This is called from the mcpu code as a notifier function, when the LU is
448
    finally about to start the Exec() method. Of course, to have end-user
449
    visible results, the opcode must be initially (before calling into
450
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
451

452
    """
453
    assert self._op in self._job.ops
454
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
455
                               constants.OP_STATUS_CANCELING)
456

    
457
    # Cancel here if we were asked to
458
    self._CheckCancel()
459

    
460
    logging.debug("Opcode is now running")
461

    
462
    self._op.status = constants.OP_STATUS_RUNNING
463
    self._op.exec_timestamp = TimeStampNow()
464

    
465
    # And finally replicate the job status
466
    self._queue.UpdateJobUnlocked(self._job)
467

    
468
  @locking.ssynchronized(_QUEUE, shared=1)
469
  def _AppendFeedback(self, timestamp, log_type, log_msg):
470
    """Internal feedback append function, with locks
471

472
    """
473
    self._job.log_serial += 1
474
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
475
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
476

    
477
  def Feedback(self, *args):
478
    """Append a log entry.
479

480
    """
481
    assert len(args) < 3
482

    
483
    if len(args) == 1:
484
      log_type = constants.ELOG_MESSAGE
485
      log_msg = args[0]
486
    else:
487
      (log_type, log_msg) = args
488

    
489
    # The time is split to make serialization easier and not lose
490
    # precision.
491
    timestamp = utils.SplitTime(time.time())
492
    self._AppendFeedback(timestamp, log_type, log_msg)
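  # Usage sketch (illustrative, not part of the original source; "cbs" is a
  # hypothetical _OpExecCallbacks instance): the LU code may call either form:
  #   cbs.Feedback("checking disk status")                    # message only
  #   cbs.Feedback(constants.ELOG_MESSAGE, "checking disks")  # (type, msg)
  # one-argument calls default the log type to constants.ELOG_MESSAGE.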
493

    
494
  def ReportLocks(self, msg):
495
    """Write locking information to the job.
496

497
    Called whenever the LU processor is waiting for a lock or has acquired one.
498

499
    """
500
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
501
                               constants.OP_STATUS_CANCELING)
502

    
503
    # Cancel here if we were asked to
504
    self._CheckCancel()
505

    
506

    
507
class _JobChangesChecker(object):
508
  def __init__(self, fields, prev_job_info, prev_log_serial):
509
    """Initializes this class.
510

511
    @type fields: list of strings
512
    @param fields: Fields requested by LUXI client
513
    @type prev_job_info: string
514
    @param prev_job_info: previous job info, as passed by the LUXI client
515
    @type prev_log_serial: string
516
    @param prev_log_serial: previous job serial, as passed by the LUXI client
517

518
    """
519
    self._fields = fields
520
    self._prev_job_info = prev_job_info
521
    self._prev_log_serial = prev_log_serial
522

    
523
  def __call__(self, job):
524
    """Checks whether job has changed.
525

526
    @type job: L{_QueuedJob}
527
    @param job: Job object
528

529
    """
530
    status = job.CalcStatus()
531
    job_info = job.GetInfo(self._fields)
532
    log_entries = job.GetLogEntries(self._prev_log_serial)
533

    
534
    # Serializing and deserializing data can cause type changes (e.g. from
535
    # tuple to list) or precision loss. We're doing it here so that we get
536
    # the same modifications as the data received from the client. Without
537
    # this, the comparison afterwards might fail without the data being
538
    # significantly different.
539
    # TODO: we just deserialized from disk, investigate how to make sure that
540
    # the job info and log entries are compatible to avoid this further step.
541
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
542
    # efficient, though floats will be tricky
543
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
544
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
545

    
546
    # Don't even try to wait if the job is no longer running, there will be
547
    # no changes.
548
    if (status not in (constants.JOB_STATUS_QUEUED,
549
                       constants.JOB_STATUS_RUNNING,
550
                       constants.JOB_STATUS_WAITLOCK) or
551
        job_info != self._prev_job_info or
552
        (log_entries and self._prev_log_serial != log_entries[0][0])):
553
      logging.debug("Job %s changed", job.id)
554
      return (job_info, log_entries)
555

    
556
    return None
557

    
558

    
559
class _JobFileChangesWaiter(object):
560
  def __init__(self, filename):
561
    """Initializes this class.
562

563
    @type filename: string
564
    @param filename: Path to job file
565
    @raises errors.InotifyError: if the notifier cannot be setup
566

567
    """
568
    self._wm = pyinotify.WatchManager()
569
    self._inotify_handler = \
570
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
571
    self._notifier = \
572
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
573
    try:
574
      self._inotify_handler.enable()
575
    except Exception:
576
      # pyinotify doesn't close file descriptors automatically
577
      self._notifier.stop()
578
      raise
579

    
580
  def _OnInotify(self, notifier_enabled):
581
    """Callback for inotify.
582

583
    """
584
    if not notifier_enabled:
585
      self._inotify_handler.enable()
586

    
587
  def Wait(self, timeout):
588
    """Waits for the job file to change.
589

590
    @type timeout: float
591
    @param timeout: Timeout in seconds
592
    @return: Whether there have been events
593

594
    """
595
    assert timeout >= 0
596
    have_events = self._notifier.check_events(timeout * 1000)
597
    if have_events:
598
      self._notifier.read_events()
599
    self._notifier.process_events()
600
    return have_events
601

    
602
  def Close(self):
603
    """Closes underlying notifier and its file descriptor.
604

605
    """
606
    self._notifier.stop()
607

    
608

    
609
class _JobChangesWaiter(object):
610
  def __init__(self, filename):
611
    """Initializes this class.
612

613
    @type filename: string
614
    @param filename: Path to job file
615

616
    """
617
    self._filewaiter = None
618
    self._filename = filename
619

    
620
  def Wait(self, timeout):
621
    """Waits for a job to change.
622

623
    @type timeout: float
624
    @param timeout: Timeout in seconds
625
    @return: Whether there have been events
626

627
    """
628
    if self._filewaiter:
629
      return self._filewaiter.Wait(timeout)
630

    
631
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
632
    # If this point is reached, return immediately and let caller check the job
633
    # file again in case there were changes since the last check. This avoids a
634
    # race condition.
635
    self._filewaiter = _JobFileChangesWaiter(self._filename)
636

    
637
    return True
638

    
639
  def Close(self):
640
    """Closes underlying waiter.
641

642
    """
643
    if self._filewaiter:
644
      self._filewaiter.Close()
645

    
646

    
647
class _WaitForJobChangesHelper(object):
648
  """Helper class using inotify to wait for changes in a job file.
649

650
  This class takes a previous job status and serial, and alerts the client when
651
  the current job status has changed.
652

653
  """
654
  @staticmethod
655
  def _CheckForChanges(job_load_fn, check_fn):
656
    job = job_load_fn()
657
    if not job:
658
      raise errors.JobLost()
659

    
660
    result = check_fn(job)
661
    if result is None:
662
      raise utils.RetryAgain()
663

    
664
    return result
665

    
666
  def __call__(self, filename, job_load_fn,
667
               fields, prev_job_info, prev_log_serial, timeout):
668
    """Waits for changes on a job.
669

670
    @type filename: string
671
    @param filename: File on which to wait for changes
672
    @type job_load_fn: callable
673
    @param job_load_fn: Function to load job
674
    @type fields: list of strings
675
    @param fields: Which fields to check for changes
676
    @type prev_job_info: list or None
677
    @param prev_job_info: Last job information returned
678
    @type prev_log_serial: int
679
    @param prev_log_serial: Last job message serial number
680
    @type timeout: float
681
    @param timeout: maximum time to wait in seconds
682

683
    """
684
    try:
685
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
686
      waiter = _JobChangesWaiter(filename)
687
      try:
688
        return utils.Retry(compat.partial(self._CheckForChanges,
689
                                          job_load_fn, check_fn),
690
                           utils.RETRY_REMAINING_TIME, timeout,
691
                           wait_fn=waiter.Wait)
692
      finally:
693
        waiter.Close()
694
    except (errors.InotifyError, errors.JobLost):
695
      return None
696
    except utils.RetryTimeout:
697
      return constants.JOB_NOTCHANGED
698

    
699

    
700
def _EncodeOpError(err):
701
  """Encodes an error which occurred while processing an opcode.
702

703
  """
704
  if isinstance(err, errors.GenericError):
705
    to_encode = err
706
  else:
707
    to_encode = errors.OpExecError(str(err))
708

    
709
  return errors.EncodeException(to_encode)
710

    
711

    
712
class _JobQueueWorker(workerpool.BaseWorker):
713
  """The actual job workers.
714

715
  """
716
  def RunTask(self, job): # pylint: disable-msg=W0221
717
    """Job executor.
718

719
    This functions processes a job. It is closely tied to the _QueuedJob and
720
    _QueuedOpCode classes.
721

722
    @type job: L{_QueuedJob}
723
    @param job: the job to be processed
724

725
    """
726
    self.SetTaskName("Job%s" % job.id)
727

    
728
    logging.info("Processing job %s", job.id)
729
    proc = mcpu.Processor(self.pool.queue.context, job.id)
730
    queue = job.queue
731
    try:
732
      try:
733
        count = len(job.ops)
734
        for idx, op in enumerate(job.ops):
735
          op_summary = op.input.Summary()
736
          if op.status == constants.OP_STATUS_SUCCESS:
737
            # this is a job that was partially completed before master
738
            # daemon shutdown, so it can be expected that some opcodes
739
            # are already completed successfully (if any did error
740
            # out, then the whole job should have been aborted and not
741
            # resubmitted for processing)
742
            logging.info("Op %s/%s: opcode %s already processed, skipping",
743
                         idx + 1, count, op_summary)
744
            continue
745
          try:
746
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
747
                         op_summary)
748

    
749
            queue.acquire(shared=1)
750
            try:
751
              if op.status == constants.OP_STATUS_CANCELED:
752
                logging.debug("Canceling opcode")
753
                raise CancelJob()
754
              assert op.status == constants.OP_STATUS_QUEUED
755
              logging.debug("Opcode %s/%s waiting for locks",
756
                            idx + 1, count)
757
              op.status = constants.OP_STATUS_WAITLOCK
758
              op.result = None
759
              op.start_timestamp = TimeStampNow()
760
              if idx == 0: # first opcode
761
                job.start_timestamp = op.start_timestamp
762
              queue.UpdateJobUnlocked(job)
763

    
764
              input_opcode = op.input
765
            finally:
766
              queue.release()
767

    
768
            # Make sure not to hold queue lock while calling ExecOpCode
769
            result = proc.ExecOpCode(input_opcode,
770
                                     _OpExecCallbacks(queue, job, op))
771

    
772
            queue.acquire(shared=1)
773
            try:
774
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
775
              op.status = constants.OP_STATUS_SUCCESS
776
              op.result = result
777
              op.end_timestamp = TimeStampNow()
778
              if idx == count - 1:
779
                job.end_timestamp = TimeStampNow()
780

    
781
                # Consistency check
782
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
783
                                  for i in job.ops)
784

    
785
              queue.UpdateJobUnlocked(job)
786
            finally:
787
              queue.release()
788

    
789
            logging.info("Op %s/%s: Successfully finished opcode %s",
790
                         idx + 1, count, op_summary)
791
          except CancelJob:
792
            # Will be handled further up
793
            raise
794
          except Exception, err:
795
            queue.acquire(shared=1)
796
            try:
797
              try:
798
                logging.debug("Opcode %s/%s failed", idx + 1, count)
799
                op.status = constants.OP_STATUS_ERROR
800
                op.result = _EncodeOpError(err)
801
                op.end_timestamp = TimeStampNow()
802
                logging.info("Op %s/%s: Error in opcode %s: %s",
803
                             idx + 1, count, op_summary, err)
804

    
805
                to_encode = errors.OpExecError("Preceding opcode failed")
806
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
807
                                      _EncodeOpError(to_encode))
808

    
809
                # Consistency check
810
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
811
                                  for i in job.ops[:idx])
812
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
813
                                  errors.GetEncodedError(i.result)
814
                                  for i in job.ops[idx:])
815
              finally:
816
                job.end_timestamp = TimeStampNow()
817
                queue.UpdateJobUnlocked(job)
818
            finally:
819
              queue.release()
820
            raise
821

    
822
      except CancelJob:
823
        queue.acquire(shared=1)
824
        try:
825
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
826
                                "Job canceled by request")
827
          job.end_timestamp = TimeStampNow()
828
          queue.UpdateJobUnlocked(job)
829
        finally:
830
          queue.release()
831
      except errors.GenericError, err:
832
        logging.exception("Ganeti exception")
833
      except:
834
        logging.exception("Unhandled exception")
835
    finally:
836
      status = job.CalcStatus()
837
      logging.info("Finished job %s, status = %s", job.id, status)
838

    
839

    
840
class _JobQueueWorkerPool(workerpool.WorkerPool):
841
  """Simple class implementing a job-processing workerpool.
842

843
  """
844
  def __init__(self, queue):
845
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
846
                                              JOBQUEUE_THREADS,
847
                                              _JobQueueWorker)
848
    self.queue = queue
849

    
850

    
851
def _RequireOpenQueue(fn):
852
  """Decorator for "public" functions.
853

854
  This function should be used for all 'public' functions. That is,
855
  functions usually called from other classes. Note that this should
856
  be applied only to methods (not plain functions), since it expects
857
  that the decorated function is called with a first argument that has
858
  a '_queue_filelock' attribute.
859

860
  @warning: Use this decorator only after locking.ssynchronized
861

862
  Example::
863
    @locking.ssynchronized(_LOCK)
864
    @_RequireOpenQueue
865
    def Example(self):
866
      pass
867

868
  """
869
  def wrapper(self, *args, **kwargs):
870
    # pylint: disable-msg=W0212
871
    assert self._queue_filelock is not None, "Queue should be open"
872
    return fn(self, *args, **kwargs)
873
  return wrapper
874

    
875

    
876
class JobQueue(object):
877
  """Queue used to manage the jobs.
878

879
  @cvar _RE_JOB_FILE: regex matching the valid job file names
880

881
  """
882
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
883

    
884
  def __init__(self, context):
885
    """Constructor for JobQueue.
886

887
    The constructor will initialize the job queue object and then
888
    start loading the current jobs from disk, either for starting them
889
    (if they were queued) or for aborting them (if they were already
890
    running).
891

892
    @type context: GanetiContext
893
    @param context: the context object for access to the configuration
894
        data and other ganeti objects
895

896
    """
897
    self.context = context
898
    self._memcache = weakref.WeakValueDictionary()
899
    self._my_hostname = netutils.Hostname.GetSysName()
900

    
901
    # The Big JobQueue lock. If a code block or method acquires it in shared
902
    # mode, it must guarantee concurrency with all the code acquiring it in
903
    # shared mode, including itself. In order not to acquire it at all,
904
    # concurrency must be guaranteed with all code acquiring it in shared mode
905
    # and all code acquiring it exclusively.
906
    self._lock = locking.SharedLock("JobQueue")
907

    
908
    self.acquire = self._lock.acquire
909
    self.release = self._lock.release
910

    
911
    # Initialize the queue, and acquire the filelock.
912
    # This ensures no other process is working on the job queue.
913
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
914

    
915
    # Read serial file
916
    self._last_serial = jstore.ReadSerial()
917
    assert self._last_serial is not None, ("Serial file was modified between"
918
                                           " check in jstore and here")
919

    
920
    # Get initial list of nodes
921
    self._nodes = dict((n.name, n.primary_ip)
922
                       for n in self.context.cfg.GetAllNodesInfo().values()
923
                       if n.master_candidate)
924

    
925
    # Remove master node
926
    self._nodes.pop(self._my_hostname, None)
927

    
928
    # TODO: Check consistency across nodes
929

    
930
    self._queue_size = 0
931
    self._UpdateQueueSizeUnlocked()
932
    self._drained = self._IsQueueMarkedDrain()
933

    
934
    # Setup worker pool
935
    self._wpool = _JobQueueWorkerPool(self)
936
    try:
937
      self._InspectQueue()
938
    except:
939
      self._wpool.TerminateWorkers()
940
      raise
941

    
942
  @locking.ssynchronized(_LOCK)
943
  @_RequireOpenQueue
944
  def _InspectQueue(self):
945
    """Loads the whole job queue and resumes unfinished jobs.
946

947
    This function needs the lock here because WorkerPool.AddTask() may start a
948
    job while we're still doing our work.
949

950
    """
951
    logging.info("Inspecting job queue")
952

    
953
    all_job_ids = self._GetJobIDsUnlocked()
954
    jobs_count = len(all_job_ids)
955
    lastinfo = time.time()
956
    for idx, job_id in enumerate(all_job_ids):
957
      # Give an update every 1000 jobs or 10 seconds
958
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
959
          idx == (jobs_count - 1)):
960
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
961
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
962
        lastinfo = time.time()
963

    
964
      job = self._LoadJobUnlocked(job_id)
965

    
966
      # a failure in loading the job can cause 'None' to be returned
967
      if job is None:
968
        continue
969

    
970
      status = job.CalcStatus()
971

    
972
      if status in (constants.JOB_STATUS_QUEUED,
973
                    constants.JOB_STATUS_WAITLOCK):
974
        self._wpool.AddTask((job, ))
975

    
976
      elif status in (constants.JOB_STATUS_RUNNING,
977
                      constants.JOB_STATUS_CANCELING):
978
        logging.warning("Unfinished job %s found: %s", job.id, job)
979
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
980
                              "Unclean master daemon shutdown")
981
        self.UpdateJobUnlocked(job)
982

    
983
    logging.info("Job queue inspection finished")
984

    
985
  @locking.ssynchronized(_LOCK)
986
  @_RequireOpenQueue
987
  def AddNode(self, node):
988
    """Register a new node with the queue.
989

990
    @type node: L{objects.Node}
991
    @param node: the node object to be added
992

993
    """
994
    node_name = node.name
995
    assert node_name != self._my_hostname
996

    
997
    # Clean queue directory on added node
998
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
999
    msg = result.fail_msg
1000
    if msg:
1001
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1002
                      node_name, msg)
1003

    
1004
    if not node.master_candidate:
1005
      # remove if existing, ignoring errors
1006
      self._nodes.pop(node_name, None)
1007
      # and skip the replication of the job ids
1008
      return
1009

    
1010
    # Upload the whole queue excluding archived jobs
1011
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1012

    
1013
    # Upload current serial file
1014
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
1015

    
1016
    for file_name in files:
1017
      # Read file content
1018
      content = utils.ReadFile(file_name)
1019

    
1020
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
1021
                                                  [node.primary_ip],
1022
                                                  file_name, content)
1023
      msg = result[node_name].fail_msg
1024
      if msg:
1025
        logging.error("Failed to upload file %s to node %s: %s",
1026
                      file_name, node_name, msg)
1027

    
1028
    self._nodes[node_name] = node.primary_ip
1029

    
1030
  @locking.ssynchronized(_LOCK)
1031
  @_RequireOpenQueue
1032
  def RemoveNode(self, node_name):
1033
    """Callback called when removing nodes from the cluster.
1034

1035
    @type node_name: str
1036
    @param node_name: the name of the node to remove
1037

1038
    """
1039
    self._nodes.pop(node_name, None)
1040

    
1041
  @staticmethod
1042
  def _CheckRpcResult(result, nodes, failmsg):
1043
    """Verifies the status of an RPC call.
1044

1045
    Since we aim to keep consistency should this node (the current
1046
    master) fail, we will log errors if our rpc calls fail, and especially
1047
    log the case when more than half of the nodes fail.
1048

1049
    @param result: the data as returned from the rpc call
1050
    @type nodes: list
1051
    @param nodes: the list of nodes we made the call to
1052
    @type failmsg: str
1053
    @param failmsg: the identifier to be used for logging
1054

1055
    """
1056
    failed = []
1057
    success = []
1058

    
1059
    for node in nodes:
1060
      msg = result[node].fail_msg
1061
      if msg:
1062
        failed.append(node)
1063
        logging.error("RPC call %s (%s) failed on node %s: %s",
1064
                      result[node].call, failmsg, node, msg)
1065
      else:
1066
        success.append(node)
1067

    
1068
    # +1 for the master node
1069
    if (len(success) + 1) < len(failed):
1070
      # TODO: Handle failing nodes
1071
      logging.error("More than half of the nodes failed")
1072

    
1073
  def _GetNodeIp(self):
1074
    """Helper for returning the node name/ip list.
1075

1076
    @rtype: (list, list)
1077
    @return: a tuple of two lists, the first one with the node
1078
        names and the second one with the node addresses
1079

1080
    """
1081
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1082
    name_list = self._nodes.keys()
1083
    addr_list = [self._nodes[name] for name in name_list]
1084
    return name_list, addr_list
1085

    
1086
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1087
    """Writes a file locally and then replicates it to all nodes.
1088

1089
    This function will replace the contents of a file on the local
1090
    node and then replicate it to all the other nodes we have.
1091

1092
    @type file_name: str
1093
    @param file_name: the path of the file to be replicated
1094
    @type data: str
1095
    @param data: the new contents of the file
1096
    @type replicate: boolean
1097
    @param replicate: whether to spread the changes to the remote nodes
1098

1099
    """
1100
    getents = runtime.GetEnts()
1101
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1102
                    gid=getents.masterd_gid)
1103

    
1104
    if replicate:
1105
      names, addrs = self._GetNodeIp()
1106
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1107
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1108

    
1109
  def _RenameFilesUnlocked(self, rename):
1110
    """Renames a file locally and then replicate the change.
1111

1112
    This function will rename a file in the local queue directory
1113
    and then replicate this rename to all the other nodes we have.
1114

1115
    @type rename: list of (old, new)
1116
    @param rename: List containing tuples mapping old to new names
1117

1118
    """
1119
    # Rename them locally
1120
    for old, new in rename:
1121
      utils.RenameFile(old, new, mkdir=True)
1122

    
1123
    # ... and on all nodes
1124
    names, addrs = self._GetNodeIp()
1125
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1126
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1127

    
1128
  @staticmethod
1129
  def _FormatJobID(job_id):
1130
    """Convert a job ID to string format.
1131

1132
    Currently this just does C{str(job_id)} after performing some
1133
    checks, but if we want to change the job id format this will
1134
    abstract this change.
1135

1136
    @type job_id: int or long
1137
    @param job_id: the numeric job id
1138
    @rtype: str
1139
    @return: the formatted job id
1140

1141
    """
1142
    if not isinstance(job_id, (int, long)):
1143
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1144
    if job_id < 0:
1145
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1146

    
1147
    return str(job_id)
1148

    
1149
  @classmethod
1150
  def _GetArchiveDirectory(cls, job_id):
1151
    """Returns the archive directory for a job.
1152

1153
    @type job_id: str
1154
    @param job_id: Job identifier
1155
    @rtype: str
1156
    @return: Directory name
1157

1158
    """
1159
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
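  # Example (illustrative): with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job id
  # "123456" is archived under directory "12" and job id "42" under "0".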
1160

    
1161
  def _NewSerialsUnlocked(self, count):
1162
    """Generates a new job identifier.
1163

1164
    Job identifiers are unique during the lifetime of a cluster.
1165

1166
    @type count: integer
1167
    @param count: how many serials to return
1168
    @rtype: list of str
1169
    @return: a list of strings representing the job identifiers
1170

1171
    """
1172
    assert count > 0
1173
    # New number
1174
    serial = self._last_serial + count
1175

    
1176
    # Write to file
1177
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1178
                             "%s\n" % serial, True)
1179

    
1180
    result = [self._FormatJobID(v)
1181
              for v in range(self._last_serial + 1, serial + 1)]
1182
    # Keep it only if we were able to write the file
1183
    self._last_serial = serial
1184

    
1185
    return result
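  # Example (illustrative, assuming the corrected range above): if the last
  # used serial is 42 and count is 3, the serial file is rewritten with "45"
  # and the job identifiers "43", "44" and "45" are returned.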
1186

    
1187
  @staticmethod
1188
  def _GetJobPath(job_id):
1189
    """Returns the job file for a given job id.
1190

1191
    @type job_id: str
1192
    @param job_id: the job identifier
1193
    @rtype: str
1194
    @return: the path to the job file
1195

1196
    """
1197
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1198

    
1199
  @classmethod
1200
  def _GetArchivedJobPath(cls, job_id):
1201
    """Returns the archived job file for a give job id.
1202

1203
    @type job_id: str
1204
    @param job_id: the job identifier
1205
    @rtype: str
1206
    @return: the path to the archived job file
1207

1208
    """
1209
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1210
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1211

    
1212
  def _GetJobIDsUnlocked(self, sort=True):
1213
    """Return all known job IDs.
1214

1215
    The method only looks at disk because it's a requirement that all
1216
    jobs are present on disk (so in the _memcache we don't have any
1217
    extra IDs).
1218

1219
    @type sort: boolean
1220
    @param sort: perform sorting on the returned job ids
1221
    @rtype: list
1222
    @return: the list of job IDs
1223

1224
    """
1225
    jlist = []
1226
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1227
      m = self._RE_JOB_FILE.match(filename)
1228
      if m:
1229
        jlist.append(m.group(1))
1230
    if sort:
1231
      jlist = utils.NiceSort(jlist)
1232
    return jlist
1233

    
1234
  def _LoadJobUnlocked(self, job_id):
1235
    """Loads a job from the disk or memory.
1236

1237
    Given a job id, this will return the cached job object if
1238
    existing, or try to load the job from the disk. If loading from
1239
    disk, it will also add the job to the cache.
1240

1241
    @param job_id: the job id
1242
    @rtype: L{_QueuedJob} or None
1243
    @return: either None or the job object
1244

1245
    """
1246
    job = self._memcache.get(job_id, None)
1247
    if job:
1248
      logging.debug("Found job %s in memcache", job_id)
1249
      return job
1250

    
1251
    try:
1252
      job = self._LoadJobFromDisk(job_id)
1253
      if job is None:
1254
        return job
1255
    except errors.JobFileCorrupted:
1256
      old_path = self._GetJobPath(job_id)
1257
      new_path = self._GetArchivedJobPath(job_id)
1258
      if old_path == new_path:
1259
        # job already archived (future case)
1260
        logging.exception("Can't parse job %s", job_id)
1261
      else:
1262
        # non-archived case
1263
        logging.exception("Can't parse job %s, will archive.", job_id)
1264
        self._RenameFilesUnlocked([(old_path, new_path)])
1265
      return None
1266

    
1267
    self._memcache[job_id] = job
1268
    logging.debug("Added job %s to the cache", job_id)
1269
    return job
1270

    
1271
  def _LoadJobFromDisk(self, job_id):
1272
    """Load the given job file from disk.
1273

1274
    Given a job file, read, load and restore it in a _QueuedJob format.
1275

1276
    @type job_id: string
1277
    @param job_id: job identifier
1278
    @rtype: L{_QueuedJob} or None
1279
    @return: either None or the job object
1280

1281
    """
1282
    filepath = self._GetJobPath(job_id)
1283
    logging.debug("Loading job from %s", filepath)
1284
    try:
1285
      raw_data = utils.ReadFile(filepath)
1286
    except EnvironmentError, err:
1287
      if err.errno in (errno.ENOENT, ):
1288
        return None
1289
      raise
1290

    
1291
    try:
1292
      data = serializer.LoadJson(raw_data)
1293
      job = _QueuedJob.Restore(self, data)
1294
    except Exception, err: # pylint: disable-msg=W0703
1295
      raise errors.JobFileCorrupted(err)
1296

    
1297
    return job
1298

    
1299
  def SafeLoadJobFromDisk(self, job_id):
1300
    """Load the given job file from disk.
1301

1302
    Given a job file, read, load and restore it in a _QueuedJob format.
1303
    In case of error reading the job, it gets returned as None, and the
1304
    exception is logged.
1305

1306
    @type job_id: string
1307
    @param job_id: job identifier
1308
    @rtype: L{_QueuedJob} or None
1309
    @return: either None or the job object
1310

1311
    """
1312
    try:
1313
      return self._LoadJobFromDisk(job_id)
1314
    except (errors.JobFileCorrupted, EnvironmentError):
1315
      logging.exception("Can't load/parse job %s", job_id)
1316
      return None
1317

    
1318
  @staticmethod
1319
  def _IsQueueMarkedDrain():
1320
    """Check if the queue is marked from drain.
1321

1322
    This currently uses the queue drain file, which makes it a
1323
    per-node flag. In the future this can be moved to the config file.
1324

1325
    @rtype: boolean
1326
    @return: True if the job queue is marked for draining
1327

1328
    """
1329
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1330

    
1331
  def _UpdateQueueSizeUnlocked(self):
1332
    """Update the queue size.
1333

1334
    """
1335
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1336

    
1337
  @locking.ssynchronized(_LOCK)
1338
  @_RequireOpenQueue
1339
  def SetDrainFlag(self, drain_flag):
1340
    """Sets the drain flag for the queue.
1341

1342
    @type drain_flag: boolean
1343
    @param drain_flag: Whether to set or unset the drain flag
1344

1345
    """
1346
    getents = runtime.GetEnts()
1347

    
1348
    if drain_flag:
1349
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
1350
                      uid=getents.masterd_uid, gid=getents.masterd_gid)
1351
    else:
1352
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1353

    
1354
    self._drained = drain_flag
1355

    
1356
    return True
1357

    
1358
  @_RequireOpenQueue
1359
  def _SubmitJobUnlocked(self, job_id, ops):
1360
    """Create and store a new job.
1361

1362
    This enters the job into our job queue and also puts it on the new
1363
    queue, in order for it to be picked up by the queue processors.
1364

1365
    @type job_id: job ID
1366
    @param job_id: the job ID for the new job
1367
    @type ops: list
1368
    @param ops: The list of OpCodes that will become the new job.
1369
    @rtype: L{_QueuedJob}
1370
    @return: the job object to be queued
1371
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1372
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1373
    @raise errors.GenericError: If an opcode is not valid
1374

1375
    """
1376
    # Ok when sharing the big job queue lock, as the drain file is created when
1377
    # the lock is exclusive.
1378
    if self._drained:
1379
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1380

    
1381
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1382
      raise errors.JobQueueFull()
1383

    
1384
    job = _QueuedJob(self, job_id, ops)
1385

    
1386
    # Check priority
1387
    for idx, op in enumerate(job.ops):
1388
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
1389
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1390
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1391
                                  " are %s" % (idx, op.priority, allowed))
1392

    
1393
    # Write to disk
1394
    self.UpdateJobUnlocked(job)
1395

    
1396
    self._queue_size += 1
1397

    
1398
    logging.debug("Adding new job %s to the cache", job_id)
1399
    self._memcache[job_id] = job
1400

    
1401
    return job
1402

    
1403
  @locking.ssynchronized(_LOCK)
1404
  @_RequireOpenQueue
1405
  def SubmitJob(self, ops):
1406
    """Create and store a new job.
1407

1408
    @see: L{_SubmitJobUnlocked}
1409

1410
    """
1411
    job_id = self._NewSerialsUnlocked(1)[0]
1412
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
1413
    return job_id
1414

    
1415
  @locking.ssynchronized(_LOCK)
1416
  @_RequireOpenQueue
1417
  def SubmitManyJobs(self, jobs):
1418
    """Create and store multiple jobs.
1419

1420
    @see: L{_SubmitJobUnlocked}
1421

1422
    """
1423
    results = []
1424
    tasks = []
1425
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1426
    for job_id, ops in zip(all_job_ids, jobs):
1427
      try:
1428
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1429
        status = True
1430
        data = job_id
1431
      except errors.GenericError, err:
1432
        data = str(err)
1433
        status = False
1434
      results.append((status, data))
1435
    self._wpool.AddManyTasks(tasks)
1436

    
1437
    return results
1438

    
1439
  @_RequireOpenQueue
1440
  def UpdateJobUnlocked(self, job, replicate=True):
1441
    """Update a job's on disk storage.
1442

1443
    After a job has been modified, this function needs to be called in
1444
    order to write the changes to disk and replicate them to the other
1445
    nodes.
1446

1447
    @type job: L{_QueuedJob}
1448
    @param job: the changed job
1449
    @type replicate: boolean
1450
    @param replicate: whether to replicate the change to remote nodes
1451

1452
    """
1453
    filename = self._GetJobPath(job.id)
1454
    data = serializer.DumpJson(job.Serialize(), indent=False)
1455
    logging.debug("Writing job %s to %s", job.id, filename)
1456
    self._UpdateJobQueueFile(filename, data, replicate)
1457

    
1458
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1459
                        timeout):
1460
    """Waits for changes in a job.
1461

1462
    @type job_id: string
1463
    @param job_id: Job identifier
1464
    @type fields: list of strings
1465
    @param fields: Which fields to check for changes
1466
    @type prev_job_info: list or None
1467
    @param prev_job_info: Last job information returned
1468
    @type prev_log_serial: int
1469
    @param prev_log_serial: Last job message serial number
1470
    @type timeout: float
1471
    @param timeout: maximum time to wait in seconds
1472
    @rtype: tuple (job info, log entries)
1473
    @return: a tuple of the job information as required via
1474
        the fields parameter, and the log entries as a list
1475

1476
        if the job has not changed and the timeout has expired,
1477
        we instead return a special value,
1478
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1479
        as such by the clients
1480

1481
    """
1482
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1483

    
1484
    helper = _WaitForJobChangesHelper()
1485

    
1486
    return helper(self._GetJobPath(job_id), load_fn,
1487
                  fields, prev_job_info, prev_log_serial, timeout)
1488

    
1489
  @locking.ssynchronized(_LOCK)
1490
  @_RequireOpenQueue
1491
  def CancelJob(self, job_id):
1492
    """Cancels a job.
1493

1494
    This will only succeed if the job has not started yet.
1495

1496
    @type job_id: string
1497
    @param job_id: job ID of job to be cancelled.
1498

1499
    """
1500
    logging.info("Cancelling job %s", job_id)
1501

    
1502
    job = self._LoadJobUnlocked(job_id)
1503
    if not job:
1504
      logging.debug("Job %s not found", job_id)
1505
      return (False, "Job %s not found" % job_id)
1506

    
1507
    job_status = job.CalcStatus()
1508

    
1509
    if job_status not in (constants.JOB_STATUS_QUEUED,
1510
                          constants.JOB_STATUS_WAITLOCK):
1511
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1512
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1513

    
1514
    if job_status == constants.JOB_STATUS_QUEUED:
1515
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1516
                            "Job canceled by request")
1517
      msg = "Job %s canceled" % job.id
1518

    
1519
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1520
      # The worker will notice the new status and cancel the job
1521
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1522
      msg = "Job %s will be canceled" % job.id
1523

    
1524
    self.UpdateJobUnlocked(job)
1525

    
1526
    return (True, msg)
1527

    
1528
  @_RequireOpenQueue
1529
  def _ArchiveJobsUnlocked(self, jobs):
1530
    """Archives jobs.
1531

1532
    @type jobs: list of L{_QueuedJob}
1533
    @param jobs: Job objects
1534
    @rtype: int
1535
    @return: Number of archived jobs
1536

1537
    """
1538
    archive_jobs = []
1539
    rename_files = []
1540
    for job in jobs:
1541
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
1542
        logging.debug("Job %s is not yet done", job.id)
1543
        continue
1544

    
1545
      archive_jobs.append(job)
1546

    
1547
      old = self._GetJobPath(job.id)
1548
      new = self._GetArchivedJobPath(job.id)
1549
      rename_files.append((old, new))
1550

    
1551
    # TODO: What if 1..n files fail to rename?
1552
    self._RenameFilesUnlocked(rename_files)
1553

    
1554
    logging.debug("Successfully archived job(s) %s",
1555
                  utils.CommaJoin(job.id for job in archive_jobs))
1556

    
1557
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1558
    # the files, we update the cached queue size from the filesystem. When we
1559
    # get around to fix the TODO: above, we can use the number of actually
1560
    # archived jobs to fix this.
1561
    self._UpdateQueueSizeUnlocked()
1562
    return len(archive_jobs)
1563

    
1564
  @locking.ssynchronized(_LOCK)
1565
  @_RequireOpenQueue
1566
  def ArchiveJob(self, job_id):
1567
    """Archives a job.
1568

1569
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1570

1571
    @type job_id: string
1572
    @param job_id: Job ID of job to be archived.
1573
    @rtype: bool
1574
    @return: Whether job was archived
1575

1576
    """
1577
    logging.info("Archiving job %s", job_id)
1578

    
1579
    job = self._LoadJobUnlocked(job_id)
1580
    if not job:
1581
      logging.debug("Job %s not found", job_id)
1582
      return False
1583

    
1584
    return self._ArchiveJobsUnlocked([job]) == 1
1585

    
1586
  @locking.ssynchronized(_LOCK)
1587
  @_RequireOpenQueue
1588
  def AutoArchiveJobs(self, age, timeout):
1589
    """Archives all jobs based on age.
1590

1591
    The method will archive all jobs which are older than the age
1592
    parameter. For jobs that don't have an end timestamp, the start
1593
    timestamp will be considered. The special '-1' age will cause
1594
    archival of all jobs (that are not running or queued).
1595

1596
    @type age: int
1597
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum time to spend on archiving jobs, in seconds
1598

1599
    """
1600
    logging.info("Archiving jobs with age more than %s seconds", age)
1601

    
1602
    now = time.time()
1603
    end_time = now + timeout
1604
    archived_count = 0
1605
    last_touched = 0
1606

    
1607
    all_job_ids = self._GetJobIDsUnlocked()
1608
    pending = []
1609
    for idx, job_id in enumerate(all_job_ids):
1610
      last_touched = idx + 1
1611

    
1612
      # Not optimal because jobs could be pending
1613
      # TODO: Measure average duration for job archival and take number of
1614
      # pending jobs into account.
1615
      if time.time() > end_time:
1616
        break
1617

    
1618
      # Returns None if the job failed to load
1619
      job = self._LoadJobUnlocked(job_id)
1620
      if job:
1621
        if job.end_timestamp is None:
1622
          if job.start_timestamp is None:
1623
            job_age = job.received_timestamp
1624
          else:
1625
            job_age = job.start_timestamp
1626
        else:
1627
          job_age = job.end_timestamp
1628

    
1629
        if age == -1 or now - job_age[0] > age:
1630
          pending.append(job)
1631

    
1632
          # Archive 10 jobs at a time
1633
          if len(pending) >= 10:
1634
            archived_count += self._ArchiveJobsUnlocked(pending)
1635
            pending = []
1636

    
1637
    if pending:
1638
      archived_count += self._ArchiveJobsUnlocked(pending)
1639

    
1640
    return (archived_count, len(all_job_ids) - last_touched)
1641

    
1642
  def QueryJobs(self, job_ids, fields):
1643
    """Returns a list of jobs in queue.
1644

1645
    @type job_ids: list
1646
    @param job_ids: sequence of job identifiers or None for all
1647
    @type fields: list
1648
    @param fields: names of fields to return
1649
    @rtype: list
1650
    @return: list one element per job, each element being list with
1651
        the requested fields
1652

1653
    """
1654
    jobs = []
1655
    list_all = False
1656
    if not job_ids:
1657
      # Since files are added to/removed from the queue atomically, there's no
1658
      # risk of getting the job ids in an inconsistent state.
1659
      job_ids = self._GetJobIDsUnlocked()
1660
      list_all = True
1661

    
1662
    for job_id in job_ids:
1663
      job = self.SafeLoadJobFromDisk(job_id)
1664
      if job is not None:
1665
        jobs.append(job.GetInfo(fields))
1666
      elif not list_all:
1667
        jobs.append(None)
1668

    
1669
    return jobs
1670

    
1671
  @locking.ssynchronized(_LOCK)
1672
  @_RequireOpenQueue
1673
  def Shutdown(self):
1674
    """Stops the job queue.
1675

1676
    This shuts down all the worker threads and closes the queue.
1677

1678
    """
1679
    self._wpool.TerminateWorkers()
1680

    
1681
    self._queue_filelock.Close()
1682
    self._queue_filelock = None