1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import errno
35
import re
36
import time
37
import weakref
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import netutils
57
from ganeti import compat
58

    
59

    
60
JOBQUEUE_THREADS = 25
61
JOBS_PER_ARCHIVE_DIRECTORY = 10000
62

    
63
# member lock names to be passed to @ssynchronized decorator
64
_LOCK = "_lock"
65
_QUEUE = "_queue"
66

    
67

    
68
class CancelJob(Exception):
69
  """Special exception to cancel a job.
70

71
  """
72

    
73

    
74
def TimeStampNow():
75
  """Returns the current timestamp.
76

77
  @rtype: tuple
78
  @return: the current time in the (seconds, microseconds) format
79

80
  """
81
  return utils.SplitTime(time.time())
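
# Illustrative sketch (not part of the original module): timestamps in this
# module are (seconds, microseconds) tuples as produced by utils.SplitTime.
# The helper below is hypothetical and only documents how such a tuple maps
# back to a float; nothing in the queue uses it.
def _ExampleTimestampToFloat(split_ts):
  """Example only: convert a (seconds, microseconds) tuple to a float."""
  (secs, usecs) = split_ts
  return secs + usecs / 1000000.0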
82

    
83

    
84
class _QueuedOpCode(object):
85
  """Encapsulates an opcode object.
86

87
  @ivar log: holds the execution log and consists of tuples
88
  of the form C{(log_serial, timestamp, level, message)}
89
  @ivar input: the OpCode we encapsulate
90
  @ivar status: the current status
91
  @ivar result: the result of the LU execution
92
  @ivar start_timestamp: timestamp for the start of the execution
93
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94
  @ivar end_timestamp: timestamp for the end of the execution
95

96
  """
97
  __slots__ = ["input", "status", "result", "log",
98
               "start_timestamp", "exec_timestamp", "end_timestamp",
99
               "__weakref__"]
100

    
101
  def __init__(self, op):
102
    """Constructor for the _QuededOpCode.
103

104
    @type op: L{opcodes.OpCode}
105
    @param op: the opcode we encapsulate
106

107
    """
108
    self.input = op
109
    self.status = constants.OP_STATUS_QUEUED
110
    self.result = None
111
    self.log = []
112
    self.start_timestamp = None
113
    self.exec_timestamp = None
114
    self.end_timestamp = None
115

    
116
  @classmethod
117
  def Restore(cls, state):
118
    """Restore the _QueuedOpCode from the serialized form.
119

120
    @type state: dict
121
    @param state: the serialized state
122
    @rtype: _QueuedOpCode
123
    @return: a new _QueuedOpCode instance
124

125
    """
126
    obj = _QueuedOpCode.__new__(cls)
127
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
128
    obj.status = state["status"]
129
    obj.result = state["result"]
130
    obj.log = state["log"]
131
    obj.start_timestamp = state.get("start_timestamp", None)
132
    obj.exec_timestamp = state.get("exec_timestamp", None)
133
    obj.end_timestamp = state.get("end_timestamp", None)
134
    return obj
135

    
136
  def Serialize(self):
137
    """Serializes this _QueuedOpCode.
138

139
    @rtype: dict
140
    @return: the dictionary holding the serialized state
141

142
    """
143
    return {
144
      "input": self.input.__getstate__(),
145
      "status": self.status,
146
      "result": self.result,
147
      "log": self.log,
148
      "start_timestamp": self.start_timestamp,
149
      "exec_timestamp": self.exec_timestamp,
150
      "end_timestamp": self.end_timestamp,
151
      }
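
# Illustrative sketch (not part of the original module): the opcode state is
# a plain dict, so Serialize() and Restore() round-trip through the JSON
# serializer already used by this module. The helper below is hypothetical
# documentation and is not used by the queue itself.
def _ExampleOpCodeRoundTrip(op):
  """Example only: dump a _QueuedOpCode to JSON and restore it."""
  data = serializer.DumpJson(op.Serialize(), indent=False)
  return _QueuedOpCode.Restore(serializer.LoadJson(data))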
152

    
153

    
154
class _QueuedJob(object):
155
  """In-memory job representation.
156

157
  This is what we use to track the user-submitted jobs. Locking must
158
  be taken care of by users of this class.
159

160
  @type queue: L{JobQueue}
161
  @ivar queue: the parent queue
162
  @ivar id: the job ID
163
  @type ops: list
164
  @ivar ops: the list of _QueuedOpCode that constitute the job
165
  @type log_serial: int
166
  @ivar log_serial: holds the index for the next log entry
167
  @ivar received_timestamp: the timestamp for when the job was received
168
  @ivar start_timestamp: the timestamp for the start of execution
169
  @ivar end_timestamp: the timestamp for end of execution
170
  @ivar lock_status: In-memory locking information for debugging
171

172
  """
173
  # pylint: disable-msg=W0212
174
  __slots__ = ["queue", "id", "ops", "log_serial",
175
               "received_timestamp", "start_timestamp", "end_timestamp",
176
               "lock_status", "change",
177
               "__weakref__"]
178

    
179
  def __init__(self, queue, job_id, ops):
180
    """Constructor for the _QueuedJob.
181

182
    @type queue: L{JobQueue}
183
    @param queue: our parent queue
184
    @type job_id: job_id
185
    @param job_id: our job id
186
    @type ops: list
187
    @param ops: the list of opcodes we hold, which will be encapsulated
188
        in _QueuedOpCodes
189

190
    """
191
    if not ops:
192
      raise errors.GenericError("A job needs at least one opcode")
193

    
194
    self.queue = queue
195
    self.id = job_id
196
    self.ops = [_QueuedOpCode(op) for op in ops]
197
    self.log_serial = 0
198
    self.received_timestamp = TimeStampNow()
199
    self.start_timestamp = None
200
    self.end_timestamp = None
201

    
202
    # In-memory attributes
203
    self.lock_status = None
204

    
205
  def __repr__(self):
206
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
207
              "id=%s" % self.id,
208
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
209

    
210
    return "<%s at %#x>" % (" ".join(status), id(self))
211

    
212
  @classmethod
213
  def Restore(cls, queue, state):
214
    """Restore a _QueuedJob from serialized state:
215

216
    @type queue: L{JobQueue}
217
    @param queue: to which queue the restored job belongs
218
    @type state: dict
219
    @param state: the serialized state
220
    @rtype: _QueuedJob
221
    @return: the restored _QueuedJob instance
222

223
    """
224
    obj = _QueuedJob.__new__(cls)
225
    obj.queue = queue
226
    obj.id = state["id"]
227
    obj.received_timestamp = state.get("received_timestamp", None)
228
    obj.start_timestamp = state.get("start_timestamp", None)
229
    obj.end_timestamp = state.get("end_timestamp", None)
230

    
231
    # In-memory attributes
232
    obj.lock_status = None
233

    
234
    obj.ops = []
235
    obj.log_serial = 0
236
    for op_state in state["ops"]:
237
      op = _QueuedOpCode.Restore(op_state)
238
      for log_entry in op.log:
239
        obj.log_serial = max(obj.log_serial, log_entry[0])
240
      obj.ops.append(op)
241

    
242
    return obj
243

    
244
  def Serialize(self):
245
    """Serialize the _JobQueue instance.
246

247
    @rtype: dict
248
    @return: the serialized state
249

250
    """
251
    return {
252
      "id": self.id,
253
      "ops": [op.Serialize() for op in self.ops],
254
      "start_timestamp": self.start_timestamp,
255
      "end_timestamp": self.end_timestamp,
256
      "received_timestamp": self.received_timestamp,
257
      }
258

    
259
  def CalcStatus(self):
260
    """Compute the status of this job.
261

262
    This function iterates over all the _QueuedOpCodes in the job and
263
    based on their status, computes the job status.
264

265
    The algorithm is:
266
      - if we find a cancelled or a finished-with-error opcode, the job
267
        status will be the same
268
      - otherwise, the last opcode with the status one of:
269
          - waitlock
270
          - canceling
271
          - running
272

273
        will determine the job status
274

275
      - otherwise, it means either all opcodes are queued, or success,
276
        and the job status will be the same
277

278
    @return: the job status
279

280
    """
281
    status = constants.JOB_STATUS_QUEUED
282

    
283
    all_success = True
284
    for op in self.ops:
285
      if op.status == constants.OP_STATUS_SUCCESS:
286
        continue
287

    
288
      all_success = False
289

    
290
      if op.status == constants.OP_STATUS_QUEUED:
291
        pass
292
      elif op.status == constants.OP_STATUS_WAITLOCK:
293
        status = constants.JOB_STATUS_WAITLOCK
294
      elif op.status == constants.OP_STATUS_RUNNING:
295
        status = constants.JOB_STATUS_RUNNING
296
      elif op.status == constants.OP_STATUS_CANCELING:
297
        status = constants.JOB_STATUS_CANCELING
298
        break
299
      elif op.status == constants.OP_STATUS_ERROR:
300
        status = constants.JOB_STATUS_ERROR
301
        # The whole job fails if one opcode failed
302
        break
303
      elif op.status == constants.OP_STATUS_CANCELED:
304
        status = constants.JOB_STATUS_CANCELED
305
        break
306

    
307
    if all_success:
308
      status = constants.JOB_STATUS_SUCCESS
309

    
310
    return status
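
  # Illustrative examples (not from the original source) of how the
  # aggregation above maps opcode statuses to a job status:
  #
  #   (success, success, success)  -> JOB_STATUS_SUCCESS
  #   (success, running, queued)   -> JOB_STATUS_RUNNING
  #   (success, error, queued)     -> JOB_STATUS_ERROR (first error wins)
  #   (canceled, queued, queued)   -> JOB_STATUS_CANCELED
  #   (success, queued, queued)    -> JOB_STATUS_QUEUED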
311

    
312
  def GetLogEntries(self, newer_than):
313
    """Selectively returns the log entries.
314

315
    @type newer_than: None or int
316
    @param newer_than: if this is None, return all log entries,
317
        otherwise return only the log entries with serial higher
318
        than this value
319
    @rtype: list
320
    @return: the list of the log entries selected
321

322
    """
323
    if newer_than is None:
324
      serial = -1
325
    else:
326
      serial = newer_than
327

    
328
    entries = []
329
    for op in self.ops:
330
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
331

    
332
    return entries
333

    
334
  def GetInfo(self, fields):
335
    """Returns information about a job.
336

337
    @type fields: list
338
    @param fields: names of fields to return
339
    @rtype: list
340
    @return: list with one element for each field
341
    @raise errors.OpExecError: when an invalid field
342
        has been passed
343

344
    """
345
    row = []
346
    for fname in fields:
347
      if fname == "id":
348
        row.append(self.id)
349
      elif fname == "status":
350
        row.append(self.CalcStatus())
351
      elif fname == "ops":
352
        row.append([op.input.__getstate__() for op in self.ops])
353
      elif fname == "opresult":
354
        row.append([op.result for op in self.ops])
355
      elif fname == "opstatus":
356
        row.append([op.status for op in self.ops])
357
      elif fname == "oplog":
358
        row.append([op.log for op in self.ops])
359
      elif fname == "opstart":
360
        row.append([op.start_timestamp for op in self.ops])
361
      elif fname == "opexec":
362
        row.append([op.exec_timestamp for op in self.ops])
363
      elif fname == "opend":
364
        row.append([op.end_timestamp for op in self.ops])
365
      elif fname == "received_ts":
366
        row.append(self.received_timestamp)
367
      elif fname == "start_ts":
368
        row.append(self.start_timestamp)
369
      elif fname == "end_ts":
370
        row.append(self.end_timestamp)
371
      elif fname == "lock_status":
372
        row.append(self.lock_status)
373
      elif fname == "summary":
374
        row.append([op.input.Summary() for op in self.ops])
375
      else:
376
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
377
    return row
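
  # Illustrative sketch (not from the original source): a typical query,
  # assuming a successfully finished single-opcode job with id "42".
  #
  #   job.GetInfo(["id", "status", "opstatus"])
  #   => ["42", constants.JOB_STATUS_SUCCESS, [constants.OP_STATUS_SUCCESS]]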
378

    
379
  def MarkUnfinishedOps(self, status, result):
380
    """Mark unfinished opcodes with a given status and result.
381

382
    This is a utility function for marking all running or waiting to
383
    be run opcodes with a given status. Opcodes which are already
384
    finalised are not changed.
385

386
    @param status: a given opcode status
387
    @param result: the opcode result
388

389
    """
390
    try:
391
      not_marked = True
392
      for op in self.ops:
393
        if op.status in constants.OPS_FINALIZED:
394
          assert not_marked, "Finalized opcodes found after non-finalized ones"
395
          continue
396
        op.status = status
397
        op.result = result
398
        not_marked = False
399
    finally:
400
      self.queue.UpdateJobUnlocked(self)
401

    
402

    
403
class _OpExecCallbacks(mcpu.OpExecCbBase):
404
  def __init__(self, queue, job, op):
405
    """Initializes this class.
406

407
    @type queue: L{JobQueue}
408
    @param queue: Job queue
409
    @type job: L{_QueuedJob}
410
    @param job: Job object
411
    @type op: L{_QueuedOpCode}
412
    @param op: OpCode
413

414
    """
415
    assert queue, "Queue is missing"
416
    assert job, "Job is missing"
417
    assert op, "Opcode is missing"
418

    
419
    self._queue = queue
420
    self._job = job
421
    self._op = op
422

    
423
  def _CheckCancel(self):
424
    """Raises an exception to cancel the job if asked to.
425

426
    """
427
    # Cancel here if we were asked to
428
    if self._op.status == constants.OP_STATUS_CANCELING:
429
      logging.debug("Canceling opcode")
430
      raise CancelJob()
431

    
432
  @locking.ssynchronized(_QUEUE, shared=1)
433
  def NotifyStart(self):
434
    """Mark the opcode as running, not lock-waiting.
435

436
    This is called from the mcpu code as a notifier function, when the LU is
437
    finally about to start the Exec() method. Of course, to have end-user
438
    visible results, the opcode must be initially (before calling into
439
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
440

441
    """
442
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
443
                               constants.OP_STATUS_CANCELING)
444

    
445
    # All locks are acquired by now
446
    self._job.lock_status = None
447

    
448
    # Cancel here if we were asked to
449
    self._CheckCancel()
450

    
451
    logging.debug("Opcode is now running")
452
    self._op.status = constants.OP_STATUS_RUNNING
453
    self._op.exec_timestamp = TimeStampNow()
454

    
455
    # And finally replicate the job status
456
    self._queue.UpdateJobUnlocked(self._job)
457

    
458
  @locking.ssynchronized(_QUEUE, shared=1)
459
  def _AppendFeedback(self, timestamp, log_type, log_msg):
460
    """Internal feedback append function, with locks
461

462
    """
463
    self._job.log_serial += 1
464
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
465
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
466

    
467
  def Feedback(self, *args):
468
    """Append a log entry.
469

470
    """
471
    assert len(args) < 3
472

    
473
    if len(args) == 1:
474
      log_type = constants.ELOG_MESSAGE
475
      log_msg = args[0]
476
    else:
477
      (log_type, log_msg) = args
478

    
479
    # The time is split to make serialization easier and not lose
480
    # precision.
481
    timestamp = utils.SplitTime(time.time())
482
    self._AppendFeedback(timestamp, log_type, log_msg)
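
  # Illustrative sketch (not from the original source): the two calling
  # conventions accepted above, as used by LU code through the processor.
  #
  #   cbs.Feedback("syncing disks")                           # message only
  #   cbs.Feedback(constants.ELOG_MESSAGE, "syncing disks")   # explicit type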
483

    
484
  def ReportLocks(self, msg):
485
    """Write locking information to the job.
486

487
    Called whenever the LU processor is waiting for a lock or has acquired one.
488

489
    """
490
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
491
                               constants.OP_STATUS_CANCELING)
492

    
493
    # Not getting the queue lock because this is a single assignment
494
    self._job.lock_status = msg
495

    
496
    # Cancel here if we were asked to
497
    self._CheckCancel()
498

    
499

    
500
class _JobChangesChecker(object):
501
  def __init__(self, fields, prev_job_info, prev_log_serial):
502
    """Initializes this class.
503

504
    @type fields: list of strings
505
    @param fields: Fields requested by LUXI client
506
    @type prev_job_info: list or None
507
    @param prev_job_info: previous job info, as passed by the LUXI client
508
    @type prev_log_serial: int
509
    @param prev_log_serial: previous job serial, as passed by the LUXI client
510

511
    """
512
    self._fields = fields
513
    self._prev_job_info = prev_job_info
514
    self._prev_log_serial = prev_log_serial
515

    
516
  def __call__(self, job):
517
    """Checks whether job has changed.
518

519
    @type job: L{_QueuedJob}
520
    @param job: Job object
521

522
    """
523
    status = job.CalcStatus()
524
    job_info = job.GetInfo(self._fields)
525
    log_entries = job.GetLogEntries(self._prev_log_serial)
526

    
527
    # Serializing and deserializing data can cause type changes (e.g. from
528
    # tuple to list) or precision loss. We're doing it here so that we get
529
    # the same modifications as the data received from the client. Without
530
    # this, the comparison afterwards might fail without the data being
531
    # significantly different.
532
    # TODO: we just deserialized from disk, investigate how to make sure that
533
    # the job info and log entries are compatible to avoid this further step.
534
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
535
    # efficient, though floats will be tricky
536
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
537
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
538

    
539
    # Don't even try to wait if the job is no longer running, there will be
540
    # no changes.
541
    if (status not in (constants.JOB_STATUS_QUEUED,
542
                       constants.JOB_STATUS_RUNNING,
543
                       constants.JOB_STATUS_WAITLOCK) or
544
        job_info != self._prev_job_info or
545
        (log_entries and self._prev_log_serial != log_entries[0][0])):
546
      logging.debug("Job %s changed", job.id)
547
      return (job_info, log_entries)
548

    
549
    return None
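
  # Illustrative sketch (not from the original source): the return value
  # contract relied upon by the retry loop in _WaitForJobChangesHelper.
  #
  #   checker = _JobChangesChecker(["status"], prev_info, prev_serial)
  #   checker(job)  # => (job_info, log_entries) when something changed,
  #                 #    None when the caller should keep waiting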
550

    
551

    
552
class _JobFileChangesWaiter(object):
553
  def __init__(self, filename):
554
    """Initializes this class.
555

556
    @type filename: string
557
    @param filename: Path to job file
558
    @raise errors.InotifyError: if the notifier cannot be set up
559

560
    """
561
    self._wm = pyinotify.WatchManager()
562
    self._inotify_handler = \
563
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
564
    self._notifier = \
565
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
566
    try:
567
      self._inotify_handler.enable()
568
    except Exception:
569
      # pyinotify doesn't close file descriptors automatically
570
      self._notifier.stop()
571
      raise
572

    
573
  def _OnInotify(self, notifier_enabled):
574
    """Callback for inotify.
575

576
    """
577
    if not notifier_enabled:
578
      self._inotify_handler.enable()
579

    
580
  def Wait(self, timeout):
581
    """Waits for the job file to change.
582

583
    @type timeout: float
584
    @param timeout: Timeout in seconds
585
    @return: Whether there have been events
586

587
    """
588
    assert timeout >= 0
589
    have_events = self._notifier.check_events(timeout * 1000)
590
    if have_events:
591
      self._notifier.read_events()
592
    self._notifier.process_events()
593
    return have_events
594

    
595
  def Close(self):
596
    """Closes underlying notifier and its file descriptor.
597

598
    """
599
    self._notifier.stop()
600

    
601

    
602
class _JobChangesWaiter(object):
603
  def __init__(self, filename):
604
    """Initializes this class.
605

606
    @type filename: string
607
    @param filename: Path to job file
608

609
    """
610
    self._filewaiter = None
611
    self._filename = filename
612

    
613
  def Wait(self, timeout):
614
    """Waits for a job to change.
615

616
    @type timeout: float
617
    @param timeout: Timeout in seconds
618
    @return: Whether there have been events
619

620
    """
621
    if self._filewaiter:
622
      return self._filewaiter.Wait(timeout)
623

    
624
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
625
    # If this point is reached, return immediately and let caller check the job
626
    # file again in case there were changes since the last check. This avoids a
627
    # race condition.
628
    self._filewaiter = _JobFileChangesWaiter(self._filename)
629

    
630
    return True
631

    
632
  def Close(self):
633
    """Closes underlying waiter.
634

635
    """
636
    if self._filewaiter:
637
      self._filewaiter.Close()
638

    
639

    
640
class _WaitForJobChangesHelper(object):
641
  """Helper class using inotify to wait for changes in a job file.
642

643
  This class takes a previous job status and serial, and alerts the client when
644
  the current job status has changed.
645

646
  """
647
  @staticmethod
648
  def _CheckForChanges(job_load_fn, check_fn):
649
    job = job_load_fn()
650
    if not job:
651
      raise errors.JobLost()
652

    
653
    result = check_fn(job)
654
    if result is None:
655
      raise utils.RetryAgain()
656

    
657
    return result
658

    
659
  def __call__(self, filename, job_load_fn,
660
               fields, prev_job_info, prev_log_serial, timeout):
661
    """Waits for changes on a job.
662

663
    @type filename: string
664
    @param filename: File on which to wait for changes
665
    @type job_load_fn: callable
666
    @param job_load_fn: Function to load job
667
    @type fields: list of strings
668
    @param fields: Which fields to check for changes
669
    @type prev_job_info: list or None
670
    @param prev_job_info: Last job information returned
671
    @type prev_log_serial: int
672
    @param prev_log_serial: Last job message serial number
673
    @type timeout: float
674
    @param timeout: maximum time to wait in seconds
675

676
    """
677
    try:
678
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
679
      waiter = _JobChangesWaiter(filename)
680
      try:
681
        return utils.Retry(compat.partial(self._CheckForChanges,
682
                                          job_load_fn, check_fn),
683
                           utils.RETRY_REMAINING_TIME, timeout,
684
                           wait_fn=waiter.Wait)
685
      finally:
686
        waiter.Close()
687
    except (errors.InotifyError, errors.JobLost):
688
      return None
689
    except utils.RetryTimeout:
690
      return constants.JOB_NOTCHANGED
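
# Illustrative summary (not part of the original module): the three possible
# outcomes of the helper above, as consumed by JobQueue.WaitForJobChanges():
#
#   (job_info, log_entries)    - the job changed within the timeout
#   constants.JOB_NOTCHANGED   - the timeout expired without any change
#   None                       - the job file was lost or inotify failed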
691

    
692

    
693
class _JobQueueWorker(workerpool.BaseWorker):
694
  """The actual job workers.
695

696
  """
697
  def RunTask(self, job): # pylint: disable-msg=W0221
698
    """Job executor.
699

700
    This function processes a job. It is closely tied to the _QueuedJob and
701
    _QueuedOpCode classes.
702

703
    @type job: L{_QueuedJob}
704
    @param job: the job to be processed
705

706
    """
707
    logging.info("Processing job %s", job.id)
708
    proc = mcpu.Processor(self.pool.queue.context, job.id)
709
    queue = job.queue
710
    try:
711
      try:
712
        count = len(job.ops)
713
        for idx, op in enumerate(job.ops):
714
          op_summary = op.input.Summary()
715
          if op.status == constants.OP_STATUS_SUCCESS:
716
            # this is a job that was partially completed before master
717
            # daemon shutdown, so it can be expected that some opcodes
718
            # are already completed successfully (if any did error
719
            # out, then the whole job should have been aborted and not
720
            # resubmitted for processing)
721
            logging.info("Op %s/%s: opcode %s already processed, skipping",
722
                         idx + 1, count, op_summary)
723
            continue
724
          try:
725
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
726
                         op_summary)
727

    
728
            queue.acquire(shared=1)
729
            try:
730
              if op.status == constants.OP_STATUS_CANCELED:
731
                logging.debug("Canceling opcode")
732
                raise CancelJob()
733
              assert op.status == constants.OP_STATUS_QUEUED
734
              logging.debug("Opcode %s/%s waiting for locks",
735
                            idx + 1, count)
736
              op.status = constants.OP_STATUS_WAITLOCK
737
              op.result = None
738
              op.start_timestamp = TimeStampNow()
739
              if idx == 0: # first opcode
740
                job.start_timestamp = op.start_timestamp
741
              queue.UpdateJobUnlocked(job)
742

    
743
              input_opcode = op.input
744
            finally:
745
              queue.release()
746

    
747
            # Make sure not to hold queue lock while calling ExecOpCode
748
            result = proc.ExecOpCode(input_opcode,
749
                                     _OpExecCallbacks(queue, job, op))
750

    
751
            queue.acquire(shared=1)
752
            try:
753
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
754
              op.status = constants.OP_STATUS_SUCCESS
755
              op.result = result
756
              op.end_timestamp = TimeStampNow()
757
              if idx == count - 1:
758
                job.lock_status = None
759
                job.end_timestamp = TimeStampNow()
760
              queue.UpdateJobUnlocked(job)
761
            finally:
762
              queue.release()
763

    
764
            logging.info("Op %s/%s: Successfully finished opcode %s",
765
                         idx + 1, count, op_summary)
766
          except CancelJob:
767
            # Will be handled further up
768
            raise
769
          except Exception, err:
770
            queue.acquire(shared=1)
771
            try:
772
              try:
773
                logging.debug("Opcode %s/%s failed", idx + 1, count)
774
                op.status = constants.OP_STATUS_ERROR
775
                if isinstance(err, errors.GenericError):
776
                  to_encode = err
777
                else:
778
                  to_encode = errors.OpExecError(str(err))
779
                op.result = errors.EncodeException(to_encode)
780
                op.end_timestamp = TimeStampNow()
781
                logging.info("Op %s/%s: Error in opcode %s: %s",
782
                             idx + 1, count, op_summary, err)
783
              finally:
784
                job.lock_status = None
785
                job.end_timestamp = TimeStampNow()
786
                queue.UpdateJobUnlocked(job)
787
            finally:
788
              queue.release()
789
            raise
790

    
791
      except CancelJob:
792
        queue.acquire(shared=1)
793
        try:
794
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
795
                                "Job canceled by request")
796
          job.lock_status = None
797
          job.end_timestamp = TimeStampNow()
798
          queue.UpdateJobUnlocked(job)
799
        finally:
800
          queue.release()
801
      except errors.GenericError, err:
802
        logging.exception("Ganeti exception")
803
      except:
804
        logging.exception("Unhandled exception")
805
    finally:
806
      status = job.CalcStatus()
807
      logging.info("Finished job %s, status = %s", job.id, status)
808

    
809

    
810
class _JobQueueWorkerPool(workerpool.WorkerPool):
811
  """Simple class implementing a job-processing workerpool.
812

813
  """
814
  def __init__(self, queue):
815
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
816
                                              JOBQUEUE_THREADS,
817
                                              _JobQueueWorker)
818
    self.queue = queue
819

    
820

    
821
def _RequireOpenQueue(fn):
822
  """Decorator for "public" functions.
823

824
  This function should be used for all 'public' functions. That is,
825
  functions usually called from other classes. Note that this should
826
  be applied only to methods (not plain functions), since it expects
827
  that the decorated function is called with a first argument that has
828
  a '_queue_filelock' attribute.
829

830
  @warning: Use this decorator only after locking.ssynchronized
831

832
  Example::
833
    @locking.ssynchronized(_LOCK)
834
    @_RequireOpenQueue
835
    def Example(self):
836
      pass
837

838
  """
839
  def wrapper(self, *args, **kwargs):
840
    # pylint: disable-msg=W0212
841
    assert self._queue_filelock is not None, "Queue should be open"
842
    return fn(self, *args, **kwargs)
843
  return wrapper
844

    
845

    
846
class JobQueue(object):
847
  """Queue used to manage the jobs.
848

849
  @cvar _RE_JOB_FILE: regex matching the valid job file names
850

851
  """
852
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
853

    
854
  def __init__(self, context):
855
    """Constructor for JobQueue.
856

857
    The constructor will initialize the job queue object and then
858
    start loading the current jobs from disk, either for starting them
859
    (if they were queued) or for aborting them (if they were already
860
    running).
861

862
    @type context: GanetiContext
863
    @param context: the context object for access to the configuration
864
        data and other ganeti objects
865

866
    """
867
    self.context = context
868
    self._memcache = weakref.WeakValueDictionary()
869
    self._my_hostname = netutils.HostInfo().name
870

    
871
    # The Big JobQueue lock. If a code block or method acquires it in shared
872
    # mode, it must guarantee concurrency with all the code acquiring it in
873
    # shared mode, including itself. In order not to acquire it at all,
874
    # concurrency must be guaranteed with all code acquiring it in shared mode
875
    # and all code acquiring it exclusively.
876
    self._lock = locking.SharedLock("JobQueue")
877

    
878
    self.acquire = self._lock.acquire
879
    self.release = self._lock.release
880

    
881
    # Initialize the queue, and acquire the filelock.
882
    # This ensures no other process is working on the job queue.
883
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
884

    
885
    # Read serial file
886
    self._last_serial = jstore.ReadSerial()
887
    assert self._last_serial is not None, ("Serial file was modified between"
888
                                           " check in jstore and here")
889

    
890
    # Get initial list of nodes
891
    self._nodes = dict((n.name, n.primary_ip)
892
                       for n in self.context.cfg.GetAllNodesInfo().values()
893
                       if n.master_candidate)
894

    
895
    # Remove master node
896
    self._nodes.pop(self._my_hostname, None)
897

    
898
    # TODO: Check consistency across nodes
899

    
900
    self._queue_size = 0
901
    self._UpdateQueueSizeUnlocked()
902
    self._drained = self._IsQueueMarkedDrain()
903

    
904
    # Setup worker pool
905
    self._wpool = _JobQueueWorkerPool(self)
906
    try:
907
      # We need to lock here because WorkerPool.AddTask() may start a job while
908
      # we're still doing our work.
909
      self.acquire()
910
      try:
911
        logging.info("Inspecting job queue")
912

    
913
        all_job_ids = self._GetJobIDsUnlocked()
914
        jobs_count = len(all_job_ids)
915
        lastinfo = time.time()
916
        for idx, job_id in enumerate(all_job_ids):
917
          # Give an update every 1000 jobs or 10 seconds
918
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
919
              idx == (jobs_count - 1)):
920
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
921
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
922
            lastinfo = time.time()
923

    
924
          job = self._LoadJobUnlocked(job_id)
925

    
926
          # a failure in loading the job can cause 'None' to be returned
927
          if job is None:
928
            continue
929

    
930
          status = job.CalcStatus()
931

    
932
          if status in (constants.JOB_STATUS_QUEUED, ):
933
            self._wpool.AddTask((job, ))
934

    
935
          elif status in (constants.JOB_STATUS_RUNNING,
936
                          constants.JOB_STATUS_WAITLOCK,
937
                          constants.JOB_STATUS_CANCELING):
938
            logging.warning("Unfinished job %s found: %s", job.id, job)
939
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
940
                                  "Unclean master daemon shutdown")
941

    
942
        logging.info("Job queue inspection finished")
943
      finally:
944
        self.release()
945
    except:
946
      self._wpool.TerminateWorkers()
947
      raise
948

    
949
  @locking.ssynchronized(_LOCK)
950
  @_RequireOpenQueue
951
  def AddNode(self, node):
952
    """Register a new node with the queue.
953

954
    @type node: L{objects.Node}
955
    @param node: the node object to be added
956

957
    """
958
    node_name = node.name
959
    assert node_name != self._my_hostname
960

    
961
    # Clean queue directory on added node
962
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
963
    msg = result.fail_msg
964
    if msg:
965
      logging.warning("Cannot cleanup queue directory on node %s: %s",
966
                      node_name, msg)
967

    
968
    if not node.master_candidate:
969
      # remove if existing, ignoring errors
970
      self._nodes.pop(node_name, None)
971
      # and skip the replication of the job ids
972
      return
973

    
974
    # Upload the whole queue excluding archived jobs
975
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
976

    
977
    # Upload current serial file
978
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
979

    
980
    for file_name in files:
981
      # Read file content
982
      content = utils.ReadFile(file_name)
983

    
984
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
985
                                                  [node.primary_ip],
986
                                                  file_name, content)
987
      msg = result[node_name].fail_msg
988
      if msg:
989
        logging.error("Failed to upload file %s to node %s: %s",
990
                      file_name, node_name, msg)
991

    
992
    self._nodes[node_name] = node.primary_ip
993

    
994
  @locking.ssynchronized(_LOCK)
995
  @_RequireOpenQueue
996
  def RemoveNode(self, node_name):
997
    """Callback called when removing nodes from the cluster.
998

999
    @type node_name: str
1000
    @param node_name: the name of the node to remove
1001

1002
    """
1003
    self._nodes.pop(node_name, None)
1004

    
1005
  @staticmethod
1006
  def _CheckRpcResult(result, nodes, failmsg):
1007
    """Verifies the status of an RPC call.
1008

1009
    Since we aim to keep consistency should this node (the current
1010
    master) fail, we will log errors if our rpc calls fail, and especially
1011
    log the case when more than half of the nodes fail.
1012

1013
    @param result: the data as returned from the rpc call
1014
    @type nodes: list
1015
    @param nodes: the list of nodes we made the call to
1016
    @type failmsg: str
1017
    @param failmsg: the identifier to be used for logging
1018

1019
    """
1020
    failed = []
1021
    success = []
1022

    
1023
    for node in nodes:
1024
      msg = result[node].fail_msg
1025
      if msg:
1026
        failed.append(node)
1027
        logging.error("RPC call %s (%s) failed on node %s: %s",
1028
                      result[node].call, failmsg, node, msg)
1029
      else:
1030
        success.append(node)
1031

    
1032
    # +1 for the master node
1033
    if (len(success) + 1) < len(failed):
1034
      # TODO: Handle failing nodes
1035
      logging.error("More than half of the nodes failed")
1036

    
1037
  def _GetNodeIp(self):
1038
    """Helper for returning the node name/ip list.
1039

1040
    @rtype: (list, list)
1041
    @return: a tuple of two lists, the first one with the node
1042
        names and the second one with the node addresses
1043

1044
    """
1045
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1046
    name_list = self._nodes.keys()
1047
    addr_list = [self._nodes[name] for name in name_list]
1048
    return name_list, addr_list
1049

    
1050
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1051
    """Writes a file locally and then replicates it to all nodes.
1052

1053
    This function will replace the contents of a file on the local
1054
    node and then replicate it to all the other nodes we have.
1055

1056
    @type file_name: str
1057
    @param file_name: the path of the file to be replicated
1058
    @type data: str
1059
    @param data: the new contents of the file
1060
    @type replicate: boolean
1061
    @param replicate: whether to spread the changes to the remote nodes
1062

1063
    """
1064
    utils.WriteFile(file_name, data=data)
1065

    
1066
    if replicate:
1067
      names, addrs = self._GetNodeIp()
1068
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1069
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1070

    
1071
  def _RenameFilesUnlocked(self, rename):
1072
    """Renames a file locally and then replicate the change.
1073

1074
    This function will rename a file in the local queue directory
1075
    and then replicate this rename to all the other nodes we have.
1076

1077
    @type rename: list of (old, new)
1078
    @param rename: List containing tuples mapping old to new names
1079

1080
    """
1081
    # Rename them locally
1082
    for old, new in rename:
1083
      utils.RenameFile(old, new, mkdir=True)
1084

    
1085
    # ... and on all nodes
1086
    names, addrs = self._GetNodeIp()
1087
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1088
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1089

    
1090
  @staticmethod
1091
  def _FormatJobID(job_id):
1092
    """Convert a job ID to string format.
1093

1094
    Currently this just does C{str(job_id)} after performing some
1095
    checks, but if we want to change the job id format this will
1096
    abstract this change.
1097

1098
    @type job_id: int or long
1099
    @param job_id: the numeric job id
1100
    @rtype: str
1101
    @return: the formatted job id
1102

1103
    """
1104
    if not isinstance(job_id, (int, long)):
1105
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1106
    if job_id < 0:
1107
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1108

    
1109
    return str(job_id)
1110

    
1111
  @classmethod
1112
  def _GetArchiveDirectory(cls, job_id):
1113
    """Returns the archive directory for a job.
1114

1115
    @type job_id: str
1116
    @param job_id: Job identifier
1117
    @rtype: str
1118
    @return: Directory name
1119

1120
    """
1121
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
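
  # Illustrative example (not from the original source): with
  # JOBS_PER_ARCHIVE_DIRECTORY = 10000, job "12345" maps to archive
  # directory "1" and job "9999" to directory "0" (see _GetArchivedJobPath).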
1122

    
1123
  def _NewSerialsUnlocked(self, count):
1124
    """Generates a new job identifier.
1125

1126
    Job identifiers are unique during the lifetime of a cluster.
1127

1128
    @type count: integer
1129
    @param count: how many serials to return
1130
    @rtype: list of str
1131
    @return: a list of strings representing the job identifiers.
1132

1133
    """
1134
    assert count > 0
1135
    # New number
1136
    serial = self._last_serial + count
1137

    
1138
    # Write to file
1139
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1140
                             "%s\n" % serial, True)
1141

    
1142
    result = [self._FormatJobID(v)
1143
              for v in range(self._last_serial + 1, serial + 1)]
1144
    # Keep it only if we were able to write the file
1145
    self._last_serial = serial
1146

    
1147
    return result
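
  # Illustrative example (not from the original source): with
  # _last_serial == 5, _NewSerialsUnlocked(3) writes "8" to the serial file
  # and returns the job ids ["6", "7", "8"].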
1148

    
1149
  @staticmethod
1150
  def _GetJobPath(job_id):
1151
    """Returns the job file for a given job id.
1152

1153
    @type job_id: str
1154
    @param job_id: the job identifier
1155
    @rtype: str
1156
    @return: the path to the job file
1157

1158
    """
1159
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1160

    
1161
  @classmethod
1162
  def _GetArchivedJobPath(cls, job_id):
1163
    """Returns the archived job file for a give job id.
1164

1165
    @type job_id: str
1166
    @param job_id: the job identifier
1167
    @rtype: str
1168
    @return: the path to the archived job file
1169

1170
    """
1171
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1172
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1173

    
1174
  def _GetJobIDsUnlocked(self, sort=True):
1175
    """Return all known job IDs.
1176

1177
    The method only looks at disk because it's a requirement that all
1178
    jobs are present on disk (so in the _memcache we don't have any
1179
    extra IDs).
1180

1181
    @type sort: boolean
1182
    @param sort: perform sorting on the returned job ids
1183
    @rtype: list
1184
    @return: the list of job IDs
1185

1186
    """
1187
    jlist = []
1188
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1189
      m = self._RE_JOB_FILE.match(filename)
1190
      if m:
1191
        jlist.append(m.group(1))
1192
    if sort:
1193
      jlist = utils.NiceSort(jlist)
1194
    return jlist
1195

    
1196
  def _LoadJobUnlocked(self, job_id):
1197
    """Loads a job from the disk or memory.
1198

1199
    Given a job id, this will return the cached job object if
1200
    existing, or try to load the job from the disk. If loading from
1201
    disk, it will also add the job to the cache.
1202

1203
    @param job_id: the job id
1204
    @rtype: L{_QueuedJob} or None
1205
    @return: either None or the job object
1206

1207
    """
1208
    job = self._memcache.get(job_id, None)
1209
    if job:
1210
      logging.debug("Found job %s in memcache", job_id)
1211
      return job
1212

    
1213
    try:
1214
      job = self._LoadJobFromDisk(job_id)
1215
      if job is None:
1216
        return job
1217
    except errors.JobFileCorrupted:
1218
      old_path = self._GetJobPath(job_id)
1219
      new_path = self._GetArchivedJobPath(job_id)
1220
      if old_path == new_path:
1221
        # job already archived (future case)
1222
        logging.exception("Can't parse job %s", job_id)
1223
      else:
1224
        # non-archived case
1225
        logging.exception("Can't parse job %s, will archive.", job_id)
1226
        self._RenameFilesUnlocked([(old_path, new_path)])
1227
      return None
1228

    
1229
    self._memcache[job_id] = job
1230
    logging.debug("Added job %s to the cache", job_id)
1231
    return job
1232

    
1233
  def _LoadJobFromDisk(self, job_id):
1234
    """Load the given job file from disk.
1235

1236
    Given a job file, read, load and restore it in a _QueuedJob format.
1237

1238
    @type job_id: string
1239
    @param job_id: job identifier
1240
    @rtype: L{_QueuedJob} or None
1241
    @return: either None or the job object
1242

1243
    """
1244
    filepath = self._GetJobPath(job_id)
1245
    logging.debug("Loading job from %s", filepath)
1246
    try:
1247
      raw_data = utils.ReadFile(filepath)
1248
    except EnvironmentError, err:
1249
      if err.errno in (errno.ENOENT, ):
1250
        return None
1251
      raise
1252

    
1253
    try:
1254
      data = serializer.LoadJson(raw_data)
1255
      job = _QueuedJob.Restore(self, data)
1256
    except Exception, err: # pylint: disable-msg=W0703
1257
      raise errors.JobFileCorrupted(err)
1258

    
1259
    return job
1260

    
1261
  def SafeLoadJobFromDisk(self, job_id):
1262
    """Load the given job file from disk.
1263

1264
    Given a job file, read, load and restore it in a _QueuedJob format.
1265
    In case of error reading the job, it gets returned as None, and the
1266
    exception is logged.
1267

1268
    @type job_id: string
1269
    @param job_id: job identifier
1270
    @rtype: L{_QueuedJob} or None
1271
    @return: either None or the job object
1272

1273
    """
1274
    try:
1275
      return self._LoadJobFromDisk(job_id)
1276
    except (errors.JobFileCorrupted, EnvironmentError):
1277
      logging.exception("Can't load/parse job %s", job_id)
1278
      return None
1279

    
1280
  @staticmethod
1281
  def _IsQueueMarkedDrain():
1282
    """Check if the queue is marked from drain.
1283

1284
    This currently uses the queue drain file, which makes it a
1285
    per-node flag. In the future this can be moved to the config file.
1286

1287
    @rtype: boolean
1288
    @return: True if the job queue is marked for draining
1289

1290
    """
1291
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1292

    
1293
  def _UpdateQueueSizeUnlocked(self):
1294
    """Update the queue size.
1295

1296
    """
1297
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1298

    
1299
  @locking.ssynchronized(_LOCK)
1300
  @_RequireOpenQueue
1301
  def SetDrainFlag(self, drain_flag):
1302
    """Sets the drain flag for the queue.
1303

1304
    @type drain_flag: boolean
1305
    @param drain_flag: Whether to set or unset the drain flag
1306

1307
    """
1308
    if drain_flag:
1309
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1310
    else:
1311
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1312

    
1313
    self._drained = drain_flag
1314

    
1315
    return True
1316

    
1317
  @_RequireOpenQueue
1318
  def _SubmitJobUnlocked(self, job_id, ops):
1319
    """Create and store a new job.
1320

1321
    This enters the job into our job queue and also puts it on the new
1322
    queue, in order for it to be picked up by the queue processors.
1323

1324
    @type job_id: job ID
1325
    @param job_id: the job ID for the new job
1326
    @type ops: list
1327
    @param ops: The list of OpCodes that will become the new job.
1328
    @rtype: L{_QueuedJob}
1329
    @return: the job object to be queued
1330
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1331
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1332

1333
    """
1334
    # Ok when sharing the big job queue lock, as the drain file is created when
1335
    # the lock is exclusive.
1336
    if self._drained:
1337
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1338

    
1339
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1340
      raise errors.JobQueueFull()
1341

    
1342
    job = _QueuedJob(self, job_id, ops)
1343

    
1344
    # Write to disk
1345
    self.UpdateJobUnlocked(job)
1346

    
1347
    self._queue_size += 1
1348

    
1349
    logging.debug("Adding new job %s to the cache", job_id)
1350
    self._memcache[job_id] = job
1351

    
1352
    return job
1353

    
1354
  @locking.ssynchronized(_LOCK)
1355
  @_RequireOpenQueue
1356
  def SubmitJob(self, ops):
1357
    """Create and store a new job.
1358

1359
    @see: L{_SubmitJobUnlocked}
1360

1361
    """
1362
    job_id = self._NewSerialsUnlocked(1)[0]
1363
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
1364
    return job_id
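
  # Illustrative sketch (not from the original source): how a caller submits
  # a job; the ops argument is a list of opcodes.OpCode instances.
  #
  #   job_id = queue.SubmitJob([op])
  #   # the job is now stored on disk, replicated and handed to a worker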
1365

    
1366
  @locking.ssynchronized(_LOCK)
1367
  @_RequireOpenQueue
1368
  def SubmitManyJobs(self, jobs):
1369
    """Create and store multiple jobs.
1370

1371
    @see: L{_SubmitJobUnlocked}
1372

1373
    """
1374
    results = []
1375
    tasks = []
1376
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1377
    for job_id, ops in zip(all_job_ids, jobs):
1378
      try:
1379
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1380
        status = True
1381
        data = job_id
1382
      except errors.GenericError, err:
1383
        data = str(err)
1384
        status = False
1385
      results.append((status, data))
1386
    self._wpool.AddManyTasks(tasks)
1387

    
1388
    return results
1389

    
1390
  @_RequireOpenQueue
1391
  def UpdateJobUnlocked(self, job, replicate=True):
1392
    """Update a job's on disk storage.
1393

1394
    After a job has been modified, this function needs to be called in
1395
    order to write the changes to disk and replicate them to the other
1396
    nodes.
1397

1398
    @type job: L{_QueuedJob}
1399
    @param job: the changed job
1400
    @type replicate: boolean
1401
    @param replicate: whether to replicate the change to remote nodes
1402

1403
    """
1404
    filename = self._GetJobPath(job.id)
1405
    data = serializer.DumpJson(job.Serialize(), indent=False)
1406
    logging.debug("Writing job %s to %s", job.id, filename)
1407
    self._UpdateJobQueueFile(filename, data, replicate)
1408

    
1409
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1410
                        timeout):
1411
    """Waits for changes in a job.
1412

1413
    @type job_id: string
1414
    @param job_id: Job identifier
1415
    @type fields: list of strings
1416
    @param fields: Which fields to check for changes
1417
    @type prev_job_info: list or None
1418
    @param prev_job_info: Last job information returned
1419
    @type prev_log_serial: int
1420
    @param prev_log_serial: Last job message serial number
1421
    @type timeout: float
1422
    @param timeout: maximum time to wait in seconds
1423
    @rtype: tuple (job info, log entries)
1424
    @return: a tuple of the job information as required via
1425
        the fields parameter, and the log entries as a list
1426

1427
        if the job has not changed and the timeout has expired,
1428
        we instead return a special value,
1429
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1430
        as such by the clients
1431

1432
    """
1433
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1434

    
1435
    helper = _WaitForJobChangesHelper()
1436

    
1437
    return helper(self._GetJobPath(job_id), load_fn,
1438
                  fields, prev_job_info, prev_log_serial, timeout)
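
  # Illustrative sketch (not from the original source): a client-side polling
  # loop built on top of WaitForJobChanges().
  #
  #   prev_info = None
  #   prev_serial = -1
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 10.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue                        # timeout expired, ask again
  #     if result is None:
  #       break                           # job was lost
  #     (prev_info, log_entries) = result
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]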
1439

    
1440
  @locking.ssynchronized(_LOCK)
1441
  @_RequireOpenQueue
1442
  def CancelJob(self, job_id):
1443
    """Cancels a job.
1444

1445
    This will only succeed if the job has not started yet.
1446

1447
    @type job_id: string
1448
    @param job_id: job ID of job to be cancelled.
1449

1450
    """
1451
    logging.info("Cancelling job %s", job_id)
1452

    
1453
    job = self._LoadJobUnlocked(job_id)
1454
    if not job:
1455
      logging.debug("Job %s not found", job_id)
1456
      return (False, "Job %s not found" % job_id)
1457

    
1458
    job_status = job.CalcStatus()
1459

    
1460
    if job_status not in (constants.JOB_STATUS_QUEUED,
1461
                          constants.JOB_STATUS_WAITLOCK):
1462
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1463
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1464

    
1465
    if job_status == constants.JOB_STATUS_QUEUED:
1466
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1467
                            "Job canceled by request")
1468
      return (True, "Job %s canceled" % job.id)
1469

    
1470
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1471
      # The worker will notice the new status and cancel the job
1472
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1473
      return (True, "Job %s will be canceled" % job.id)
1474

    
1475
  @_RequireOpenQueue
1476
  def _ArchiveJobsUnlocked(self, jobs):
1477
    """Archives jobs.
1478

1479
    @type jobs: list of L{_QueuedJob}
1480
    @param jobs: Job objects
1481
    @rtype: int
1482
    @return: Number of archived jobs
1483

1484
    """
1485
    archive_jobs = []
1486
    rename_files = []
1487
    for job in jobs:
1488
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
1489
        logging.debug("Job %s is not yet done", job.id)
1490
        continue
1491

    
1492
      archive_jobs.append(job)
1493

    
1494
      old = self._GetJobPath(job.id)
1495
      new = self._GetArchivedJobPath(job.id)
1496
      rename_files.append((old, new))
1497

    
1498
    # TODO: What if 1..n files fail to rename?
1499
    self._RenameFilesUnlocked(rename_files)
1500

    
1501
    logging.debug("Successfully archived job(s) %s",
1502
                  utils.CommaJoin(job.id for job in archive_jobs))
1503

    
1504
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1505
    # the files, we update the cached queue size from the filesystem. When we
1506
    # get around to fix the TODO: above, we can use the number of actually
1507
    # archived jobs to fix this.
1508
    self._UpdateQueueSizeUnlocked()
1509
    return len(archive_jobs)
1510

    
1511
  @locking.ssynchronized(_LOCK)
1512
  @_RequireOpenQueue
1513
  def ArchiveJob(self, job_id):
1514
    """Archives a job.
1515

1516
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1517

1518
    @type job_id: string
1519
    @param job_id: Job ID of job to be archived.
1520
    @rtype: bool
1521
    @return: Whether job was archived
1522

1523
    """
1524
    logging.info("Archiving job %s", job_id)
1525

    
1526
    job = self._LoadJobUnlocked(job_id)
1527
    if not job:
1528
      logging.debug("Job %s not found", job_id)
1529
      return False
1530

    
1531
    return self._ArchiveJobsUnlocked([job]) == 1
1532

    
1533
  @locking.ssynchronized(_LOCK)
1534
  @_RequireOpenQueue
1535
  def AutoArchiveJobs(self, age, timeout):
1536
    """Archives all jobs based on age.
1537

1538
    The method will archive all jobs which are older than the age
1539
    parameter. For jobs that don't have an end timestamp, the start
1540
    timestamp will be considered. The special '-1' age will cause
1541
    archival of all jobs (that are not running or queued).
1542

1543
    @type age: int
1544
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time (in seconds) to spend archiving
1545

1546
    """
1547
    logging.info("Archiving jobs with age more than %s seconds", age)
1548

    
1549
    now = time.time()
1550
    end_time = now + timeout
1551
    archived_count = 0
1552
    last_touched = 0
1553

    
1554
    all_job_ids = self._GetJobIDsUnlocked()
1555
    pending = []
1556
    for idx, job_id in enumerate(all_job_ids):
1557
      last_touched = idx + 1
1558

    
1559
      # Not optimal because jobs could be pending
1560
      # TODO: Measure average duration for job archival and take number of
1561
      # pending jobs into account.
1562
      if time.time() > end_time:
1563
        break
1564

    
1565
      # Returns None if the job failed to load
1566
      job = self._LoadJobUnlocked(job_id)
1567
      if job:
1568
        if job.end_timestamp is None:
1569
          if job.start_timestamp is None:
1570
            job_age = job.received_timestamp
1571
          else:
1572
            job_age = job.start_timestamp
1573
        else:
1574
          job_age = job.end_timestamp
1575

    
1576
        if age == -1 or now - job_age[0] > age:
1577
          pending.append(job)
1578

    
1579
          # Archive 10 jobs at a time
1580
          if len(pending) >= 10:
1581
            archived_count += self._ArchiveJobsUnlocked(pending)
1582
            pending = []
1583

    
1584
    if pending:
1585
      archived_count += self._ArchiveJobsUnlocked(pending)
1586

    
1587
    return (archived_count, len(all_job_ids) - last_touched)
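
  # Illustrative example (not from the original source): the age comparison
  # above uses the seconds part of the timestamp tuple, so with age=3600 a
  # job whose end_timestamp is (1234500000, 0) becomes eligible for archival
  # once time.time() exceeds 1234503600.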
1588

    
1589
  def QueryJobs(self, job_ids, fields):
1590
    """Returns a list of jobs in queue.
1591

1592
    @type job_ids: list
1593
    @param job_ids: sequence of job identifiers or None for all
1594
    @type fields: list
1595
    @param fields: names of fields to return
1596
    @rtype: list
1597
    @return: list one element per job, each element being list with
1598
        the requested fields
1599

1600
    """
1601
    jobs = []
1602
    list_all = False
1603
    if not job_ids:
1604
      # Since files are added to/removed from the queue atomically, there's no
1605
      # risk of getting the job ids in an inconsistent state.
1606
      job_ids = self._GetJobIDsUnlocked()
1607
      list_all = True
1608

    
1609
    for job_id in job_ids:
1610
      job = self.SafeLoadJobFromDisk(job_id)
1611
      if job is not None:
1612
        jobs.append(job.GetInfo(fields))
1613
      elif not list_all:
1614
        jobs.append(None)
1615

    
1616
    return jobs
1617

    
1618
  @locking.ssynchronized(_LOCK)
1619
  @_RequireOpenQueue
1620
  def Shutdown(self):
1621
    """Stops the job queue.
1622

1623
    This shuts down all the worker threads and closes the queue.
1624

1625
    """
1626
    self._wpool.TerminateWorkers()
1627

    
1628
    self._queue_filelock.Close()
1629
    self._queue_filelock = None