#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import errno
import re
import time
import weakref

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import netutils
from ganeti import compat


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
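
# Editor's illustrative sketch (not part of the original module): timestamps
# are kept as (seconds, microseconds) tuples so they survive JSON
# serialization without float precision loss, e.g.
#
#   ts = TimeStampNow()          # e.g. (1288459200, 123456)
#   secs, usecs = ts
#   as_float = secs + usecs / 1e6   # back to an approximate float time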


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      }
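
  # Hedged round-trip sketch (editor's note, not part of the module); it
  # assumes the caller already has a concrete opcodes.OpCode instance:
  #
  #   qop = _QueuedOpCode(an_opcode)
  #   state = qop.Serialize()               # plain dict, JSON-serializable
  #   clone = _QueuedOpCode.Restore(state)
  #   assert clone.status == constants.OP_STATUS_QUEUED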


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, opcode, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
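
  # Editor's illustrative example (not part of the module): per-opcode
  # statuses collapse into a single job status, e.g.
  #
  #   job.ops[0].status = constants.OP_STATUS_SUCCESS
  #   job.ops[1].status = constants.OP_STATUS_RUNNING
  #   job.ops[2].status = constants.OP_STATUS_QUEUED
  #   job.CalcStatus()   # -> constants.JOB_STATUS_RUNNING
  #
  # whereas a single OP_STATUS_ERROR anywhere makes the whole job
  # JOB_STATUS_ERROR, because one failed opcode fails the job.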

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "lock_status":
        row.append(self.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    try:
      not_marked = True
      for op in self.ops:
        if op.status in constants.OPS_FINALIZED:
          assert not_marked, "Finalized opcodes found after non-finalized ones"
          continue
        op.status = status
        op.result = result
        not_marked = False
    finally:
      self.queue.UpdateJobUnlocked(self)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # All locks are acquired by now
    self._job.lock_status = None

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")
    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
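
  # Hedged usage sketch (editor's note, not part of the module): the LU code
  # calls Feedback() with either just a message or a (type, message) pair;
  # the one-argument form defaults the type to constants.ELOG_MESSAGE:
  #
  #   cb.Feedback("copying disk data")
  #   cb.Feedback(constants.ELOG_MESSAGE, "copying disk data")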

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg

    # Cancel here if we were asked to
    self._CheckCancel()


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
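
  # Editor's illustrative sketch (not part of the module): the helper is
  # driven by JobQueue.WaitForJobChanges() further down. Conceptually a
  # caller supplies a job file, a loader and the client's last-seen state,
  # and gets back either new (job_info, log_entries), None if the job is
  # gone, or constants.JOB_NOTCHANGED on timeout:
  #
  #   helper = _WaitForJobChangesHelper()
  #   result = helper("/var/lib/ganeti/queue/job-1", load_fn,
  #                   ["status"], None, 0, 10.0)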


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire(shared=1)
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                logging.debug("Canceling opcode")
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              logging.debug("Opcode %s/%s waiting for locks",
                            idx + 1, count)
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire(shared=1)
            try:
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              if idx == count - 1:
                job.lock_status = None
                job.end_timestamp = TimeStampNow()

                # Consistency check
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops)

              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire(shared=1)
            try:
              try:
                logging.debug("Opcode %s/%s failed", idx + 1, count)
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  to_encode = err
                else:
                  to_encode = errors.OpExecError(str(err))
                op.result = errors.EncodeException(to_encode)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)

                to_encode = errors.OpExecError("Preceding opcode failed")
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                      errors.EncodeException(to_encode))

                # Consistency check
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops[:idx])
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
                                  errors.GetEncodedError(i.result)
                                  for i in job.ops[idx:])
              finally:
                job.lock_status = None
                job.end_timestamp = TimeStampNow()
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire(shared=1)
        try:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      status = job.CalcStatus()
      logging.info("Finished job %s, status = %s", job.id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.HostInfo().name

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask((job, ))

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                  "Unclean master daemon shutdown")

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    utils.WriteFile(file_name, data=data)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicate the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
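
  # Editor's worked example (not part of the module): with
  # JOBS_PER_ARCHIVE_DIRECTORY = 10000, jobs are bucketed by integer
  # division of their numeric ID, e.g.
  #
  #   JobQueue._GetArchiveDirectory("42")      # -> "0"
  #   JobQueue._GetArchiveDirectory("123456")  # -> "12"
  #
  # so archived job 123456 ends up in subdirectory "12" of the job archive
  # directory (see _GetArchivedJobPath below).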

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
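
  # Editor's worked example (not part of the module): if the last serial on
  # disk is 42 and three serials are requested, the serial file is rewritten
  # to contain "45" and only the newly allocated identifiers are returned:
  #
  #   queue._NewSerialsUnlocked(3)   # -> ["43", "44", "45"]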

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked to be drained.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True
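
  # Editor's note (illustrative, not part of the module): the drain flag is
  # just a marker file; while it exists, _SubmitJobUnlocked() below refuses
  # new jobs with errors.JobQueueDrainError, e.g.
  #
  #   queue.SetDrainFlag(True)    # creates constants.JOB_QUEUE_DRAIN_FILE
  #   queue.SubmitJob(ops)        # -> raises errors.JobQueueDrainError
  #   queue.SetDrainFlag(False)   # removes the file, submissions resume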

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    tasks = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))
    self._wpool.AddManyTasks(tasks)

    return results
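
  # Editor's illustrative sketch (not part of the module): SubmitManyJobs
  # returns one (success, data) pair per submitted job, in order, where data
  # is the new job ID on success or an error string otherwise, e.g.
  #
  #   queue.SubmitManyJobs([ops_a, []])
  #   # -> [(True, "44"), (False, "A job needs at least one opcode")]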

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time (in seconds) to spend archiving

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
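
  # Editor's illustrative sketch (not part of the module): callers pass the
  # minimum age and a soft time budget, and get back how many jobs were
  # archived plus how many job IDs were left unexamined when the budget ran
  # out, e.g.
  #
  #   archived, remaining = queue.AutoArchiveJobs(6 * 3600, 30)
  #   # archive finalized jobs older than six hours, spending roughly 30s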

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list with
        the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None