# lib/jqueue.py @ b705c7a6
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import errno
import re
import time
import weakref

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import netutils
from ganeti import compat


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
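
# Illustration for TimeStampNow() (hypothetical value, kept as a comment so
# nothing runs at import time): utils.SplitTime turns a float time into the
# (seconds, microseconds) tuple used throughout this module, e.g.
#
#   >>> utils.SplitTime(1234567890.5)
#   (1234567890, 500000)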


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      }
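
# A minimal round-trip sketch for _QueuedOpCode (illustrative comment only;
# the concrete opcode class is an arbitrary choice):
#
#   op = _QueuedOpCode(opcodes.OpQueryClusterInfo())
#   state = op.Serialize()
#   clone = _QueuedOpCode.Restore(state)
#   assert clone.status == constants.OP_STATUS_QUEUED
#
# Serialize() and Restore() are symmetric, which is what allows a job file
# written by one master daemon to be reloaded by the next one.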


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for the start of execution
  @ivar end_timestamp: the timestamp for the end of execution
  @ivar lock_status: In-memory locking information for debugging

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an
        error, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or all
        succeeded, and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
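
  # Worked examples for CalcStatus() (illustrative): given the per-opcode
  # statuses on the left, the derived job status is on the right:
  #
  #   success, success, success   -> JOB_STATUS_SUCCESS
  #   success, running, queued    -> JOB_STATUS_RUNNING
  #   success, error, queued      -> JOB_STATUS_ERROR (stops at the error)
  #   success, canceling, queued  -> JOB_STATUS_CANCELING
  #   queued, queued, queued      -> JOB_STATUS_QUEUED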

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
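
  # Illustration for GetLogEntries() (hypothetical log data): if the only
  # opcode holds log entries with serials 1 and 2, GetLogEntries(None)
  # returns both, while GetLogEntries(1) returns only the entry with
  # serial 2, since entries are selected on serial strictly greater than
  # the argument.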

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "lock_status":
        row.append(self.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row
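
  # Usage sketch for GetInfo() (illustrative comment only): one result
  # element is returned per requested field, scalar fields as-is and
  # per-opcode fields as lists:
  #
  #   (status, summaries) = job.GetInfo(["status", "summary"])
  #   # status is a single job status, summaries has one entry per opcode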

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    try:
      not_marked = True
      for op in self.ops:
        if op.status in constants.OPS_FINALIZED:
          assert not_marked, "Finalized opcodes found after non-finalized ones"
          continue
        op.status = status
        op.result = result
        not_marked = False
    finally:
      self.queue.UpdateJobUnlocked(self)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # All locks are acquired by now
    self._job.lock_status = None

    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      raise CancelJob()

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg
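
# Feedback() accepts one or two positional arguments; illustrative calls
# (where "cbs" would be the _OpExecCallbacks instance handed to the LU
# processor):
#
#   cbs.Feedback("hello")                          # implies ELOG_MESSAGE
#   cbs.Feedback(constants.ELOG_MESSAGE, "hello")  # explicit log type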


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: list or None
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: int
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()

class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()

class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client
  when the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED

class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire(shared=1)
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire(shared=1)
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire(shared=1)
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  to_encode = err
                else:
                  to_encode = errors.OpExecError(str(err))
                op.result = errors.EncodeException(to_encode)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire(shared=1)
        try:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire(shared=1)
      try:
        try:
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.info("Finished job %s, status = %s", job_id, status)

class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue

def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper

class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared
    # mode and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask((job, ))

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                  "Unclean master daemon shutdown")

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    utils.WriteFile(file_name, data=data)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
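
  # Example (follows from the division above): with
  # JOBS_PER_ARCHIVE_DIRECTORY = 10000, job "9999" maps to archive
  # directory "0" and job "12345" maps to "1", so at most 10000 job files
  # end up in any one archive directory.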

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of strings representing the job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
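
  # Worked example (hypothetical state): with self._last_serial == 42,
  # _NewSerialsUnlocked(3) writes "45" to the serial file and returns
  # ["43", "44", "45"]; self._last_serial is then 45.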

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of an error reading the job, None is returned and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True
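
  # Behaviour sketch: SetDrainFlag(True) creates
  # constants.JOB_QUEUE_DRAIN_FILE, after which _SubmitJobUnlocked raises
  # errors.JobQueueDrainError for new jobs until SetDrainFlag(False)
  # removes the file again.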

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    tasks = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))
    self._wpool.AddManyTasks(tasks)

    return results
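
  # Result format sketch for SubmitManyJobs() (hypothetical IDs and opcode
  # lists): one (status, data) tuple per submitted job, where data is the
  # job ID on success or the error message on failure:
  #
  #   queue.SubmitManyJobs([[op1], [op2]])
  #   # e.g. [(True, "1234"), (False, "Job queue is drained, refusing job")]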

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
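
  # Client-side polling sketch (hypothetical loop, comment only): callers
  # pass back the previous answer so only real changes are reported:
  #
  #   prev_info = None
  #   prev_serial = -1
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 60.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue  # timeout, ask again
  #     if result is None:
  #       break     # job file lost
  #     (prev_info, log_entries) = result
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]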

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, whether we succeeded or failed
    # renaming the files, we update the cached queue size from the filesystem.
    # When we get around to fixing the TODO above, we can use the number of
    # actually archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time the archival should run, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a
        list with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None