lib/jqueue.py @ 7f93570a

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import errno
35
import re
36
import time
37
import weakref
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import netutils
57
from ganeti import compat
58

    
59

    
60
JOBQUEUE_THREADS = 25
61
JOBS_PER_ARCHIVE_DIRECTORY = 10000
62

    
63
# member lock names to be passed to @ssynchronized decorator
64
_LOCK = "_lock"
65
_QUEUE = "_queue"
66

    
67

    
68
class CancelJob(Exception):
69
  """Special exception to cancel a job.
70

71
  """
72

    
73

    
74
def TimeStampNow():
75
  """Returns the current timestamp.
76

77
  @rtype: tuple
78
  @return: the current time in the (seconds, microseconds) format
79

80
  """
81
  return utils.SplitTime(time.time())
82

    
83

    
84
class _QueuedOpCode(object):
85
  """Encapsulates an opcode object.
86

87
  @ivar log: holds the execution log and consists of tuples
88
  of the form C{(log_serial, timestamp, level, message)}
89
  @ivar input: the OpCode we encapsulate
90
  @ivar status: the current status
91
  @ivar result: the result of the LU execution
92
  @ivar start_timestamp: timestamp for the start of the execution
93
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94
  @ivar stop_timestamp: timestamp for the end of the execution
95

96
  """
97
  __slots__ = ["input", "status", "result", "log",
98
               "start_timestamp", "exec_timestamp", "end_timestamp",
99
               "__weakref__"]
100

    
101
  def __init__(self, op):
102
    """Constructor for the _QuededOpCode.
103

104
    @type op: L{opcodes.OpCode}
105
    @param op: the opcode we encapsulate
106

107
    """
108
    self.input = op
109
    self.status = constants.OP_STATUS_QUEUED
110
    self.result = None
111
    self.log = []
112
    self.start_timestamp = None
113
    self.exec_timestamp = None
114
    self.end_timestamp = None
115

    
116
  @classmethod
117
  def Restore(cls, state):
118
    """Restore the _QueuedOpCode from the serialized form.
119

120
    @type state: dict
121
    @param state: the serialized state
122
    @rtype: _QueuedOpCode
123
    @return: a new _QueuedOpCode instance
124

125
    """
126
    obj = _QueuedOpCode.__new__(cls)
127
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
128
    obj.status = state["status"]
129
    obj.result = state["result"]
130
    obj.log = state["log"]
131
    obj.start_timestamp = state.get("start_timestamp", None)
132
    obj.exec_timestamp = state.get("exec_timestamp", None)
133
    obj.end_timestamp = state.get("end_timestamp", None)
134
    return obj
135

    
136
  def Serialize(self):
137
    """Serializes this _QueuedOpCode.
138

139
    @rtype: dict
140
    @return: the dictionary holding the serialized state
141

142
    """
143
    return {
144
      "input": self.input.__getstate__(),
145
      "status": self.status,
146
      "result": self.result,
147
      "log": self.log,
148
      "start_timestamp": self.start_timestamp,
149
      "exec_timestamp": self.exec_timestamp,
150
      "end_timestamp": self.end_timestamp,
151
      }
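
# Illustrative sketch (not part of the original module): Serialize() and
# Restore() are inverses, so an opcode survives a round trip through the
# on-disk JSON representation; OpTestDelay is just an example opcode here.
#
#   qop = _QueuedOpCode(opcodes.OpTestDelay(duration=0))
#   state = serializer.LoadJson(serializer.DumpJson(qop.Serialize()))
#   restored = _QueuedOpCode.Restore(state)
#   assert restored.status == constants.OP_STATUS_QUEUED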
152

    
153

    
154
class _QueuedJob(object):
155
  """In-memory job representation.
156

157
  This is what we use to track the user-submitted jobs. Locking must
158
  be taken care of by users of this class.
159

160
  @type queue: L{JobQueue}
161
  @ivar queue: the parent queue
162
  @ivar id: the job ID
163
  @type ops: list
164
  @ivar ops: the list of _QueuedOpCode that constitute the job
165
  @type log_serial: int
166
  @ivar log_serial: holds the index for the next log entry
167
  @ivar received_timestamp: the timestamp for when the job was received
168
  @ivar start_timestamp: the timestamp for start of execution
169
  @ivar end_timestamp: the timestamp for end of execution
170
  @ivar lock_status: In-memory locking information for debugging
171

172
  """
173
  # pylint: disable-msg=W0212
174
  __slots__ = ["queue", "id", "ops", "log_serial",
175
               "received_timestamp", "start_timestamp", "end_timestamp",
176
               "lock_status", "change",
177
               "__weakref__"]
178

    
179
  def __init__(self, queue, job_id, ops):
180
    """Constructor for the _QueuedJob.
181

182
    @type queue: L{JobQueue}
183
    @param queue: our parent queue
184
    @type job_id: job_id
185
    @param job_id: our job id
186
    @type ops: list
187
    @param ops: the list of opcodes we hold, which will be encapsulated
188
        in _QueuedOpCodes
189

190
    """
191
    if not ops:
192
      raise errors.GenericError("A job needs at least one opcode")
193

    
194
    self.queue = queue
195
    self.id = job_id
196
    self.ops = [_QueuedOpCode(op) for op in ops]
197
    self.log_serial = 0
198
    self.received_timestamp = TimeStampNow()
199
    self.start_timestamp = None
200
    self.end_timestamp = None
201

    
202
    # In-memory attributes
203
    self.lock_status = None
204

    
205
  def __repr__(self):
206
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
207
              "id=%s" % self.id,
208
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
209

    
210
    return "<%s at %#x>" % (" ".join(status), id(self))
211

    
212
  @classmethod
213
  def Restore(cls, queue, state):
214
    """Restore a _QueuedJob from serialized state:
215

216
    @type queue: L{JobQueue}
217
    @param queue: to which queue the restored job belongs
218
    @type state: dict
219
    @param state: the serialized state
220
    @rtype: _QueuedJob
221
    @return: the restored _QueuedJob instance
222

223
    """
224
    obj = _QueuedJob.__new__(cls)
225
    obj.queue = queue
226
    obj.id = state["id"]
227
    obj.received_timestamp = state.get("received_timestamp", None)
228
    obj.start_timestamp = state.get("start_timestamp", None)
229
    obj.end_timestamp = state.get("end_timestamp", None)
230

    
231
    # In-memory attributes
232
    obj.lock_status = None
233

    
234
    obj.ops = []
235
    obj.log_serial = 0
236
    for op_state in state["ops"]:
237
      op = _QueuedOpCode.Restore(op_state)
238
      for log_entry in op.log:
239
        obj.log_serial = max(obj.log_serial, log_entry[0])
240
      obj.ops.append(op)
241

    
242
    return obj
243

    
244
  def Serialize(self):
245
    """Serialize the _JobQueue instance.
246

247
    @rtype: dict
248
    @return: the serialized state
249

250
    """
251
    return {
252
      "id": self.id,
253
      "ops": [op.Serialize() for op in self.ops],
254
      "start_timestamp": self.start_timestamp,
255
      "end_timestamp": self.end_timestamp,
256
      "received_timestamp": self.received_timestamp,
257
      }
258

    
259
  def CalcStatus(self):
260
    """Compute the status of this job.
261

262
    This function iterates over all the _QueuedOpCodes in the job and
263
    based on their status, computes the job status.
264

265
    The algorithm is:
266
      - if we find a cancelled opcode, or one that finished with an error, the job
267
        status will be the same
268
      - otherwise, the last opcode with the status one of:
269
          - waitlock
270
          - canceling
271
          - running
272

273
        will determine the job status
274

275
      - otherwise, it means either all opcodes are queued, or success,
276
        and the job status will be the same
277

278
    @return: the job status
279

280
    """
281
    status = constants.JOB_STATUS_QUEUED
282

    
283
    all_success = True
284
    for op in self.ops:
285
      if op.status == constants.OP_STATUS_SUCCESS:
286
        continue
287

    
288
      all_success = False
289

    
290
      if op.status == constants.OP_STATUS_QUEUED:
291
        pass
292
      elif op.status == constants.OP_STATUS_WAITLOCK:
293
        status = constants.JOB_STATUS_WAITLOCK
294
      elif op.status == constants.OP_STATUS_RUNNING:
295
        status = constants.JOB_STATUS_RUNNING
296
      elif op.status == constants.OP_STATUS_CANCELING:
297
        status = constants.JOB_STATUS_CANCELING
298
        break
299
      elif op.status == constants.OP_STATUS_ERROR:
300
        status = constants.JOB_STATUS_ERROR
301
        # The whole job fails if one opcode failed
302
        break
303
      elif op.status == constants.OP_STATUS_CANCELED:
304
        status = constants.JOB_STATUS_CANCELED
305
        break
306

    
307
    if all_success:
308
      status = constants.JOB_STATUS_SUCCESS
309

    
310
    return status
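
  # Illustrative examples (not part of the original module) of the mapping
  # performed by CalcStatus, assuming the opcode statuses listed above:
  #   [success, success, success]  -> JOB_STATUS_SUCCESS
  #   [success, running, queued]   -> JOB_STATUS_RUNNING
  #   [success, waitlock, queued]  -> JOB_STATUS_WAITLOCK
  #   [success, error, queued]     -> JOB_STATUS_ERROR (first error wins)
  #   [canceled, queued, queued]   -> JOB_STATUS_CANCELED
  #   [queued, queued]             -> JOB_STATUS_QUEUED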
311

    
312
  def GetLogEntries(self, newer_than):
313
    """Selectively returns the log entries.
314

315
    @type newer_than: None or int
316
    @param newer_than: if this is None, return all log entries,
317
        otherwise return only the log entries with serial higher
318
        than this value
319
    @rtype: list
320
    @return: the list of the log entries selected
321

322
    """
323
    if newer_than is None:
324
      serial = -1
325
    else:
326
      serial = newer_than
327

    
328
    entries = []
329
    for op in self.ops:
330
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
331

    
332
    return entries
333

    
334
  def GetInfo(self, fields):
335
    """Returns information about a job.
336

337
    @type fields: list
338
    @param fields: names of fields to return
339
    @rtype: list
340
    @return: list with one element for each field
341
    @raise errors.OpExecError: when an invalid field
342
        has been passed
343

344
    """
345
    row = []
346
    for fname in fields:
347
      if fname == "id":
348
        row.append(self.id)
349
      elif fname == "status":
350
        row.append(self.CalcStatus())
351
      elif fname == "ops":
352
        row.append([op.input.__getstate__() for op in self.ops])
353
      elif fname == "opresult":
354
        row.append([op.result for op in self.ops])
355
      elif fname == "opstatus":
356
        row.append([op.status for op in self.ops])
357
      elif fname == "oplog":
358
        row.append([op.log for op in self.ops])
359
      elif fname == "opstart":
360
        row.append([op.start_timestamp for op in self.ops])
361
      elif fname == "opexec":
362
        row.append([op.exec_timestamp for op in self.ops])
363
      elif fname == "opend":
364
        row.append([op.end_timestamp for op in self.ops])
365
      elif fname == "received_ts":
366
        row.append(self.received_timestamp)
367
      elif fname == "start_ts":
368
        row.append(self.start_timestamp)
369
      elif fname == "end_ts":
370
        row.append(self.end_timestamp)
371
      elif fname == "lock_status":
372
        row.append(self.lock_status)
373
      elif fname == "summary":
374
        row.append([op.input.Summary() for op in self.ops])
375
      else:
376
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
377
    return row
378

    
379
  def MarkUnfinishedOps(self, status, result):
380
    """Mark unfinished opcodes with a given status and result.
381

382
    This is a utility function for marking all running or waiting to
383
    be run opcodes with a given status. Opcodes which are already
384
    finalised are not changed.
385

386
    @param status: a given opcode status
387
    @param result: the opcode result
388

389
    """
390
    try:
391
      not_marked = True
392
      for op in self.ops:
393
        if op.status in constants.OPS_FINALIZED:
394
          assert not_marked, "Finalized opcodes found after non-finalized ones"
395
          continue
396
        op.status = status
397
        op.result = result
398
        not_marked = False
399
    finally:
400
      self.queue.UpdateJobUnlocked(self)
401

    
402

    
403
class _OpExecCallbacks(mcpu.OpExecCbBase):
404
  def __init__(self, queue, job, op):
405
    """Initializes this class.
406

407
    @type queue: L{JobQueue}
408
    @param queue: Job queue
409
    @type job: L{_QueuedJob}
410
    @param job: Job object
411
    @type op: L{_QueuedOpCode}
412
    @param op: OpCode
413

414
    """
415
    assert queue, "Queue is missing"
416
    assert job, "Job is missing"
417
    assert op, "Opcode is missing"
418

    
419
    self._queue = queue
420
    self._job = job
421
    self._op = op
422

    
423
  @locking.ssynchronized(_QUEUE, shared=1)
424
  def NotifyStart(self):
425
    """Mark the opcode as running, not lock-waiting.
426

427
    This is called from the mcpu code as a notifier function, when the LU is
428
    finally about to start the Exec() method. Of course, to have end-user
429
    visible results, the opcode must be initially (before calling into
430
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
431

432
    """
433
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
434
                               constants.OP_STATUS_CANCELING)
435

    
436
    # All locks are acquired by now
437
    self._job.lock_status = None
438

    
439
    # Cancel here if we were asked to
440
    if self._op.status == constants.OP_STATUS_CANCELING:
441
      raise CancelJob()
442

    
443
    self._op.status = constants.OP_STATUS_RUNNING
444
    self._op.exec_timestamp = TimeStampNow()
445

    
446
    # And finally replicate the job status
447
    self._queue.UpdateJobUnlocked(self._job)
448

    
449
  @locking.ssynchronized(_QUEUE, shared=1)
450
  def _AppendFeedback(self, timestamp, log_type, log_msg):
451
    """Internal feedback append function, with locks
452

453
    """
454
    self._job.log_serial += 1
455
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
456
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
457

    
458
  def Feedback(self, *args):
459
    """Append a log entry.
460

461
    """
462
    assert len(args) < 3
463

    
464
    if len(args) == 1:
465
      log_type = constants.ELOG_MESSAGE
466
      log_msg = args[0]
467
    else:
468
      (log_type, log_msg) = args
469

    
470
    # The time is split to make serialization easier and not lose
471
    # precision.
472
    timestamp = utils.SplitTime(time.time())
473
    self._AppendFeedback(timestamp, log_type, log_msg)
474

    
475
  def ReportLocks(self, msg):
476
    """Write locking information to the job.
477

478
    Called whenever the LU processor is waiting for a lock or has acquired one.
479

480
    """
481
    # Not getting the queue lock because this is a single assignment
482
    self._job.lock_status = msg
483

    
484

    
485
class _JobChangesChecker(object):
486
  def __init__(self, fields, prev_job_info, prev_log_serial):
487
    """Initializes this class.
488

489
    @type fields: list of strings
490
    @param fields: Fields requested by LUXI client
491
    @type prev_job_info: string
492
    @param prev_job_info: previous job info, as passed by the LUXI client
493
    @type prev_log_serial: string
494
    @param prev_log_serial: previous job serial, as passed by the LUXI client
495

496
    """
497
    self._fields = fields
498
    self._prev_job_info = prev_job_info
499
    self._prev_log_serial = prev_log_serial
500

    
501
  def __call__(self, job):
502
    """Checks whether job has changed.
503

504
    @type job: L{_QueuedJob}
505
    @param job: Job object
506

507
    """
508
    status = job.CalcStatus()
509
    job_info = job.GetInfo(self._fields)
510
    log_entries = job.GetLogEntries(self._prev_log_serial)
511

    
512
    # Serializing and deserializing data can cause type changes (e.g. from
513
    # tuple to list) or precision loss. We're doing it here so that we get
514
    # the same modifications as the data received from the client. Without
515
    # this, the comparison afterwards might fail without the data being
516
    # significantly different.
517
    # TODO: we just deserialized from disk, investigate how to make sure that
518
    # the job info and log entries are compatible to avoid this further step.
519
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
520
    # efficient, though floats will be tricky
521
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
522
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
523

    
524
    # Don't even try to wait if the job is no longer running, there will be
525
    # no changes.
526
    if (status not in (constants.JOB_STATUS_QUEUED,
527
                       constants.JOB_STATUS_RUNNING,
528
                       constants.JOB_STATUS_WAITLOCK) or
529
        job_info != self._prev_job_info or
530
        (log_entries and self._prev_log_serial != log_entries[0][0])):
531
      logging.debug("Job %s changed", job.id)
532
      return (job_info, log_entries)
533

    
534
    return None
535

    
536

    
537
class _JobFileChangesWaiter(object):
538
  def __init__(self, filename):
539
    """Initializes this class.
540

541
    @type filename: string
542
    @param filename: Path to job file
543
    @raises errors.InotifyError: if the notifier cannot be set up
544

545
    """
546
    self._wm = pyinotify.WatchManager()
547
    self._inotify_handler = \
548
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
549
    self._notifier = \
550
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
551
    try:
552
      self._inotify_handler.enable()
553
    except Exception:
554
      # pyinotify doesn't close file descriptors automatically
555
      self._notifier.stop()
556
      raise
557

    
558
  def _OnInotify(self, notifier_enabled):
559
    """Callback for inotify.
560

561
    """
562
    if not notifier_enabled:
563
      self._inotify_handler.enable()
564

    
565
  def Wait(self, timeout):
566
    """Waits for the job file to change.
567

568
    @type timeout: float
569
    @param timeout: Timeout in seconds
570
    @return: Whether there have been events
571

572
    """
573
    assert timeout >= 0
574
    have_events = self._notifier.check_events(timeout * 1000)
575
    if have_events:
576
      self._notifier.read_events()
577
    self._notifier.process_events()
578
    return have_events
579

    
580
  def Close(self):
581
    """Closes underlying notifier and its file descriptor.
582

583
    """
584
    self._notifier.stop()
585

    
586

    
587
class _JobChangesWaiter(object):
588
  def __init__(self, filename):
589
    """Initializes this class.
590

591
    @type filename: string
592
    @param filename: Path to job file
593

594
    """
595
    self._filewaiter = None
596
    self._filename = filename
597

    
598
  def Wait(self, timeout):
599
    """Waits for a job to change.
600

601
    @type timeout: float
602
    @param timeout: Timeout in seconds
603
    @return: Whether there have been events
604

605
    """
606
    if self._filewaiter:
607
      return self._filewaiter.Wait(timeout)
608

    
609
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
610
    # If this point is reached, return immediately and let caller check the job
611
    # file again in case there were changes since the last check. This avoids a
612
    # race condition.
613
    self._filewaiter = _JobFileChangesWaiter(self._filename)
614

    
615
    return True
616

    
617
  def Close(self):
618
    """Closes underlying waiter.
619

620
    """
621
    if self._filewaiter:
622
      self._filewaiter.Close()
623

    
624

    
625
class _WaitForJobChangesHelper(object):
626
  """Helper class using inotify to wait for changes in a job file.
627

628
  This class takes a previous job status and serial, and alerts the client when
629
  the current job status has changed.
630

631
  """
632
  @staticmethod
633
  def _CheckForChanges(job_load_fn, check_fn):
634
    job = job_load_fn()
635
    if not job:
636
      raise errors.JobLost()
637

    
638
    result = check_fn(job)
639
    if result is None:
640
      raise utils.RetryAgain()
641

    
642
    return result
643

    
644
  def __call__(self, filename, job_load_fn,
645
               fields, prev_job_info, prev_log_serial, timeout):
646
    """Waits for changes on a job.
647

648
    @type filename: string
649
    @param filename: File on which to wait for changes
650
    @type job_load_fn: callable
651
    @param job_load_fn: Function to load job
652
    @type fields: list of strings
653
    @param fields: Which fields to check for changes
654
    @type prev_job_info: list or None
655
    @param prev_job_info: Last job information returned
656
    @type prev_log_serial: int
657
    @param prev_log_serial: Last job message serial number
658
    @type timeout: float
659
    @param timeout: maximum time to wait in seconds
660

661
    """
662
    try:
663
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
664
      waiter = _JobChangesWaiter(filename)
665
      try:
666
        return utils.Retry(compat.partial(self._CheckForChanges,
667
                                          job_load_fn, check_fn),
668
                           utils.RETRY_REMAINING_TIME, timeout,
669
                           wait_fn=waiter.Wait)
670
      finally:
671
        waiter.Close()
672
    except (errors.InotifyError, errors.JobLost):
673
      return None
674
    except utils.RetryTimeout:
675
      return constants.JOB_NOTCHANGED
676

    
677

    
678
class _JobQueueWorker(workerpool.BaseWorker):
679
  """The actual job workers.
680

681
  """
682
  def RunTask(self, job): # pylint: disable-msg=W0221
683
    """Job executor.
684

685
    This function processes a job. It is closely tied to the _QueuedJob and
686
    _QueuedOpCode classes.
687

688
    @type job: L{_QueuedJob}
689
    @param job: the job to be processed
690

691
    """
692
    logging.info("Processing job %s", job.id)
693
    proc = mcpu.Processor(self.pool.queue.context, job.id)
694
    queue = job.queue
695
    try:
696
      try:
697
        count = len(job.ops)
698
        for idx, op in enumerate(job.ops):
699
          op_summary = op.input.Summary()
700
          if op.status == constants.OP_STATUS_SUCCESS:
701
            # this is a job that was partially completed before master
702
            # daemon shutdown, so it can be expected that some opcodes
703
            # are already completed successfully (if any did error
704
            # out, then the whole job should have been aborted and not
705
            # resubmitted for processing)
706
            logging.info("Op %s/%s: opcode %s already processed, skipping",
707
                         idx + 1, count, op_summary)
708
            continue
709
          try:
710
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
711
                         op_summary)
712

    
713
            queue.acquire(shared=1)
714
            try:
715
              if op.status == constants.OP_STATUS_CANCELED:
716
                raise CancelJob()
717
              assert op.status == constants.OP_STATUS_QUEUED
718
              op.status = constants.OP_STATUS_WAITLOCK
719
              op.result = None
720
              op.start_timestamp = TimeStampNow()
721
              if idx == 0: # first opcode
722
                job.start_timestamp = op.start_timestamp
723
              queue.UpdateJobUnlocked(job)
724

    
725
              input_opcode = op.input
726
            finally:
727
              queue.release()
728

    
729
            # Make sure not to hold queue lock while calling ExecOpCode
730
            result = proc.ExecOpCode(input_opcode,
731
                                     _OpExecCallbacks(queue, job, op))
732

    
733
            queue.acquire(shared=1)
734
            try:
735
              op.status = constants.OP_STATUS_SUCCESS
736
              op.result = result
737
              op.end_timestamp = TimeStampNow()
738
              queue.UpdateJobUnlocked(job)
739
            finally:
740
              queue.release()
741

    
742
            logging.info("Op %s/%s: Successfully finished opcode %s",
743
                         idx + 1, count, op_summary)
744
          except CancelJob:
745
            # Will be handled further up
746
            raise
747
          except Exception, err:
748
            queue.acquire(shared=1)
749
            try:
750
              try:
751
                op.status = constants.OP_STATUS_ERROR
752
                if isinstance(err, errors.GenericError):
753
                  op.result = errors.EncodeException(err)
754
                else:
755
                  op.result = str(err)
756
                op.end_timestamp = TimeStampNow()
757
                logging.info("Op %s/%s: Error in opcode %s: %s",
758
                             idx + 1, count, op_summary, err)
759
              finally:
760
                queue.UpdateJobUnlocked(job)
761
            finally:
762
              queue.release()
763
            raise
764

    
765
      except CancelJob:
766
        queue.acquire(shared=1)
767
        try:
768
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
769
                                "Job canceled by request")
770
        finally:
771
          queue.release()
772
      except errors.GenericError, err:
773
        logging.exception("Ganeti exception")
774
      except:
775
        logging.exception("Unhandled exception")
776
    finally:
777
      queue.acquire(shared=1)
778
      try:
779
        try:
780
          job.lock_status = None
781
          job.end_timestamp = TimeStampNow()
782
          queue.UpdateJobUnlocked(job)
783
        finally:
784
          job_id = job.id
785
          status = job.CalcStatus()
786
      finally:
787
        queue.release()
788

    
789
      logging.info("Finished job %s, status = %s", job_id, status)
790

    
791

    
792
class _JobQueueWorkerPool(workerpool.WorkerPool):
793
  """Simple class implementing a job-processing workerpool.
794

795
  """
796
  def __init__(self, queue):
797
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
798
                                              JOBQUEUE_THREADS,
799
                                              _JobQueueWorker)
800
    self.queue = queue
801

    
802

    
803
def _RequireOpenQueue(fn):
804
  """Decorator for "public" functions.
805

806
  This function should be used for all 'public' functions. That is,
807
  functions usually called from other classes. Note that this should
808
  be applied only to methods (not plain functions), since it expects
809
  that the decorated function is called with a first argument that has
810
  a '_queue_filelock' attribute.
811

812
  @warning: Use this decorator only after locking.ssynchronized
813

814
  Example::
815
    @locking.ssynchronized(_LOCK)
816
    @_RequireOpenQueue
817
    def Example(self):
818
      pass
819

820
  """
821
  def wrapper(self, *args, **kwargs):
822
    # pylint: disable-msg=W0212
823
    assert self._queue_filelock is not None, "Queue should be open"
824
    return fn(self, *args, **kwargs)
825
  return wrapper
826

    
827

    
828
class JobQueue(object):
829
  """Queue used to manage the jobs.
830

831
  @cvar _RE_JOB_FILE: regex matching the valid job file names
832

833
  """
834
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
835

    
836
  def __init__(self, context):
837
    """Constructor for JobQueue.
838

839
    The constructor will initialize the job queue object and then
840
    start loading the current jobs from disk, either for starting them
841
    (if they were queued) or for aborting them (if they were already
842
    running).
843

844
    @type context: GanetiContext
845
    @param context: the context object for access to the configuration
846
        data and other ganeti objects
847

848
    """
849
    self.context = context
850
    self._memcache = weakref.WeakValueDictionary()
851
    self._my_hostname = netutils.HostInfo().name
852

    
853
    # The Big JobQueue lock. If a code block or method acquires it in shared
854
    # mode, it must guarantee concurrency with all the code acquiring it in
855
    # shared mode, including itself. In order not to acquire it at all
856
    # concurrency must be guaranteed with all code acquiring it in shared mode
857
    # and all code acquiring it exclusively.
858
    self._lock = locking.SharedLock("JobQueue")
859

    
860
    self.acquire = self._lock.acquire
861
    self.release = self._lock.release
862

    
863
    # Initialize the queue, and acquire the filelock.
864
    # This ensures no other process is working on the job queue.
865
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
866

    
867
    # Read serial file
868
    self._last_serial = jstore.ReadSerial()
869
    assert self._last_serial is not None, ("Serial file was modified between"
870
                                           " check in jstore and here")
871

    
872
    # Get initial list of nodes
873
    self._nodes = dict((n.name, n.primary_ip)
874
                       for n in self.context.cfg.GetAllNodesInfo().values()
875
                       if n.master_candidate)
876

    
877
    # Remove master node
878
    self._nodes.pop(self._my_hostname, None)
879

    
880
    # TODO: Check consistency across nodes
881

    
882
    self._queue_size = 0
883
    self._UpdateQueueSizeUnlocked()
884
    self._drained = self._IsQueueMarkedDrain()
885

    
886
    # Setup worker pool
887
    self._wpool = _JobQueueWorkerPool(self)
888
    try:
889
      # We need to lock here because WorkerPool.AddTask() may start a job while
890
      # we're still doing our work.
891
      self.acquire()
892
      try:
893
        logging.info("Inspecting job queue")
894

    
895
        all_job_ids = self._GetJobIDsUnlocked()
896
        jobs_count = len(all_job_ids)
897
        lastinfo = time.time()
898
        for idx, job_id in enumerate(all_job_ids):
899
          # Give an update every 1000 jobs or 10 seconds
900
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
901
              idx == (jobs_count - 1)):
902
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
903
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
904
            lastinfo = time.time()
905

    
906
          job = self._LoadJobUnlocked(job_id)
907

    
908
          # a failure in loading the job can cause 'None' to be returned
909
          if job is None:
910
            continue
911

    
912
          status = job.CalcStatus()
913

    
914
          if status in (constants.JOB_STATUS_QUEUED, ):
915
            self._wpool.AddTask(job)
916

    
917
          elif status in (constants.JOB_STATUS_RUNNING,
918
                          constants.JOB_STATUS_WAITLOCK,
919
                          constants.JOB_STATUS_CANCELING):
920
            logging.warning("Unfinished job %s found: %s", job.id, job)
921
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
922
                                  "Unclean master daemon shutdown")
923

    
924
        logging.info("Job queue inspection finished")
925
      finally:
926
        self.release()
927
    except:
928
      self._wpool.TerminateWorkers()
929
      raise
930

    
931
  @locking.ssynchronized(_LOCK)
932
  @_RequireOpenQueue
933
  def AddNode(self, node):
934
    """Register a new node with the queue.
935

936
    @type node: L{objects.Node}
937
    @param node: the node object to be added
938

939
    """
940
    node_name = node.name
941
    assert node_name != self._my_hostname
942

    
943
    # Clean queue directory on added node
944
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
945
    msg = result.fail_msg
946
    if msg:
947
      logging.warning("Cannot cleanup queue directory on node %s: %s",
948
                      node_name, msg)
949

    
950
    if not node.master_candidate:
951
      # remove if existing, ignoring errors
952
      self._nodes.pop(node_name, None)
953
      # and skip the replication of the job ids
954
      return
955

    
956
    # Upload the whole queue excluding archived jobs
957
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
958

    
959
    # Upload current serial file
960
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
961

    
962
    for file_name in files:
963
      # Read file content
964
      content = utils.ReadFile(file_name)
965

    
966
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
967
                                                  [node.primary_ip],
968
                                                  file_name, content)
969
      msg = result[node_name].fail_msg
970
      if msg:
971
        logging.error("Failed to upload file %s to node %s: %s",
972
                      file_name, node_name, msg)
973

    
974
    self._nodes[node_name] = node.primary_ip
975

    
976
  @locking.ssynchronized(_LOCK)
977
  @_RequireOpenQueue
978
  def RemoveNode(self, node_name):
979
    """Callback called when removing nodes from the cluster.
980

981
    @type node_name: str
982
    @param node_name: the name of the node to remove
983

984
    """
985
    self._nodes.pop(node_name, None)
986

    
987
  @staticmethod
988
  def _CheckRpcResult(result, nodes, failmsg):
989
    """Verifies the status of an RPC call.
990

991
    Since we aim to keep consistency should this node (the current
992
    master) fail, we will log errors if our rpc calls fail, and especially
993
    log the case when more than half of the nodes fail.
994

995
    @param result: the data as returned from the rpc call
996
    @type nodes: list
997
    @param nodes: the list of nodes we made the call to
998
    @type failmsg: str
999
    @param failmsg: the identifier to be used for logging
1000

1001
    """
1002
    failed = []
1003
    success = []
1004

    
1005
    for node in nodes:
1006
      msg = result[node].fail_msg
1007
      if msg:
1008
        failed.append(node)
1009
        logging.error("RPC call %s (%s) failed on node %s: %s",
1010
                      result[node].call, failmsg, node, msg)
1011
      else:
1012
        success.append(node)
1013

    
1014
    # +1 for the master node
1015
    if (len(success) + 1) < len(failed):
1016
      # TODO: Handle failing nodes
1017
      logging.error("More than half of the nodes failed")
1018

    
1019
  def _GetNodeIp(self):
1020
    """Helper for returning the node name/ip list.
1021

1022
    @rtype: (list, list)
1023
    @return: a tuple of two lists, the first one with the node
1024
        names and the second one with the node addresses
1025

1026
    """
1027
    name_list = self._nodes.keys()
1028
    addr_list = [self._nodes[name] for name in name_list]
1029
    return name_list, addr_list
1030

    
1031
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1032
    """Writes a file locally and then replicates it to all nodes.
1033

1034
    This function will replace the contents of a file on the local
1035
    node and then replicate it to all the other nodes we have.
1036

1037
    @type file_name: str
1038
    @param file_name: the path of the file to be replicated
1039
    @type data: str
1040
    @param data: the new contents of the file
1041
    @type replicate: boolean
1042
    @param replicate: whether to spread the changes to the remote nodes
1043

1044
    """
1045
    utils.WriteFile(file_name, data=data)
1046

    
1047
    if replicate:
1048
      names, addrs = self._GetNodeIp()
1049
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1050
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1051

    
1052
  def _RenameFilesUnlocked(self, rename):
1053
    """Renames a file locally and then replicate the change.
1054

1055
    This function will rename a file in the local queue directory
1056
    and then replicate this rename to all the other nodes we have.
1057

1058
    @type rename: list of (old, new)
1059
    @param rename: List containing tuples mapping old to new names
1060

1061
    """
1062
    # Rename them locally
1063
    for old, new in rename:
1064
      utils.RenameFile(old, new, mkdir=True)
1065

    
1066
    # ... and on all nodes
1067
    names, addrs = self._GetNodeIp()
1068
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1069
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1070

    
1071
  @staticmethod
1072
  def _FormatJobID(job_id):
1073
    """Convert a job ID to string format.
1074

1075
    Currently this just does C{str(job_id)} after performing some
1076
    checks, but if we want to change the job id format this will
1077
    abstract this change.
1078

1079
    @type job_id: int or long
1080
    @param job_id: the numeric job id
1081
    @rtype: str
1082
    @return: the formatted job id
1083

1084
    """
1085
    if not isinstance(job_id, (int, long)):
1086
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1087
    if job_id < 0:
1088
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1089

    
1090
    return str(job_id)
1091

    
1092
  @classmethod
1093
  def _GetArchiveDirectory(cls, job_id):
1094
    """Returns the archive directory for a job.
1095

1096
    @type job_id: str
1097
    @param job_id: Job identifier
1098
    @rtype: str
1099
    @return: Directory name
1100

1101
    """
1102
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
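
  # For example (illustrative only): with JOBS_PER_ARCHIVE_DIRECTORY = 10000,
  # job id "12345" maps to archive directory "1", i.e. the archived file ends
  # up as <archive dir>/1/job-12345 (see _GetArchivedJobPath below).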
1103

    
1104
  def _NewSerialsUnlocked(self, count):
1105
    """Generates a new job identifier.
1106

1107
    Job identifiers are unique during the lifetime of a cluster.
1108

1109
    @type count: integer
1110
    @param count: how many serials to return
1111
    @rtype: list of str
1112
    @return: a list of strings representing the job identifiers.
1113

1114
    """
1115
    assert count > 0
1116
    # New number
1117
    serial = self._last_serial + count
1118

    
1119
    # Write to file
1120
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1121
                             "%s\n" % serial, True)
1122

    
1123
    result = [self._FormatJobID(v)
1124
              for v in range(self._last_serial + 1, serial + 1)]
1125
    # Keep it only if we were able to write the file
1126
    self._last_serial = serial
1127

    
1128
    return result
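
  # Worked example (illustrative only): if self._last_serial is 42,
  # _NewSerialsUnlocked(2) writes "44\n" to the serial file, returns the job
  # ids ["43", "44"] and leaves self._last_serial at 44.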
1129

    
1130
  @staticmethod
1131
  def _GetJobPath(job_id):
1132
    """Returns the job file for a given job id.
1133

1134
    @type job_id: str
1135
    @param job_id: the job identifier
1136
    @rtype: str
1137
    @return: the path to the job file
1138

1139
    """
1140
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1141

    
1142
  @classmethod
1143
  def _GetArchivedJobPath(cls, job_id):
1144
    """Returns the archived job file for a give job id.
1145

1146
    @type job_id: str
1147
    @param job_id: the job identifier
1148
    @rtype: str
1149
    @return: the path to the archived job file
1150

1151
    """
1152
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1153
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1154

    
1155
  def _GetJobIDsUnlocked(self, sort=True):
1156
    """Return all known job IDs.
1157

1158
    The method only looks at disk because it's a requirement that all
1159
    jobs are present on disk (so in the _memcache we don't have any
1160
    extra IDs).
1161

1162
    @type sort: boolean
1163
    @param sort: perform sorting on the returned job ids
1164
    @rtype: list
1165
    @return: the list of job IDs
1166

1167
    """
1168
    jlist = []
1169
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1170
      m = self._RE_JOB_FILE.match(filename)
1171
      if m:
1172
        jlist.append(m.group(1))
1173
    if sort:
1174
      jlist = utils.NiceSort(jlist)
1175
    return jlist
1176

    
1177
  def _LoadJobUnlocked(self, job_id):
1178
    """Loads a job from the disk or memory.
1179

1180
    Given a job id, this will return the cached job object if
1181
    existing, or try to load the job from the disk. If loading from
1182
    disk, it will also add the job to the cache.
1183

1184
    @param job_id: the job id
1185
    @rtype: L{_QueuedJob} or None
1186
    @return: either None or the job object
1187

1188
    """
1189
    job = self._memcache.get(job_id, None)
1190
    if job:
1191
      logging.debug("Found job %s in memcache", job_id)
1192
      return job
1193

    
1194
    try:
1195
      job = self._LoadJobFromDisk(job_id)
1196
    except errors.JobFileCorrupted:
1197
      old_path = self._GetJobPath(job_id)
1198
      new_path = self._GetArchivedJobPath(job_id)
1199
      if old_path == new_path:
1200
        # job already archived (future case)
1201
        logging.exception("Can't parse job %s", job_id)
1202
      else:
1203
        # non-archived case
1204
        logging.exception("Can't parse job %s, will archive.", job_id)
1205
        self._RenameFilesUnlocked([(old_path, new_path)])
1206
      return None
1207

    
1208
    self._memcache[job_id] = job
1209
    logging.debug("Added job %s to the cache", job_id)
1210
    return job
1211

    
1212
  def _LoadJobFromDisk(self, job_id):
1213
    """Load the given job file from disk.
1214

1215
    Given a job file, read, load and restore it in a _QueuedJob format.
1216

1217
    @type job_id: string
1218
    @param job_id: job identifier
1219
    @rtype: L{_QueuedJob} or None
1220
    @return: either None or the job object
1221

1222
    """
1223
    filepath = self._GetJobPath(job_id)
1224
    logging.debug("Loading job from %s", filepath)
1225
    try:
1226
      raw_data = utils.ReadFile(filepath)
1227
    except EnvironmentError, err:
1228
      if err.errno in (errno.ENOENT, ):
1229
        return None
1230
      raise
1231

    
1232
    try:
1233
      data = serializer.LoadJson(raw_data)
1234
      job = _QueuedJob.Restore(self, data)
1235
    except Exception, err: # pylint: disable-msg=W0703
1236
      raise errors.JobFileCorrupted(err)
1237

    
1238
    return job
1239

    
1240
  def SafeLoadJobFromDisk(self, job_id):
1241
    """Load the given job file from disk.
1242

1243
    Given a job file, read, load and restore it in a _QueuedJob format.
1244
    In case of error reading the job, it gets returned as None, and the
1245
    exception is logged.
1246

1247
    @type job_id: string
1248
    @param job_id: job identifier
1249
    @rtype: L{_QueuedJob} or None
1250
    @return: either None or the job object
1251

1252
    """
1253
    try:
1254
      return self._LoadJobFromDisk(job_id)
1255
    except (errors.JobFileCorrupted, EnvironmentError):
1256
      logging.exception("Can't load/parse job %s", job_id)
1257
      return None
1258

    
1259
  @staticmethod
1260
  def _IsQueueMarkedDrain():
1261
    """Check if the queue is marked from drain.
1262

1263
    This currently uses the queue drain file, which makes it a
1264
    per-node flag. In the future this can be moved to the config file.
1265

1266
    @rtype: boolean
1267
    @return: True if the job queue is marked for draining
1268

1269
    """
1270
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1271

    
1272
  def _UpdateQueueSizeUnlocked(self):
1273
    """Update the queue size.
1274

1275
    """
1276
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1277

    
1278
  @locking.ssynchronized(_LOCK)
1279
  @_RequireOpenQueue
1280
  def SetDrainFlag(self, drain_flag):
1281
    """Sets the drain flag for the queue.
1282

1283
    @type drain_flag: boolean
1284
    @param drain_flag: Whether to set or unset the drain flag
1285

1286
    """
1287
    if drain_flag:
1288
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1289
    else:
1290
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1291

    
1292
    self._drained = drain_flag
1293

    
1294
    return True
1295

    
1296
  @_RequireOpenQueue
1297
  def _SubmitJobUnlocked(self, job_id, ops):
1298
    """Create and store a new job.
1299

1300
    This enters the job into our job queue and also puts it on the new
1301
    queue, in order for it to be picked up by the queue processors.
1302

1303
    @type job_id: job ID
1304
    @param job_id: the job ID for the new job
1305
    @type ops: list
1306
    @param ops: The list of OpCodes that will become the new job.
1307
    @rtype: L{_QueuedJob}
1308
    @return: the job object to be queued
1309
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1310
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1311

1312
    """
1313
    # Ok when sharing the big job queue lock, as the drain file is created when
1314
    # the lock is exclusive.
1315
    if self._drained:
1316
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1317

    
1318
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1319
      raise errors.JobQueueFull()
1320

    
1321
    job = _QueuedJob(self, job_id, ops)
1322

    
1323
    # Write to disk
1324
    self.UpdateJobUnlocked(job)
1325

    
1326
    self._queue_size += 1
1327

    
1328
    logging.debug("Adding new job %s to the cache", job_id)
1329
    self._memcache[job_id] = job
1330

    
1331
    return job
1332

    
1333
  @locking.ssynchronized(_LOCK)
1334
  @_RequireOpenQueue
1335
  def SubmitJob(self, ops):
1336
    """Create and store a new job.
1337

1338
    @see: L{_SubmitJobUnlocked}
1339

1340
    """
1341
    job_id = self._NewSerialsUnlocked(1)[0]
1342
    self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops))
1343
    return job_id
1344

    
1345
  @locking.ssynchronized(_LOCK)
1346
  @_RequireOpenQueue
1347
  def SubmitManyJobs(self, jobs):
1348
    """Create and store multiple jobs.
1349

1350
    @see: L{_SubmitJobUnlocked}
1351

1352
    """
1353
    results = []
1354
    tasks = []
1355
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1356
    for job_id, ops in zip(all_job_ids, jobs):
1357
      try:
1358
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1359
        status = True
1360
        data = job_id
1361
      except errors.GenericError, err:
1362
        data = str(err)
1363
        status = False
1364
      results.append((status, data))
1365
    self._wpool.AddManyTasks(tasks)
1366

    
1367
    return results
1368

    
1369
  @_RequireOpenQueue
1370
  def UpdateJobUnlocked(self, job, replicate=True):
1371
    """Update a job's on disk storage.
1372

1373
    After a job has been modified, this function needs to be called in
1374
    order to write the changes to disk and replicate them to the other
1375
    nodes.
1376

1377
    @type job: L{_QueuedJob}
1378
    @param job: the changed job
1379
    @type replicate: boolean
1380
    @param replicate: whether to replicate the change to remote nodes
1381

1382
    """
1383
    filename = self._GetJobPath(job.id)
1384
    data = serializer.DumpJson(job.Serialize(), indent=False)
1385
    logging.debug("Writing job %s to %s", job.id, filename)
1386
    self._UpdateJobQueueFile(filename, data, replicate)
1387

    
1388
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1389
                        timeout):
1390
    """Waits for changes in a job.
1391

1392
    @type job_id: string
1393
    @param job_id: Job identifier
1394
    @type fields: list of strings
1395
    @param fields: Which fields to check for changes
1396
    @type prev_job_info: list or None
1397
    @param prev_job_info: Last job information returned
1398
    @type prev_log_serial: int
1399
    @param prev_log_serial: Last job message serial number
1400
    @type timeout: float
1401
    @param timeout: maximum time to wait in seconds
1402
    @rtype: tuple (job info, log entries)
1403
    @return: a tuple of the job information as required via
1404
        the fields parameter, and the log entries as a list
1405

1406
        if the job has not changed and the timeout has expired,
1407
        we instead return a special value,
1408
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1409
        as such by the clients
1410

1411
    """
1412
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1413

    
1414
    helper = _WaitForJobChangesHelper()
1415

    
1416
    return helper(self._GetJobPath(job_id), load_fn,
1417
                  fields, prev_job_info, prev_log_serial, timeout)
1418

    
1419
  @locking.ssynchronized(_LOCK)
1420
  @_RequireOpenQueue
1421
  def CancelJob(self, job_id):
1422
    """Cancels a job.
1423

1424
    This will only succeed if the job has not started yet.
1425

1426
    @type job_id: string
1427
    @param job_id: job ID of job to be cancelled.
1428

1429
    """
1430
    logging.info("Cancelling job %s", job_id)
1431

    
1432
    job = self._LoadJobUnlocked(job_id)
1433
    if not job:
1434
      logging.debug("Job %s not found", job_id)
1435
      return (False, "Job %s not found" % job_id)
1436

    
1437
    job_status = job.CalcStatus()
1438

    
1439
    if job_status not in (constants.JOB_STATUS_QUEUED,
1440
                          constants.JOB_STATUS_WAITLOCK):
1441
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1442
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1443

    
1444
    if job_status == constants.JOB_STATUS_QUEUED:
1445
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1446
                            "Job canceled by request")
1447
      return (True, "Job %s canceled" % job.id)
1448

    
1449
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1450
      # The worker will notice the new status and cancel the job
1451
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1452
      return (True, "Job %s will be canceled" % job.id)
1453

    
1454
  @_RequireOpenQueue
1455
  def _ArchiveJobsUnlocked(self, jobs):
1456
    """Archives jobs.
1457

1458
    @type jobs: list of L{_QueuedJob}
1459
    @param jobs: Job objects
1460
    @rtype: int
1461
    @return: Number of archived jobs
1462

1463
    """
1464
    archive_jobs = []
1465
    rename_files = []
1466
    for job in jobs:
1467
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
1468
        logging.debug("Job %s is not yet done", job.id)
1469
        continue
1470

    
1471
      archive_jobs.append(job)
1472

    
1473
      old = self._GetJobPath(job.id)
1474
      new = self._GetArchivedJobPath(job.id)
1475
      rename_files.append((old, new))
1476

    
1477
    # TODO: What if 1..n files fail to rename?
1478
    self._RenameFilesUnlocked(rename_files)
1479

    
1480
    logging.debug("Successfully archived job(s) %s",
1481
                  utils.CommaJoin(job.id for job in archive_jobs))
1482

    
1483
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1484
    # the files, we update the cached queue size from the filesystem. When we
1485
    # get around to fixing the TODO above, we can use the number of actually
1486
    # archived jobs to fix this.
1487
    self._UpdateQueueSizeUnlocked()
1488
    return len(archive_jobs)
1489

    
1490
  @locking.ssynchronized(_LOCK)
1491
  @_RequireOpenQueue
1492
  def ArchiveJob(self, job_id):
1493
    """Archives a job.
1494

1495
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1496

1497
    @type job_id: string
1498
    @param job_id: Job ID of job to be archived.
1499
    @rtype: bool
1500
    @return: Whether job was archived
1501

1502
    """
1503
    logging.info("Archiving job %s", job_id)
1504

    
1505
    job = self._LoadJobUnlocked(job_id)
1506
    if not job:
1507
      logging.debug("Job %s not found", job_id)
1508
      return False
1509

    
1510
    return self._ArchiveJobsUnlocked([job]) == 1
1511

    
1512
  @locking.ssynchronized(_LOCK)
1513
  @_RequireOpenQueue
1514
  def AutoArchiveJobs(self, age, timeout):
1515
    """Archives all jobs based on age.
1516

1517
    The method will archive all jobs which are older than the age
1518
    parameter. For jobs that don't have an end timestamp, the start
1519
    timestamp will be considered. The special '-1' age will cause
1520
    archival of all jobs (that are not running or queued).
1521

1522
    @type age: int
1523
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time to spend archiving, in seconds
1524

1525
    """
1526
    logging.info("Archiving jobs with age more than %s seconds", age)
1527

    
1528
    now = time.time()
1529
    end_time = now + timeout
1530
    archived_count = 0
1531
    last_touched = 0
1532

    
1533
    all_job_ids = self._GetJobIDsUnlocked()
1534
    pending = []
1535
    for idx, job_id in enumerate(all_job_ids):
1536
      last_touched = idx + 1
1537

    
1538
      # Not optimal because jobs could be pending
1539
      # TODO: Measure average duration for job archival and take number of
1540
      # pending jobs into account.
1541
      if time.time() > end_time:
1542
        break
1543

    
1544
      # Returns None if the job failed to load
1545
      job = self._LoadJobUnlocked(job_id)
1546
      if job:
1547
        if job.end_timestamp is None:
1548
          if job.start_timestamp is None:
1549
            job_age = job.received_timestamp
1550
          else:
1551
            job_age = job.start_timestamp
1552
        else:
1553
          job_age = job.end_timestamp
1554

    
1555
        if age == -1 or now - job_age[0] > age:
1556
          pending.append(job)
1557

    
1558
          # Archive 10 jobs at a time
1559
          if len(pending) >= 10:
1560
            archived_count += self._ArchiveJobsUnlocked(pending)
1561
            pending = []
1562

    
1563
    if pending:
1564
      archived_count += self._ArchiveJobsUnlocked(pending)
1565

    
1566
    return (archived_count, len(all_job_ids) - last_touched)
1567

    
1568
  def QueryJobs(self, job_ids, fields):
1569
    """Returns a list of jobs in queue.
1570

1571
    @type job_ids: list
1572
    @param job_ids: sequence of job identifiers or None for all
1573
    @type fields: list
1574
    @param fields: names of fields to return
1575
    @rtype: list
1576
    @return: a list with one element per job, each element being a list with
1577
        the requested fields
1578

1579
    """
1580
    jobs = []
1581
    list_all = False
1582
    if not job_ids:
1583
      # Since files are added to/removed from the queue atomically, there's no
1584
      # risk of getting the job ids in an inconsistent state.
1585
      job_ids = self._GetJobIDsUnlocked()
1586
      list_all = True
1587

    
1588
    for job_id in job_ids:
1589
      job = self.SafeLoadJobFromDisk(job_id)
1590
      if job is not None:
1591
        jobs.append(job.GetInfo(fields))
1592
      elif not list_all:
1593
        jobs.append(None)
1594

    
1595
    return jobs
1596

    
1597
  @locking.ssynchronized(_LOCK)
1598
  @_RequireOpenQueue
1599
  def Shutdown(self):
1600
    """Stops the job queue.
1601

1602
    This shuts down all the worker threads and closes the queue.
1603

1604
    """
1605
    self._wpool.TerminateWorkers()
1606

    
1607
    self._queue_filelock.Close()
1608
    self._queue_filelock = None
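

# Illustrative usage sketch (not part of the original module). The context
# object comes from masterd; everything below is an assumption based on the
# public methods above, not a tested invocation:
#
#   queue = JobQueue(context)   # acquires the queue file lock, inspects jobs
#   job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=1)])
#   print queue.QueryJobs([job_id], ["id", "status", "summary"])
#   # returns constants.JOB_NOTCHANGED if nothing happened within the timeout
#   queue.WaitForJobChanges(job_id, ["status"], None, None, timeout=5)
#   queue.Shutdown()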