#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
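

# Illustrative sketch, not part of the original module: all timestamps in
# this file are (seconds, microseconds) tuples as returned by
# utils.SplitTime().  The helper below only demonstrates the expected shape.
def _ExampleTimeStampShape():
  (secs, usecs) = TimeStampNow()
  assert 0 <= usecs < 1000000
  return (secs, usecs)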


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      }
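

# Illustrative sketch, not part of the original module: a _QueuedOpCode can
# be serialized and restored losslessly.  The opcode used below
# (opcodes.OpTestDelay) is only an assumption for demonstration purposes.
def _ExampleOpCodeRoundTrip():
  op = opcodes.OpTestDelay(duration=0)
  queued = _QueuedOpCode(op)
  state = queued.Serialize()
  restored = _QueuedOpCode.Restore(state)
  assert restored.status == constants.OP_STATUS_QUEUED
  return restored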


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
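
  # Illustrative mapping (an assumption derived from the algorithm above):
  # a job whose opcode statuses are
  #   [success, running, queued]  -> JOB_STATUS_RUNNING
  #   [success, error, queued]    -> JOB_STATUS_ERROR (one error fails the job)
  #   [queued, queued]            -> JOB_STATUS_QUEUED
  #   [success, success]          -> JOB_STATUS_SUCCESS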

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "lock_status":
        row.append(self.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row
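
  # Illustrative call (assumed example): job.GetInfo(["id", "status", "summary"])
  # could return something like
  #   ["1234", "running", ["CLUSTER_VERIFY"]]
  # for a single-opcode cluster-verify job that is currently executing.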

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
      self._op.exec_timestamp = TimeStampNow()
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
      self._queue.UpdateJobUnlocked(self._job, replicate=False)
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg
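
  # Illustrative usage (assumed example): the LU code may report feedback as
  # Feedback("a message") or Feedback(constants.ELOG_MESSAGE, "a message");
  # both append a (log_serial, timestamp, log_type, log_msg) tuple to the
  # opcode's log and write the job to disk without replicating it.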


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  @type job_id: string
  @ivar job_id: id of the job we're watching
  @type prev_job_info: string
  @ivar prev_job_info: previous job info, as passed by the luxi client
  @type prev_log_serial: string
  @ivar prev_log_serial: previous job serial, as passed by the luxi client
  @type queue: L{JobQueue}
  @ivar queue: job queue (used for a few utility functions)
  @type job_path: string
  @ivar job_path: absolute path of the job file
  @type wm: pyinotify.WatchManager (or None)
  @ivar wm: inotify watch manager to watch for changes
  @type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
  @ivar inotify_handler: single file event handler, used for watching
  @type notifier: pyinotify.Notifier
  @ivar notifier: inotify single-threaded notifier, used for watching

  """
  def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
    self.job_id = job_id
    self.fields = fields
    self.prev_job_info = prev_job_info
    self.prev_log_serial = prev_log_serial
    self.queue = queue
    # pylint: disable-msg=W0212
    self.job_path = self.queue._GetJobPath(self.job_id)
    self.wm = None
    self.inotify_handler = None
    self.notifier = None

  def _SetupInotify(self):
    """Create the inotify watch manager and notifier.

    @raises errors.InotifyError: if the notifier cannot be set up

    """
    if self.wm:
      return
    self.wm = pyinotify.WatchManager()
    self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
                                                                self.OnInotify,
                                                                self.job_path)
    self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
    self.inotify_handler.enable()

  def _LoadDiskStatus(self):
    job = self.queue.SafeLoadJobFromDisk(self.job_id)
    if not job:
      raise errors.JobLost()
    self.job_status = job.CalcStatus()

    job_info = job.GetInfo(self.fields)
    log_entries = job.GetLogEntries(self.prev_log_serial)
    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

  def _CheckForChanges(self):
    self._LoadDiskStatus()
    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (self.job_status not in (constants.JOB_STATUS_QUEUED,
                                constants.JOB_STATUS_RUNNING,
                                constants.JOB_STATUS_WAITLOCK) or
        self.prev_job_info != self.job_info or
        (self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
      logging.debug("Job %s changed", self.job_id)
      return (self.job_info, self.log_entries)

    raise utils.RetryAgain()

  def OnInotify(self, notifier_enabled):
    if not notifier_enabled:
      self.inotify_handler.enable()

  def WaitFn(self, timeout):
    self._SetupInotify()
    if self.notifier.check_events(timeout*1000):
      self.notifier.read_events()
    self.notifier.process_events()

  def WaitForChanges(self, timeout):
    try:
      return utils.Retry(self._CheckForChanges,
                         utils.RETRY_REMAINING_TIME,
                         timeout,
                         wait_fn=self.WaitFn)
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED

  def Close(self):
    if self.wm:
      self.notifier.stop()
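

# Illustrative sketch, not part of the original module: this is roughly how
# L{JobQueue.WaitForJobChanges} below drives the helper.  The queue argument
# is assumed to be an already-initialized L{JobQueue} instance.
def _ExampleWaitForJobChange(queue, job_id):
  helper = _WaitForJobChangesHelper(job_id, ["status"], None, -1, queue)
  try:
    # Returns (job_info, log_entries), constants.JOB_NOTCHANGED on timeout,
    # or None if the job file disappeared or inotify could not be set up.
    return helper.WaitForChanges(10.0)
  finally:
    helper.Close()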


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.info("Finished job %s, status = %s", job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after utils.LockedMethod!

  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

    
833
  @utils.LockedMethod
834
  @_RequireOpenQueue
835
  def AddNode(self, node):
836
    """Register a new node with the queue.
837

838
    @type node: L{objects.Node}
839
    @param node: the node object to be added
840

841
    """
842
    node_name = node.name
843
    assert node_name != self._my_hostname
844

    
845
    # Clean queue directory on added node
846
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
847
    msg = result.fail_msg
848
    if msg:
849
      logging.warning("Cannot cleanup queue directory on node %s: %s",
850
                      node_name, msg)
851

    
852
    if not node.master_candidate:
853
      # remove if existing, ignoring errors
854
      self._nodes.pop(node_name, None)
855
      # and skip the replication of the job ids
856
      return
857

    
858
    # Upload the whole queue excluding archived jobs
859
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
860

    
861
    # Upload current serial file
862
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
863

    
864
    for file_name in files:
865
      # Read file content
866
      content = utils.ReadFile(file_name)
867

    
868
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
869
                                                  [node.primary_ip],
870
                                                  file_name, content)
871
      msg = result[node_name].fail_msg
872
      if msg:
873
        logging.error("Failed to upload file %s to node %s: %s",
874
                      file_name, node_name, msg)
875

    
876
    self._nodes[node_name] = node.primary_ip
877

    
878
  @utils.LockedMethod
879
  @_RequireOpenQueue
880
  def RemoveNode(self, node_name):
881
    """Callback called when removing nodes from the cluster.
882

883
    @type node_name: str
884
    @param node_name: the name of the node to remove
885

886
    """
887
    self._nodes.pop(node_name, None)
888

    
889
  @staticmethod
890
  def _CheckRpcResult(result, nodes, failmsg):
891
    """Verifies the status of an RPC call.
892

893
    Since we aim to keep consistency should this node (the current
894
    master) fail, we will log errors if our rpc fail, and especially
895
    log the case when more than half of the nodes fails.
896

897
    @param result: the data as returned from the rpc call
898
    @type nodes: list
899
    @param nodes: the list of nodes we made the call to
900
    @type failmsg: str
901
    @param failmsg: the identifier to be used for logging
902

903
    """
904
    failed = []
905
    success = []
906

    
907
    for node in nodes:
908
      msg = result[node].fail_msg
909
      if msg:
910
        failed.append(node)
911
        logging.error("RPC call %s (%s) failed on node %s: %s",
912
                      result[node].call, failmsg, node, msg)
913
      else:
914
        success.append(node)
915

    
916
    # +1 for the master node
917
    if (len(success) + 1) < len(failed):
918
      # TODO: Handle failing nodes
919
      logging.error("More than half of the nodes failed")
920

    
921
  def _GetNodeIp(self):
922
    """Helper for returning the node name/ip list.
923

924
    @rtype: (list, list)
925
    @return: a tuple of two lists, the first one with the node
926
        names and the second one with the node addresses
927

928
    """
929
    name_list = self._nodes.keys()
930
    addr_list = [self._nodes[name] for name in name_list]
931
    return name_list, addr_list
932

    
933
  def _UpdateJobQueueFile(self, file_name, data, replicate):
934
    """Writes a file locally and then replicates it to all nodes.
935

936
    This function will replace the contents of a file on the local
937
    node and then replicate it to all the other nodes we have.
938

939
    @type file_name: str
940
    @param file_name: the path of the file to be replicated
941
    @type data: str
942
    @param data: the new contents of the file
943
    @type replicate: boolean
944
    @param replicate: whether to spread the changes to the remote nodes
945

946
    """
947
    utils.WriteFile(file_name, data=data)
948

    
949
    if replicate:
950
      names, addrs = self._GetNodeIp()
951
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
952
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
953

    
954
  def _RenameFilesUnlocked(self, rename):
955
    """Renames a file locally and then replicate the change.
956

957
    This function will rename a file in the local queue directory
958
    and then replicate this rename to all the other nodes we have.
959

960
    @type rename: list of (old, new)
961
    @param rename: List containing tuples mapping old to new names
962

963
    """
964
    # Rename them locally
965
    for old, new in rename:
966
      utils.RenameFile(old, new, mkdir=True)
967

    
968
    # ... and on all nodes
969
    names, addrs = self._GetNodeIp()
970
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
971
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
972

    
973
  @staticmethod
974
  def _FormatJobID(job_id):
975
    """Convert a job ID to string format.
976

977
    Currently this just does C{str(job_id)} after performing some
978
    checks, but if we want to change the job id format this will
979
    abstract this change.
980

981
    @type job_id: int or long
982
    @param job_id: the numeric job id
983
    @rtype: str
984
    @return: the formatted job id
985

986
    """
987
    if not isinstance(job_id, (int, long)):
988
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
989
    if job_id < 0:
990
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
991

    
992
    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
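
  # Illustrative example (derived from the code above, assuming the serial
  # file currently holds 42): _NewSerialsUnlocked(3) writes "45" to the
  # serial file, sets self._last_serial to 45 and returns ["43", "44", "45"].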

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
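
  # Illustrative paths (assumption; the real directories are defined in
  # constants.py): with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job "12345" is
  # stored as <QUEUE_DIR>/job-12345 and, once archived, as
  # <JOB_QUEUE_ARCHIVE_DIR>/1/job-12345, since 12345 / 10000 == 1.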

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for draining.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @utils.LockedMethod
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops))
    return job_id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    tasks = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))
    self._wpool.AddManyTasks(tasks)

    return results
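
  # Illustrative return value (assumed example): SubmitManyJobs([[op1], [op2]])
  # returns something like [(True, "17"), (True, "18")]; a job that could not
  # be queued (e.g. because the queue is drained) yields (False, "<error>").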

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
                                      prev_log_serial, self)
    try:
      return helper.WaitForChanges(timeout)
    finally:
      helper.Close()

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time to spend archiving, in seconds
    @rtype: tuple
    @return: a tuple of (number of archived jobs, number of jobs not
        yet inspected)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None
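

# Illustrative lifecycle sketch, not part of the original module: normally
# only the Ganeti master daemon creates a JobQueue.  The context argument is
# assumed to be a fully initialized GanetiContext; the opcode used is an
# assumption for demonstration purposes.
def _ExampleQueueLifecycle(context):
  queue = JobQueue(context)
  try:
    job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=0)])
    # Poll the job's status and opcode results by ID
    ((job_status, opresults), ) = queue.QueryJobs([job_id],
                                                  ["status", "opresult"])
    return (job_status, opresults)
  finally:
    queue.Shutdown()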