#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import errno
35
import re
36
import time
37
import weakref
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import netutils
57
from ganeti import compat
58

    
59

    
60
JOBQUEUE_THREADS = 25
61
JOBS_PER_ARCHIVE_DIRECTORY = 10000
62

    
63
# member lock names to be passed to @ssynchronized decorator
64
_LOCK = "_lock"
65
_QUEUE = "_queue"
66

    
67

    
68
class CancelJob(Exception):
69
  """Special exception to cancel a job.
70

71
  """
72

    
73

    
74
def TimeStampNow():
75
  """Returns the current timestamp.
76

77
  @rtype: tuple
78
  @return: the current time in the (seconds, microseconds) format
79

80
  """
81
  return utils.SplitTime(time.time())
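# Illustrative note (not part of the original code): utils.SplitTime() yields
# a (seconds, microseconds) pair, so TimeStampNow() returns something like
# (1288791338, 500000) for a wall-clock time of 1288791338.5 seconds since
# the epoch.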
82

    
83

    
84
class _QueuedOpCode(object):
85
  """Encapsulates an opcode object.
86

87
  @ivar log: holds the execution log and consists of tuples
88
  of the form C{(log_serial, timestamp, level, message)}
89
  @ivar input: the OpCode we encapsulate
90
  @ivar status: the current status
91
  @ivar result: the result of the LU execution
92
  @ivar start_timestamp: timestamp for the start of the execution
93
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94
  @ivar end_timestamp: timestamp for the end of the execution
95

96
  """
97
  __slots__ = ["input", "status", "result", "log",
98
               "start_timestamp", "exec_timestamp", "end_timestamp",
99
               "__weakref__"]
100

    
101
  def __init__(self, op):
102
    """Constructor for the _QuededOpCode.
103

104
    @type op: L{opcodes.OpCode}
105
    @param op: the opcode we encapsulate
106

107
    """
108
    self.input = op
109
    self.status = constants.OP_STATUS_QUEUED
110
    self.result = None
111
    self.log = []
112
    self.start_timestamp = None
113
    self.exec_timestamp = None
114
    self.end_timestamp = None
115

    
116
  @classmethod
117
  def Restore(cls, state):
118
    """Restore the _QueuedOpCode from the serialized form.
119

120
    @type state: dict
121
    @param state: the serialized state
122
    @rtype: _QueuedOpCode
123
    @return: a new _QueuedOpCode instance
124

125
    """
126
    obj = _QueuedOpCode.__new__(cls)
127
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
128
    obj.status = state["status"]
129
    obj.result = state["result"]
130
    obj.log = state["log"]
131
    obj.start_timestamp = state.get("start_timestamp", None)
132
    obj.exec_timestamp = state.get("exec_timestamp", None)
133
    obj.end_timestamp = state.get("end_timestamp", None)
134
    return obj
135

    
136
  def Serialize(self):
137
    """Serializes this _QueuedOpCode.
138

139
    @rtype: dict
140
    @return: the dictionary holding the serialized state
141

142
    """
143
    return {
144
      "input": self.input.__getstate__(),
145
      "status": self.status,
146
      "result": self.result,
147
      "log": self.log,
148
      "start_timestamp": self.start_timestamp,
149
      "exec_timestamp": self.exec_timestamp,
150
      "end_timestamp": self.end_timestamp,
151
      }
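  # Illustrative sketch (not part of the original module): Serialize() and
  # Restore() round-trip a queued opcode; "op" stands for any
  # L{opcodes.OpCode} instance.
  #
  #   qop = _QueuedOpCode(op)
  #   state = serializer.LoadJson(serializer.DumpJson(qop.Serialize()))
  #   restored = _QueuedOpCode.Restore(state)
  #   assert restored.status == constants.OP_STATUS_QUEUED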
152

    
153

    
154
class _QueuedJob(object):
155
  """In-memory job representation.
156

157
  This is what we use to track the user-submitted jobs. Locking must
158
  be taken care of by users of this class.
159

160
  @type queue: L{JobQueue}
161
  @ivar queue: the parent queue
162
  @ivar id: the job ID
163
  @type ops: list
164
  @ivar ops: the list of _QueuedOpCode that constitute the job
165
  @type log_serial: int
166
  @ivar log_serial: holds the index for the next log entry
167
  @ivar received_timestamp: the timestamp for when the job was received
168
  @ivar start_timestamp: the timestamp for start of execution
169
  @ivar end_timestamp: the timestamp for end of execution
170
  @ivar lock_status: In-memory locking information for debugging
171

172
  """
173
  # pylint: disable-msg=W0212
174
  __slots__ = ["queue", "id", "ops", "log_serial",
175
               "received_timestamp", "start_timestamp", "end_timestamp",
176
               "lock_status", "change",
177
               "__weakref__"]
178

    
179
  def __init__(self, queue, job_id, ops):
180
    """Constructor for the _QueuedJob.
181

182
    @type queue: L{JobQueue}
183
    @param queue: our parent queue
184
    @type job_id: job_id
185
    @param job_id: our job id
186
    @type ops: list
187
    @param ops: the list of opcodes we hold, which will be encapsulated
188
        in _QueuedOpCodes
189

190
    """
191
    if not ops:
192
      raise errors.GenericError("A job needs at least one opcode")
193

    
194
    self.queue = queue
195
    self.id = job_id
196
    self.ops = [_QueuedOpCode(op) for op in ops]
197
    self.log_serial = 0
198
    self.received_timestamp = TimeStampNow()
199
    self.start_timestamp = None
200
    self.end_timestamp = None
201

    
202
    # In-memory attributes
203
    self.lock_status = None
204

    
205
  def __repr__(self):
206
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
207
              "id=%s" % self.id,
208
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
209

    
210
    return "<%s at %#x>" % (" ".join(status), id(self))
211

    
212
  @classmethod
213
  def Restore(cls, queue, state):
214
    """Restore a _QueuedJob from serialized state:
215

216
    @type queue: L{JobQueue}
217
    @param queue: to which queue the restored job belongs
218
    @type state: dict
219
    @param state: the serialized state
220
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance
222

223
    """
224
    obj = _QueuedJob.__new__(cls)
225
    obj.queue = queue
226
    obj.id = state["id"]
227
    obj.received_timestamp = state.get("received_timestamp", None)
228
    obj.start_timestamp = state.get("start_timestamp", None)
229
    obj.end_timestamp = state.get("end_timestamp", None)
230

    
231
    # In-memory attributes
232
    obj.lock_status = None
233

    
234
    obj.ops = []
235
    obj.log_serial = 0
236
    for op_state in state["ops"]:
237
      op = _QueuedOpCode.Restore(op_state)
238
      for log_entry in op.log:
239
        obj.log_serial = max(obj.log_serial, log_entry[0])
240
      obj.ops.append(op)
241

    
242
    return obj
243

    
244
  def Serialize(self):
245
    """Serialize the _JobQueue instance.
246

247
    @rtype: dict
248
    @return: the serialized state
249

250
    """
251
    return {
252
      "id": self.id,
253
      "ops": [op.Serialize() for op in self.ops],
254
      "start_timestamp": self.start_timestamp,
255
      "end_timestamp": self.end_timestamp,
256
      "received_timestamp": self.received_timestamp,
257
      }
258

    
259
  def CalcStatus(self):
260
    """Compute the status of this job.
261

262
    This function iterates over all the _QueuedOpCodes in the job and
263
    based on their status, computes the job status.
264

265
    The algorithm is:
266
      - if we find a cancelled or finished-with-error opcode, the job
        status will be the same
268
      - otherwise, the last opcode with the status one of:
269
          - waitlock
270
          - canceling
271
          - running
272

273
        will determine the job status
274

275
      - otherwise, it means either all opcodes are queued, or success,
276
        and the job status will be the same
277

278
    @return: the job status
279

280
    """
281
    status = constants.JOB_STATUS_QUEUED
282

    
283
    all_success = True
284
    for op in self.ops:
285
      if op.status == constants.OP_STATUS_SUCCESS:
286
        continue
287

    
288
      all_success = False
289

    
290
      if op.status == constants.OP_STATUS_QUEUED:
291
        pass
292
      elif op.status == constants.OP_STATUS_WAITLOCK:
293
        status = constants.JOB_STATUS_WAITLOCK
294
      elif op.status == constants.OP_STATUS_RUNNING:
295
        status = constants.JOB_STATUS_RUNNING
296
      elif op.status == constants.OP_STATUS_CANCELING:
297
        status = constants.JOB_STATUS_CANCELING
298
        break
299
      elif op.status == constants.OP_STATUS_ERROR:
300
        status = constants.JOB_STATUS_ERROR
301
        # The whole job fails if one opcode failed
302
        break
303
      elif op.status == constants.OP_STATUS_CANCELED:
304
        status = constants.JOB_STATUS_CANCELED
305
        break
306

    
307
    if all_success:
308
      status = constants.JOB_STATUS_SUCCESS
309

    
310
    return status
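  # Illustrative example (not part of the original code): for a job whose
  # opcodes currently have the statuses [SUCCESS, RUNNING, QUEUED],
  # CalcStatus() returns JOB_STATUS_RUNNING; once all three opcodes reach
  # SUCCESS it returns JOB_STATUS_SUCCESS.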
311

    
312
  def GetLogEntries(self, newer_than):
313
    """Selectively returns the log entries.
314

315
    @type newer_than: None or int
316
    @param newer_than: if this is None, return all log entries,
317
        otherwise return only the log entries with serial higher
318
        than this value
319
    @rtype: list
320
    @return: the list of the log entries selected
321

322
    """
323
    if newer_than is None:
324
      serial = -1
325
    else:
326
      serial = newer_than
327

    
328
    entries = []
329
    for op in self.ops:
330
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
331

    
332
    return entries
333

    
334
  def GetInfo(self, fields):
335
    """Returns information about a job.
336

337
    @type fields: list
338
    @param fields: names of fields to return
339
    @rtype: list
340
    @return: list with one element for each field
341
    @raise errors.OpExecError: when an invalid field
342
        has been passed
343

344
    """
345
    row = []
346
    for fname in fields:
347
      if fname == "id":
348
        row.append(self.id)
349
      elif fname == "status":
350
        row.append(self.CalcStatus())
351
      elif fname == "ops":
352
        row.append([op.input.__getstate__() for op in self.ops])
353
      elif fname == "opresult":
354
        row.append([op.result for op in self.ops])
355
      elif fname == "opstatus":
356
        row.append([op.status for op in self.ops])
357
      elif fname == "oplog":
358
        row.append([op.log for op in self.ops])
359
      elif fname == "opstart":
360
        row.append([op.start_timestamp for op in self.ops])
361
      elif fname == "opexec":
362
        row.append([op.exec_timestamp for op in self.ops])
363
      elif fname == "opend":
364
        row.append([op.end_timestamp for op in self.ops])
365
      elif fname == "received_ts":
366
        row.append(self.received_timestamp)
367
      elif fname == "start_ts":
368
        row.append(self.start_timestamp)
369
      elif fname == "end_ts":
370
        row.append(self.end_timestamp)
371
      elif fname == "lock_status":
372
        row.append(self.lock_status)
373
      elif fname == "summary":
374
        row.append([op.input.Summary() for op in self.ops])
375
      else:
376
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
377
    return row
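  # Illustrative call (not part of the original code):
  #   job.GetInfo(["id", "status", "summary"])
  # returns a list such as
  #   [job.id, constants.JOB_STATUS_QUEUED, [op.input.Summary(), ...]]
  # for a freshly submitted job.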
378

    
379
  def MarkUnfinishedOps(self, status, result):
380
    """Mark unfinished opcodes with a given status and result.
381

382
    This is a utility function for marking all running or waiting to
383
    be run opcodes with a given status. Opcodes which are already
384
    finalised are not changed.
385

386
    @param status: a given opcode status
387
    @param result: the opcode result
388

389
    """
390
    try:
391
      not_marked = True
392
      for op in self.ops:
393
        if op.status in constants.OPS_FINALIZED:
394
          assert not_marked, "Finalized opcodes found after non-finalized ones"
395
          continue
396
        op.status = status
397
        op.result = result
398
        not_marked = False
399
    finally:
400
      self.queue.UpdateJobUnlocked(self)
401

    
402

    
403
class _OpExecCallbacks(mcpu.OpExecCbBase):
404
  def __init__(self, queue, job, op):
405
    """Initializes this class.
406

407
    @type queue: L{JobQueue}
408
    @param queue: Job queue
409
    @type job: L{_QueuedJob}
410
    @param job: Job object
411
    @type op: L{_QueuedOpCode}
412
    @param op: OpCode
413

414
    """
415
    assert queue, "Queue is missing"
416
    assert job, "Job is missing"
417
    assert op, "Opcode is missing"
418

    
419
    self._queue = queue
420
    self._job = job
421
    self._op = op
422

    
423
  def _CheckCancel(self):
424
    """Raises an exception to cancel the job if asked to.
425

426
    """
427
    # Cancel here if we were asked to
428
    if self._op.status == constants.OP_STATUS_CANCELING:
429
      logging.debug("Canceling opcode")
430
      raise CancelJob()
431

    
432
  @locking.ssynchronized(_QUEUE, shared=1)
433
  def NotifyStart(self):
434
    """Mark the opcode as running, not lock-waiting.
435

436
    This is called from the mcpu code as a notifier function, when the LU is
437
    finally about to start the Exec() method. Of course, to have end-user
438
    visible results, the opcode must be initially (before calling into
439
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
440

441
    """
442
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
443
                               constants.OP_STATUS_CANCELING)
444

    
445
    # All locks are acquired by now
446
    self._job.lock_status = None
447

    
448
    # Cancel here if we were asked to
449
    self._CheckCancel()
450

    
451
    logging.debug("Opcode is now running")
452
    self._op.status = constants.OP_STATUS_RUNNING
453
    self._op.exec_timestamp = TimeStampNow()
454

    
455
    # And finally replicate the job status
456
    self._queue.UpdateJobUnlocked(self._job)
457

    
458
  @locking.ssynchronized(_QUEUE, shared=1)
459
  def _AppendFeedback(self, timestamp, log_type, log_msg):
460
    """Internal feedback append function, with locks
461

462
    """
463
    self._job.log_serial += 1
464
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
465
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
466

    
467
  def Feedback(self, *args):
468
    """Append a log entry.
469

470
    """
471
    assert len(args) < 3
472

    
473
    if len(args) == 1:
474
      log_type = constants.ELOG_MESSAGE
475
      log_msg = args[0]
476
    else:
477
      (log_type, log_msg) = args
478

    
479
    # The time is split to make serialization easier and not lose
480
    # precision.
481
    timestamp = utils.SplitTime(time.time())
482
    self._AppendFeedback(timestamp, log_type, log_msg)
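  # Feedback() accepts either a bare message or a (log type, message) pair;
  # illustrative calls (not part of the original code):
  #   cb.Feedback("copying disk data")                         # ELOG_MESSAGE
  #   cb.Feedback(constants.ELOG_MESSAGE, "copying disk data")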
483

    
484
  def ReportLocks(self, msg):
485
    """Write locking information to the job.
486

487
    Called whenever the LU processor is waiting for a lock or has acquired one.
488

489
    """
490
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
491
                               constants.OP_STATUS_CANCELING)
492

    
493
    # Not getting the queue lock because this is a single assignment
494
    self._job.lock_status = msg
495

    
496
    # Cancel here if we were asked to
497
    self._CheckCancel()
498

    
499

    
500
class _JobChangesChecker(object):
501
  def __init__(self, fields, prev_job_info, prev_log_serial):
502
    """Initializes this class.
503

504
    @type fields: list of strings
505
    @param fields: Fields requested by LUXI client
506
    @type prev_job_info: string
507
    @param prev_job_info: previous job info, as passed by the LUXI client
508
    @type prev_log_serial: string
509
    @param prev_log_serial: previous job serial, as passed by the LUXI client
510

511
    """
512
    self._fields = fields
513
    self._prev_job_info = prev_job_info
514
    self._prev_log_serial = prev_log_serial
515

    
516
  def __call__(self, job):
517
    """Checks whether job has changed.
518

519
    @type job: L{_QueuedJob}
520
    @param job: Job object
521

522
    """
523
    status = job.CalcStatus()
524
    job_info = job.GetInfo(self._fields)
525
    log_entries = job.GetLogEntries(self._prev_log_serial)
526

    
527
    # Serializing and deserializing data can cause type changes (e.g. from
528
    # tuple to list) or precision loss. We're doing it here so that we get
529
    # the same modifications as the data received from the client. Without
530
    # this, the comparison afterwards might fail without the data being
531
    # significantly different.
532
    # TODO: we just deserialized from disk, investigate how to make sure that
533
    # the job info and log entries are compatible to avoid this further step.
534
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
535
    # efficient, though floats will be tricky
536
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
537
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
538

    
539
    # Don't even try to wait if the job is no longer running, there will be
540
    # no changes.
541
    if (status not in (constants.JOB_STATUS_QUEUED,
542
                       constants.JOB_STATUS_RUNNING,
543
                       constants.JOB_STATUS_WAITLOCK) or
544
        job_info != self._prev_job_info or
545
        (log_entries and self._prev_log_serial != log_entries[0][0])):
546
      logging.debug("Job %s changed", job.id)
547
      return (job_info, log_entries)
548

    
549
    return None
550

    
551

    
552
class _JobFileChangesWaiter(object):
553
  def __init__(self, filename):
554
    """Initializes this class.
555

556
    @type filename: string
557
    @param filename: Path to job file
558
    @raises errors.InotifyError: if the notifier cannot be setup
559

560
    """
561
    self._wm = pyinotify.WatchManager()
562
    self._inotify_handler = \
563
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
564
    self._notifier = \
565
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
566
    try:
567
      self._inotify_handler.enable()
568
    except Exception:
569
      # pyinotify doesn't close file descriptors automatically
570
      self._notifier.stop()
571
      raise
572

    
573
  def _OnInotify(self, notifier_enabled):
574
    """Callback for inotify.
575

576
    """
577
    if not notifier_enabled:
578
      self._inotify_handler.enable()
579

    
580
  def Wait(self, timeout):
581
    """Waits for the job file to change.
582

583
    @type timeout: float
584
    @param timeout: Timeout in seconds
585
    @return: Whether there have been events
586

587
    """
588
    assert timeout >= 0
589
    have_events = self._notifier.check_events(timeout * 1000)
590
    if have_events:
591
      self._notifier.read_events()
592
    self._notifier.process_events()
593
    return have_events
594

    
595
  def Close(self):
596
    """Closes underlying notifier and its file descriptor.
597

598
    """
599
    self._notifier.stop()
600

    
601

    
602
class _JobChangesWaiter(object):
603
  def __init__(self, filename):
604
    """Initializes this class.
605

606
    @type filename: string
607
    @param filename: Path to job file
608

609
    """
610
    self._filewaiter = None
611
    self._filename = filename
612

    
613
  def Wait(self, timeout):
614
    """Waits for a job to change.
615

616
    @type timeout: float
617
    @param timeout: Timeout in seconds
618
    @return: Whether there have been events
619

620
    """
621
    if self._filewaiter:
622
      return self._filewaiter.Wait(timeout)
623

    
624
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
625
    # If this point is reached, return immediately and let caller check the job
626
    # file again in case there were changes since the last check. This avoids a
627
    # race condition.
628
    self._filewaiter = _JobFileChangesWaiter(self._filename)
629

    
630
    return True
631

    
632
  def Close(self):
633
    """Closes underlying waiter.
634

635
    """
636
    if self._filewaiter:
637
      self._filewaiter.Close()
638

    
639

    
640
class _WaitForJobChangesHelper(object):
641
  """Helper class using inotify to wait for changes in a job file.
642

643
  This class takes a previous job status and serial, and alerts the client when
644
  the current job status has changed.
645

646
  """
647
  @staticmethod
648
  def _CheckForChanges(job_load_fn, check_fn):
649
    job = job_load_fn()
650
    if not job:
651
      raise errors.JobLost()
652

    
653
    result = check_fn(job)
654
    if result is None:
655
      raise utils.RetryAgain()
656

    
657
    return result
658

    
659
  def __call__(self, filename, job_load_fn,
660
               fields, prev_job_info, prev_log_serial, timeout):
661
    """Waits for changes on a job.
662

663
    @type filename: string
664
    @param filename: File on which to wait for changes
665
    @type job_load_fn: callable
666
    @param job_load_fn: Function to load job
667
    @type fields: list of strings
668
    @param fields: Which fields to check for changes
669
    @type prev_job_info: list or None
670
    @param prev_job_info: Last job information returned
671
    @type prev_log_serial: int
672
    @param prev_log_serial: Last job message serial number
673
    @type timeout: float
674
    @param timeout: maximum time to wait in seconds
675

676
    """
677
    try:
678
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
679
      waiter = _JobChangesWaiter(filename)
680
      try:
681
        return utils.Retry(compat.partial(self._CheckForChanges,
682
                                          job_load_fn, check_fn),
683
                           utils.RETRY_REMAINING_TIME, timeout,
684
                           wait_fn=waiter.Wait)
685
      finally:
686
        waiter.Close()
687
    except (errors.InotifyError, errors.JobLost):
688
      return None
689
    except utils.RetryTimeout:
690
      return constants.JOB_NOTCHANGED
691

    
692

    
693
class _JobQueueWorker(workerpool.BaseWorker):
694
  """The actual job workers.
695

696
  """
697
  def RunTask(self, job): # pylint: disable-msg=W0221
698
    """Job executor.
699

700
    This function processes a job. It is closely tied to the _QueuedJob and
701
    _QueuedOpCode classes.
702

703
    @type job: L{_QueuedJob}
704
    @param job: the job to be processed
705

706
    """
707
    logging.info("Processing job %s", job.id)
708
    proc = mcpu.Processor(self.pool.queue.context, job.id)
709
    queue = job.queue
710
    try:
711
      try:
712
        count = len(job.ops)
713
        for idx, op in enumerate(job.ops):
714
          op_summary = op.input.Summary()
715
          if op.status == constants.OP_STATUS_SUCCESS:
716
            # this is a job that was partially completed before master
717
            # daemon shutdown, so it can be expected that some opcodes
718
            # are already completed successfully (if any did error
719
            # out, then the whole job should have been aborted and not
720
            # resubmitted for processing)
721
            logging.info("Op %s/%s: opcode %s already processed, skipping",
722
                         idx + 1, count, op_summary)
723
            continue
724
          try:
725
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
726
                         op_summary)
727

    
728
            queue.acquire(shared=1)
729
            try:
730
              if op.status == constants.OP_STATUS_CANCELED:
731
                logging.debug("Canceling opcode")
732
                raise CancelJob()
733
              assert op.status == constants.OP_STATUS_QUEUED
734
              logging.debug("Opcode %s/%s waiting for locks",
735
                            idx + 1, count)
736
              op.status = constants.OP_STATUS_WAITLOCK
737
              op.result = None
738
              op.start_timestamp = TimeStampNow()
739
              if idx == 0: # first opcode
740
                job.start_timestamp = op.start_timestamp
741
              queue.UpdateJobUnlocked(job)
742

    
743
              input_opcode = op.input
744
            finally:
745
              queue.release()
746

    
747
            # Make sure not to hold queue lock while calling ExecOpCode
748
            result = proc.ExecOpCode(input_opcode,
749
                                     _OpExecCallbacks(queue, job, op))
750

    
751
            queue.acquire(shared=1)
752
            try:
753
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
754
              op.status = constants.OP_STATUS_SUCCESS
755
              op.result = result
756
              op.end_timestamp = TimeStampNow()
757
              queue.UpdateJobUnlocked(job)
758
            finally:
759
              queue.release()
760

    
761
            logging.info("Op %s/%s: Successfully finished opcode %s",
762
                         idx + 1, count, op_summary)
763
          except CancelJob:
764
            # Will be handled further up
765
            raise
766
          except Exception, err:
767
            queue.acquire(shared=1)
768
            try:
769
              try:
770
                logging.debug("Opcode %s/%s failed", idx + 1, count)
771
                op.status = constants.OP_STATUS_ERROR
772
                if isinstance(err, errors.GenericError):
773
                  to_encode = err
774
                else:
775
                  to_encode = errors.OpExecError(str(err))
776
                op.result = errors.EncodeException(to_encode)
777
                op.end_timestamp = TimeStampNow()
778
                logging.info("Op %s/%s: Error in opcode %s: %s",
779
                             idx + 1, count, op_summary, err)
780
              finally:
781
                queue.UpdateJobUnlocked(job)
782
            finally:
783
              queue.release()
784
            raise
785

    
786
      except CancelJob:
787
        queue.acquire(shared=1)
788
        try:
789
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
790
                                "Job canceled by request")
791
        finally:
792
          queue.release()
793
      except errors.GenericError, err:
794
        logging.exception("Ganeti exception")
795
      except:
796
        logging.exception("Unhandled exception")
797
    finally:
798
      queue.acquire(shared=1)
799
      try:
800
        try:
801
          job.lock_status = None
802
          job.end_timestamp = TimeStampNow()
803
          queue.UpdateJobUnlocked(job)
804
        finally:
805
          job_id = job.id
806
          status = job.CalcStatus()
807
      finally:
808
        queue.release()
809

    
810
      logging.info("Finished job %s, status = %s", job_id, status)
811

    
812

    
813
class _JobQueueWorkerPool(workerpool.WorkerPool):
814
  """Simple class implementing a job-processing workerpool.
815

816
  """
817
  def __init__(self, queue):
818
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
819
                                              JOBQUEUE_THREADS,
820
                                              _JobQueueWorker)
821
    self.queue = queue
822

    
823

    
824
def _RequireOpenQueue(fn):
825
  """Decorator for "public" functions.
826

827
  This function should be used for all 'public' functions. That is,
828
  functions usually called from other classes. Note that this should
829
  be applied only to methods (not plain functions), since it expects
830
  that the decorated function is called with a first argument that has
831
  a '_queue_filelock' attribute.
832

833
  @warning: Use this decorator only after locking.ssynchronized
834

835
  Example::
836
    @locking.ssynchronized(_LOCK)
837
    @_RequireOpenQueue
838
    def Example(self):
839
      pass
840

841
  """
842
  def wrapper(self, *args, **kwargs):
843
    # pylint: disable-msg=W0212
844
    assert self._queue_filelock is not None, "Queue should be open"
845
    return fn(self, *args, **kwargs)
846
  return wrapper
847

    
848

    
849
class JobQueue(object):
850
  """Queue used to manage the jobs.
851

852
  @cvar _RE_JOB_FILE: regex matching the valid job file names
853

854
  """
855
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
856

    
857
  def __init__(self, context):
858
    """Constructor for JobQueue.
859

860
    The constructor will initialize the job queue object and then
861
    start loading the current jobs from disk, either for starting them
862
    (if they were queued) or for aborting them (if they were already
863
    running).
864

865
    @type context: GanetiContext
866
    @param context: the context object for access to the configuration
867
        data and other ganeti objects
868

869
    """
870
    self.context = context
871
    self._memcache = weakref.WeakValueDictionary()
872
    self._my_hostname = netutils.HostInfo().name
873

    
874
    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. To run without acquiring it at all,
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
879
    self._lock = locking.SharedLock("JobQueue")
880

    
881
    self.acquire = self._lock.acquire
882
    self.release = self._lock.release
883

    
884
    # Initialize the queue, and acquire the filelock.
885
    # This ensures no other process is working on the job queue.
886
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
887

    
888
    # Read serial file
889
    self._last_serial = jstore.ReadSerial()
890
    assert self._last_serial is not None, ("Serial file was modified between"
891
                                           " check in jstore and here")
892

    
893
    # Get initial list of nodes
894
    self._nodes = dict((n.name, n.primary_ip)
895
                       for n in self.context.cfg.GetAllNodesInfo().values()
896
                       if n.master_candidate)
897

    
898
    # Remove master node
899
    self._nodes.pop(self._my_hostname, None)
900

    
901
    # TODO: Check consistency across nodes
902

    
903
    self._queue_size = 0
904
    self._UpdateQueueSizeUnlocked()
905
    self._drained = self._IsQueueMarkedDrain()
906

    
907
    # Setup worker pool
908
    self._wpool = _JobQueueWorkerPool(self)
909
    try:
910
      # We need to lock here because WorkerPool.AddTask() may start a job while
911
      # we're still doing our work.
912
      self.acquire()
913
      try:
914
        logging.info("Inspecting job queue")
915

    
916
        all_job_ids = self._GetJobIDsUnlocked()
917
        jobs_count = len(all_job_ids)
918
        lastinfo = time.time()
919
        for idx, job_id in enumerate(all_job_ids):
920
          # Give an update every 1000 jobs or 10 seconds
921
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
922
              idx == (jobs_count - 1)):
923
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
924
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
925
            lastinfo = time.time()
926

    
927
          job = self._LoadJobUnlocked(job_id)
928

    
929
          # a failure in loading the job can cause 'None' to be returned
930
          if job is None:
931
            continue
932

    
933
          status = job.CalcStatus()
934

    
935
          if status in (constants.JOB_STATUS_QUEUED, ):
936
            self._wpool.AddTask((job, ))
937

    
938
          elif status in (constants.JOB_STATUS_RUNNING,
939
                          constants.JOB_STATUS_WAITLOCK,
940
                          constants.JOB_STATUS_CANCELING):
941
            logging.warning("Unfinished job %s found: %s", job.id, job)
942
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
943
                                  "Unclean master daemon shutdown")
944

    
945
        logging.info("Job queue inspection finished")
946
      finally:
947
        self.release()
948
    except:
949
      self._wpool.TerminateWorkers()
950
      raise
951

    
952
  @locking.ssynchronized(_LOCK)
953
  @_RequireOpenQueue
954
  def AddNode(self, node):
955
    """Register a new node with the queue.
956

957
    @type node: L{objects.Node}
958
    @param node: the node object to be added
959

960
    """
961
    node_name = node.name
962
    assert node_name != self._my_hostname
963

    
964
    # Clean queue directory on added node
965
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
966
    msg = result.fail_msg
967
    if msg:
968
      logging.warning("Cannot cleanup queue directory on node %s: %s",
969
                      node_name, msg)
970

    
971
    if not node.master_candidate:
972
      # remove if existing, ignoring errors
973
      self._nodes.pop(node_name, None)
974
      # and skip the replication of the job ids
975
      return
976

    
977
    # Upload the whole queue excluding archived jobs
978
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
979

    
980
    # Upload current serial file
981
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
982

    
983
    for file_name in files:
984
      # Read file content
985
      content = utils.ReadFile(file_name)
986

    
987
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
988
                                                  [node.primary_ip],
989
                                                  file_name, content)
990
      msg = result[node_name].fail_msg
991
      if msg:
992
        logging.error("Failed to upload file %s to node %s: %s",
993
                      file_name, node_name, msg)
994

    
995
    self._nodes[node_name] = node.primary_ip
996

    
997
  @locking.ssynchronized(_LOCK)
998
  @_RequireOpenQueue
999
  def RemoveNode(self, node_name):
1000
    """Callback called when removing nodes from the cluster.
1001

1002
    @type node_name: str
1003
    @param node_name: the name of the node to remove
1004

1005
    """
1006
    self._nodes.pop(node_name, None)
1007

    
1008
  @staticmethod
1009
  def _CheckRpcResult(result, nodes, failmsg):
1010
    """Verifies the status of an RPC call.
1011

1012
    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.
1015

1016
    @param result: the data as returned from the rpc call
1017
    @type nodes: list
1018
    @param nodes: the list of nodes we made the call to
1019
    @type failmsg: str
1020
    @param failmsg: the identifier to be used for logging
1021

1022
    """
1023
    failed = []
1024
    success = []
1025

    
1026
    for node in nodes:
1027
      msg = result[node].fail_msg
1028
      if msg:
1029
        failed.append(node)
1030
        logging.error("RPC call %s (%s) failed on node %s: %s",
1031
                      result[node].call, failmsg, node, msg)
1032
      else:
1033
        success.append(node)
1034

    
1035
    # +1 for the master node
1036
    if (len(success) + 1) < len(failed):
1037
      # TODO: Handle failing nodes
1038
      logging.error("More than half of the nodes failed")
1039

    
1040
  def _GetNodeIp(self):
1041
    """Helper for returning the node name/ip list.
1042

1043
    @rtype: (list, list)
1044
    @return: a tuple of two lists, the first one with the node
1045
        names and the second one with the node addresses
1046

1047
    """
1048
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1049
    name_list = self._nodes.keys()
1050
    addr_list = [self._nodes[name] for name in name_list]
1051
    return name_list, addr_list
1052

    
1053
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1054
    """Writes a file locally and then replicates it to all nodes.
1055

1056
    This function will replace the contents of a file on the local
1057
    node and then replicate it to all the other nodes we have.
1058

1059
    @type file_name: str
1060
    @param file_name: the path of the file to be replicated
1061
    @type data: str
1062
    @param data: the new contents of the file
1063
    @type replicate: boolean
1064
    @param replicate: whether to spread the changes to the remote nodes
1065

1066
    """
1067
    utils.WriteFile(file_name, data=data)
1068

    
1069
    if replicate:
1070
      names, addrs = self._GetNodeIp()
1071
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1072
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1073

    
1074
  def _RenameFilesUnlocked(self, rename):
1075
    """Renames a file locally and then replicate the change.
1076

1077
    This function will rename a file in the local queue directory
1078
    and then replicate this rename to all the other nodes we have.
1079

1080
    @type rename: list of (old, new)
1081
    @param rename: List containing tuples mapping old to new names
1082

1083
    """
1084
    # Rename them locally
1085
    for old, new in rename:
1086
      utils.RenameFile(old, new, mkdir=True)
1087

    
1088
    # ... and on all nodes
1089
    names, addrs = self._GetNodeIp()
1090
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1091
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1092

    
1093
  @staticmethod
1094
  def _FormatJobID(job_id):
1095
    """Convert a job ID to string format.
1096

1097
    Currently this just does C{str(job_id)} after performing some
1098
    checks, but if we want to change the job id format this will
1099
    abstract this change.
1100

1101
    @type job_id: int or long
1102
    @param job_id: the numeric job id
1103
    @rtype: str
1104
    @return: the formatted job id
1105

1106
    """
1107
    if not isinstance(job_id, (int, long)):
1108
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1109
    if job_id < 0:
1110
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1111

    
1112
    return str(job_id)
1113

    
1114
  @classmethod
1115
  def _GetArchiveDirectory(cls, job_id):
1116
    """Returns the archive directory for a job.
1117

1118
    @type job_id: str
1119
    @param job_id: Job identifier
1120
    @rtype: str
1121
    @return: Directory name
1122

1123
    """
1124
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1125

    
1126
  def _NewSerialsUnlocked(self, count):
1127
    """Generates a new job identifier.
1128

1129
    Job identifiers are unique during the lifetime of a cluster.
1130

1131
    @type count: integer
1132
    @param count: how many serials to return
1133
    @rtype: list of str
    @return: a list of job identifiers.
1135

1136
    """
1137
    assert count > 0
1138
    # New number
1139
    serial = self._last_serial + count
1140

    
1141
    # Write to file
1142
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1143
                             "%s\n" % serial, True)
1144

    
1145
    result = [self._FormatJobID(v)
1146
              for v in range(self._last_serial + 1, serial + 1)]
1147
    # Keep it only if we were able to write the file
1148
    self._last_serial = serial
1149

    
1150
    return result
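  # Illustrative example (not part of the original code): with
  # self._last_serial == 5 and count == 2, "7" is written to the serial
  # file and ["6", "7"] is returned.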
1151

    
1152
  @staticmethod
1153
  def _GetJobPath(job_id):
1154
    """Returns the job file for a given job id.
1155

1156
    @type job_id: str
1157
    @param job_id: the job identifier
1158
    @rtype: str
1159
    @return: the path to the job file
1160

1161
    """
1162
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1163

    
1164
  @classmethod
1165
  def _GetArchivedJobPath(cls, job_id):
1166
    """Returns the archived job file for a give job id.
1167

1168
    @type job_id: str
1169
    @param job_id: the job identifier
1170
    @rtype: str
1171
    @return: the path to the archived job file
1172

1173
    """
1174
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1175
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
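  # Illustrative example (not part of the original code): with
  # JOBS_PER_ARCHIVE_DIRECTORY == 10000, job "12345" is archived as
  # <JOB_QUEUE_ARCHIVE_DIR>/1/job-12345.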
1176

    
1177
  def _GetJobIDsUnlocked(self, sort=True):
1178
    """Return all known job IDs.
1179

1180
    The method only looks at disk because it's a requirement that all
1181
    jobs are present on disk (so in the _memcache we don't have any
1182
    extra IDs).
1183

1184
    @type sort: boolean
1185
    @param sort: perform sorting on the returned job ids
1186
    @rtype: list
1187
    @return: the list of job IDs
1188

1189
    """
1190
    jlist = []
1191
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1192
      m = self._RE_JOB_FILE.match(filename)
1193
      if m:
1194
        jlist.append(m.group(1))
1195
    if sort:
1196
      jlist = utils.NiceSort(jlist)
1197
    return jlist
1198

    
1199
  def _LoadJobUnlocked(self, job_id):
1200
    """Loads a job from the disk or memory.
1201

1202
    Given a job id, this will return the cached job object if
1203
    existing, or try to load the job from the disk. If loading from
1204
    disk, it will also add the job to the cache.
1205

1206
    @param job_id: the job id
1207
    @rtype: L{_QueuedJob} or None
1208
    @return: either None or the job object
1209

1210
    """
1211
    job = self._memcache.get(job_id, None)
1212
    if job:
1213
      logging.debug("Found job %s in memcache", job_id)
1214
      return job
1215

    
1216
    try:
1217
      job = self._LoadJobFromDisk(job_id)
1218
      if job is None:
1219
        return job
1220
    except errors.JobFileCorrupted:
1221
      old_path = self._GetJobPath(job_id)
1222
      new_path = self._GetArchivedJobPath(job_id)
1223
      if old_path == new_path:
1224
        # job already archived (future case)
1225
        logging.exception("Can't parse job %s", job_id)
1226
      else:
1227
        # non-archived case
1228
        logging.exception("Can't parse job %s, will archive.", job_id)
1229
        self._RenameFilesUnlocked([(old_path, new_path)])
1230
      return None
1231

    
1232
    self._memcache[job_id] = job
1233
    logging.debug("Added job %s to the cache", job_id)
1234
    return job
1235

    
1236
  def _LoadJobFromDisk(self, job_id):
1237
    """Load the given job file from disk.
1238

1239
    Given a job file, read, load and restore it in a _QueuedJob format.
1240

1241
    @type job_id: string
1242
    @param job_id: job identifier
1243
    @rtype: L{_QueuedJob} or None
1244
    @return: either None or the job object
1245

1246
    """
1247
    filepath = self._GetJobPath(job_id)
1248
    logging.debug("Loading job from %s", filepath)
1249
    try:
1250
      raw_data = utils.ReadFile(filepath)
1251
    except EnvironmentError, err:
1252
      if err.errno in (errno.ENOENT, ):
1253
        return None
1254
      raise
1255

    
1256
    try:
1257
      data = serializer.LoadJson(raw_data)
1258
      job = _QueuedJob.Restore(self, data)
1259
    except Exception, err: # pylint: disable-msg=W0703
1260
      raise errors.JobFileCorrupted(err)
1261

    
1262
    return job
1263

    
1264
  def SafeLoadJobFromDisk(self, job_id):
1265
    """Load the given job file from disk.
1266

1267
    Given a job file, read, load and restore it in a _QueuedJob format.
1268
    In case of error reading the job, it gets returned as None, and the
1269
    exception is logged.
1270

1271
    @type job_id: string
1272
    @param job_id: job identifier
1273
    @rtype: L{_QueuedJob} or None
1274
    @return: either None or the job object
1275

1276
    """
1277
    try:
1278
      return self._LoadJobFromDisk(job_id)
1279
    except (errors.JobFileCorrupted, EnvironmentError):
1280
      logging.exception("Can't load/parse job %s", job_id)
1281
      return None
1282

    
1283
  @staticmethod
1284
  def _IsQueueMarkedDrain():
1285
    """Check if the queue is marked from drain.
1286

1287
    This currently uses the queue drain file, which makes it a
1288
    per-node flag. In the future this can be moved to the config file.
1289

1290
    @rtype: boolean
1291
    @return: True if the job queue is marked for draining
1292

1293
    """
1294
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1295

    
1296
  def _UpdateQueueSizeUnlocked(self):
1297
    """Update the queue size.
1298

1299
    """
1300
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1301

    
1302
  @locking.ssynchronized(_LOCK)
1303
  @_RequireOpenQueue
1304
  def SetDrainFlag(self, drain_flag):
1305
    """Sets the drain flag for the queue.
1306

1307
    @type drain_flag: boolean
1308
    @param drain_flag: Whether to set or unset the drain flag
1309

1310
    """
1311
    if drain_flag:
1312
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1313
    else:
1314
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1315

    
1316
    self._drained = drain_flag
1317

    
1318
    return True
1319

    
1320
  @_RequireOpenQueue
1321
  def _SubmitJobUnlocked(self, job_id, ops):
1322
    """Create and store a new job.
1323

1324
    This enters the job into our job queue and also puts it on the new
1325
    queue, in order for it to be picked up by the queue processors.
1326

1327
    @type job_id: job ID
1328
    @param job_id: the job ID for the new job
1329
    @type ops: list
1330
    @param ops: The list of OpCodes that will become the new job.
1331
    @rtype: L{_QueuedJob}
1332
    @return: the job object to be queued
1333
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1334
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1335

1336
    """
1337
    # Ok when sharing the big job queue lock, as the drain file is created when
1338
    # the lock is exclusive.
1339
    if self._drained:
1340
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1341

    
1342
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1343
      raise errors.JobQueueFull()
1344

    
1345
    job = _QueuedJob(self, job_id, ops)
1346

    
1347
    # Write to disk
1348
    self.UpdateJobUnlocked(job)
1349

    
1350
    self._queue_size += 1
1351

    
1352
    logging.debug("Adding new job %s to the cache", job_id)
1353
    self._memcache[job_id] = job
1354

    
1355
    return job
1356

    
1357
  @locking.ssynchronized(_LOCK)
1358
  @_RequireOpenQueue
1359
  def SubmitJob(self, ops):
1360
    """Create and store a new job.
1361

1362
    @see: L{_SubmitJobUnlocked}
1363

1364
    """
1365
    job_id = self._NewSerialsUnlocked(1)[0]
1366
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
1367
    return job_id
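  # Illustrative usage from the master daemon side (a sketch, not part of the
  # original code; "context" stands for an initialized GanetiContext and "op"
  # for any L{opcodes.OpCode} instance):
  #   queue = JobQueue(context)
  #   job_id = queue.SubmitJob([op])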
1368

    
1369
  @locking.ssynchronized(_LOCK)
1370
  @_RequireOpenQueue
1371
  def SubmitManyJobs(self, jobs):
1372
    """Create and store multiple jobs.
1373

1374
    @see: L{_SubmitJobUnlocked}
1375

1376
    """
1377
    results = []
1378
    tasks = []
1379
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1380
    for job_id, ops in zip(all_job_ids, jobs):
1381
      try:
1382
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1383
        status = True
1384
        data = job_id
1385
      except errors.GenericError, err:
1386
        data = str(err)
1387
        status = False
1388
      results.append((status, data))
1389
    self._wpool.AddManyTasks(tasks)
1390

    
1391
    return results
1392

    
1393
  @_RequireOpenQueue
1394
  def UpdateJobUnlocked(self, job, replicate=True):
1395
    """Update a job's on disk storage.
1396

1397
    After a job has been modified, this function needs to be called in
1398
    order to write the changes to disk and replicate them to the other
1399
    nodes.
1400

1401
    @type job: L{_QueuedJob}
1402
    @param job: the changed job
1403
    @type replicate: boolean
1404
    @param replicate: whether to replicate the change to remote nodes
1405

1406
    """
1407
    filename = self._GetJobPath(job.id)
1408
    data = serializer.DumpJson(job.Serialize(), indent=False)
1409
    logging.debug("Writing job %s to %s", job.id, filename)
1410
    self._UpdateJobQueueFile(filename, data, replicate)
1411

    
1412
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1413
                        timeout):
1414
    """Waits for changes in a job.
1415

1416
    @type job_id: string
1417
    @param job_id: Job identifier
1418
    @type fields: list of strings
1419
    @param fields: Which fields to check for changes
1420
    @type prev_job_info: list or None
1421
    @param prev_job_info: Last job information returned
1422
    @type prev_log_serial: int
1423
    @param prev_log_serial: Last job message serial number
1424
    @type timeout: float
1425
    @param timeout: maximum time to wait in seconds
1426
    @rtype: tuple (job info, log entries)
1427
    @return: a tuple of the job information as required via
1428
        the fields parameter, and the log entries as a list
1429

1430
        if the job has not changed and the timeout has expired,
1431
        we instead return a special value,
1432
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1433
        as such by the clients
1434

1435
    """
1436
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1437

    
1438
    helper = _WaitForJobChangesHelper()
1439

    
1440
    return helper(self._GetJobPath(job_id), load_fn,
1441
                  fields, prev_job_info, prev_log_serial, timeout)
1442

    
1443
  @locking.ssynchronized(_LOCK)
1444
  @_RequireOpenQueue
1445
  def CancelJob(self, job_id):
1446
    """Cancels a job.
1447

1448
    This will only succeed if the job has not started yet.
1449

1450
    @type job_id: string
1451
    @param job_id: job ID of job to be cancelled.
1452

1453
    """
1454
    logging.info("Cancelling job %s", job_id)
1455

    
1456
    job = self._LoadJobUnlocked(job_id)
1457
    if not job:
1458
      logging.debug("Job %s not found", job_id)
1459
      return (False, "Job %s not found" % job_id)
1460

    
1461
    job_status = job.CalcStatus()
1462

    
1463
    if job_status not in (constants.JOB_STATUS_QUEUED,
1464
                          constants.JOB_STATUS_WAITLOCK):
1465
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1466
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1467

    
1468
    if job_status == constants.JOB_STATUS_QUEUED:
1469
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1470
                            "Job canceled by request")
1471
      return (True, "Job %s canceled" % job.id)
1472

    
1473
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1474
      # The worker will notice the new status and cancel the job
1475
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1476
      return (True, "Job %s will be canceled" % job.id)
1477

    
1478
  @_RequireOpenQueue
1479
  def _ArchiveJobsUnlocked(self, jobs):
1480
    """Archives jobs.
1481

1482
    @type jobs: list of L{_QueuedJob}
1483
    @param jobs: Job objects
1484
    @rtype: int
1485
    @return: Number of archived jobs
1486

1487
    """
1488
    archive_jobs = []
1489
    rename_files = []
1490
    for job in jobs:
1491
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
1492
        logging.debug("Job %s is not yet done", job.id)
1493
        continue
1494

    
1495
      archive_jobs.append(job)
1496

    
1497
      old = self._GetJobPath(job.id)
1498
      new = self._GetArchivedJobPath(job.id)
1499
      rename_files.append((old, new))
1500

    
1501
    # TODO: What if 1..n files fail to rename?
1502
    self._RenameFilesUnlocked(rename_files)
1503

    
1504
    logging.debug("Successfully archived job(s) %s",
1505
                  utils.CommaJoin(job.id for job in archive_jobs))
1506

    
1507
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1508
    # the files, we update the cached queue size from the filesystem. When we
1509
    # get around to fix the TODO: above, we can use the number of actually
1510
    # archived jobs to fix this.
1511
    self._UpdateQueueSizeUnlocked()
1512
    return len(archive_jobs)
1513

    
1514
  @locking.ssynchronized(_LOCK)
1515
  @_RequireOpenQueue
1516
  def ArchiveJob(self, job_id):
1517
    """Archives a job.
1518

1519
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1520

1521
    @type job_id: string
1522
    @param job_id: Job ID of job to be archived.
1523
    @rtype: bool
1524
    @return: Whether job was archived
1525

1526
    """
1527
    logging.info("Archiving job %s", job_id)
1528

    
1529
    job = self._LoadJobUnlocked(job_id)
1530
    if not job:
1531
      logging.debug("Job %s not found", job_id)
1532
      return False
1533

    
1534
    return self._ArchiveJobsUnlocked([job]) == 1
1535

    
1536
  @locking.ssynchronized(_LOCK)
1537
  @_RequireOpenQueue
1538
  def AutoArchiveJobs(self, age, timeout):
1539
    """Archives all jobs based on age.
1540

1541
    The method will archive all jobs which are older than the age
1542
    parameter. For jobs that don't have an end timestamp, the start
1543
    timestamp will be considered. The special '-1' age will cause
1544
    archival of all jobs (that are not running or queued).
1545

1546
    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time (in seconds) to spend on archival
1548

1549
    """
1550
    logging.info("Archiving jobs with age more than %s seconds", age)
1551

    
1552
    now = time.time()
1553
    end_time = now + timeout
1554
    archived_count = 0
1555
    last_touched = 0
1556

    
1557
    all_job_ids = self._GetJobIDsUnlocked()
1558
    pending = []
1559
    for idx, job_id in enumerate(all_job_ids):
1560
      last_touched = idx + 1
1561

    
1562
      # Not optimal because jobs could be pending
1563
      # TODO: Measure average duration for job archival and take number of
1564
      # pending jobs into account.
1565
      if time.time() > end_time:
1566
        break
1567

    
1568
      # Returns None if the job failed to load
1569
      job = self._LoadJobUnlocked(job_id)
1570
      if job:
1571
        if job.end_timestamp is None:
1572
          if job.start_timestamp is None:
1573
            job_age = job.received_timestamp
1574
          else:
1575
            job_age = job.start_timestamp
1576
        else:
1577
          job_age = job.end_timestamp
1578

    
1579
        if age == -1 or now - job_age[0] > age:
1580
          pending.append(job)
1581

    
1582
          # Archive 10 jobs at a time
1583
          if len(pending) >= 10:
1584
            archived_count += self._ArchiveJobsUnlocked(pending)
1585
            pending = []
1586

    
1587
    if pending:
1588
      archived_count += self._ArchiveJobsUnlocked(pending)
1589

    
1590
    return (archived_count, len(all_job_ids) - last_touched)
1591

    
1592
  def QueryJobs(self, job_ids, fields):
1593
    """Returns a list of jobs in queue.
1594

1595
    @type job_ids: list
1596
    @param job_ids: sequence of job identifiers or None for all
1597
    @type fields: list
1598
    @param fields: names of fields to return
1599
    @rtype: list
1600
    @return: a list with one element per job, each element being a list with
1601
        the requested fields
1602

1603
    """
1604
    jobs = []
1605
    list_all = False
1606
    if not job_ids:
1607
      # Since files are added to/removed from the queue atomically, there's no
1608
      # risk of getting the job ids in an inconsistent state.
1609
      job_ids = self._GetJobIDsUnlocked()
1610
      list_all = True
1611

    
1612
    for job_id in job_ids:
1613
      job = self.SafeLoadJobFromDisk(job_id)
1614
      if job is not None:
1615
        jobs.append(job.GetInfo(fields))
1616
      elif not list_all:
1617
        jobs.append(None)
1618

    
1619
    return jobs
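  # Illustrative calls (not part of the original code):
  #   queue.QueryJobs(None, ["id", "status"])   # info for all known jobs
  #   queue.QueryJobs(["42"], ["opresult"])     # opcode results for job 42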
1620

    
1621
  @locking.ssynchronized(_LOCK)
1622
  @_RequireOpenQueue
1623
  def Shutdown(self):
1624
    """Stops the job queue.
1625

1626
    This shuts down all the worker threads and closes the queue.
1627

1628
    """
1629
    self._wpool.TerminateWorkers()
1630

    
1631
    self._queue_filelock.Close()
1632
    self._queue_filelock = None