Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ b95479a5

History | View | Annotate | Download (66.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import re
35
import time
36
import weakref
37
import threading
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import runtime
57
from ganeti import netutils
58
from ganeti import compat
59
from ganeti import ht
60

    
61

    
62
JOBQUEUE_THREADS = 25
63
JOBS_PER_ARCHIVE_DIRECTORY = 10000
64

    
65
# member lock names to be passed to @ssynchronized decorator
66
_LOCK = "_lock"
67
_QUEUE = "_queue"
68

    
69

    
70
class CancelJob(Exception):
71
  """Special exception to cancel a job.
72

73
  """
74

    
75

    
76
def TimeStampNow():
77
  """Returns the current timestamp.
78

79
  @rtype: tuple
80
  @return: the current time in the (seconds, microseconds) format
81

82
  """
83
  return utils.SplitTime(time.time())
84

    
85

    
86
class _QueuedOpCode(object):
87
  """Encapsulates an opcode object.
88

89
  @ivar log: holds the execution log and consists of tuples
90
  of the form C{(log_serial, timestamp, level, message)}
91
  @ivar input: the OpCode we encapsulate
92
  @ivar status: the current status
93
  @ivar result: the result of the LU execution
94
  @ivar start_timestamp: timestamp for the start of the execution
95
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
96
  @ivar stop_timestamp: timestamp for the end of the execution
97

98
  """
99
  __slots__ = ["input", "status", "result", "log", "priority",
100
               "start_timestamp", "exec_timestamp", "end_timestamp",
101
               "__weakref__"]
102

    
103
  def __init__(self, op):
104
    """Constructor for the _QuededOpCode.
105

106
    @type op: L{opcodes.OpCode}
107
    @param op: the opcode we encapsulate
108

109
    """
110
    self.input = op
111
    self.status = constants.OP_STATUS_QUEUED
112
    self.result = None
113
    self.log = []
114
    self.start_timestamp = None
115
    self.exec_timestamp = None
116
    self.end_timestamp = None
117

    
118
    # Get initial priority (it might change during the lifetime of this opcode)
119
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
120

    
121
  @classmethod
122
  def Restore(cls, state):
123
    """Restore the _QueuedOpCode from the serialized form.
124

125
    @type state: dict
126
    @param state: the serialized state
127
    @rtype: _QueuedOpCode
128
    @return: a new _QueuedOpCode instance
129

130
    """
131
    obj = _QueuedOpCode.__new__(cls)
132
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
133
    obj.status = state["status"]
134
    obj.result = state["result"]
135
    obj.log = state["log"]
136
    obj.start_timestamp = state.get("start_timestamp", None)
137
    obj.exec_timestamp = state.get("exec_timestamp", None)
138
    obj.end_timestamp = state.get("end_timestamp", None)
139
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
140
    return obj
141

    
142
  def Serialize(self):
143
    """Serializes this _QueuedOpCode.
144

145
    @rtype: dict
146
    @return: the dictionary holding the serialized state
147

148
    """
149
    return {
150
      "input": self.input.__getstate__(),
151
      "status": self.status,
152
      "result": self.result,
153
      "log": self.log,
154
      "start_timestamp": self.start_timestamp,
155
      "exec_timestamp": self.exec_timestamp,
156
      "end_timestamp": self.end_timestamp,
157
      "priority": self.priority,
158
      }
159

    
160

    
161
class _QueuedJob(object):
162
  """In-memory job representation.
163

164
  This is what we use to track the user-submitted jobs. Locking must
165
  be taken care of by users of this class.
166

167
  @type queue: L{JobQueue}
168
  @ivar queue: the parent queue
169
  @ivar id: the job ID
170
  @type ops: list
171
  @ivar ops: the list of _QueuedOpCode that constitute the job
172
  @type log_serial: int
173
  @ivar log_serial: holds the index for the next log entry
174
  @ivar received_timestamp: the timestamp for when the job was received
175
  @ivar start_timestmap: the timestamp for start of execution
176
  @ivar end_timestamp: the timestamp for end of execution
177

178
  """
179
  # pylint: disable-msg=W0212
180
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
181
               "received_timestamp", "start_timestamp", "end_timestamp",
182
               "__weakref__", "processor_lock"]
183

    
184
  def __init__(self, queue, job_id, ops):
185
    """Constructor for the _QueuedJob.
186

187
    @type queue: L{JobQueue}
188
    @param queue: our parent queue
189
    @type job_id: job_id
190
    @param job_id: our job id
191
    @type ops: list
192
    @param ops: the list of opcodes we hold, which will be encapsulated
193
        in _QueuedOpCodes
194

195
    """
196
    if not ops:
197
      raise errors.GenericError("A job needs at least one opcode")
198

    
199
    self.queue = queue
200
    self.id = job_id
201
    self.ops = [_QueuedOpCode(op) for op in ops]
202
    self.log_serial = 0
203
    self.received_timestamp = TimeStampNow()
204
    self.start_timestamp = None
205
    self.end_timestamp = None
206

    
207
    self._InitInMemory(self)
208

    
209
  @staticmethod
210
  def _InitInMemory(obj):
211
    """Initializes in-memory variables.
212

213
    """
214
    obj.ops_iter = None
215
    obj.cur_opctx = None
216
    obj.processor_lock = threading.Lock()
217

    
218
  def __repr__(self):
219
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
220
              "id=%s" % self.id,
221
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
222

    
223
    return "<%s at %#x>" % (" ".join(status), id(self))
224

    
225
  @classmethod
226
  def Restore(cls, queue, state):
227
    """Restore a _QueuedJob from serialized state:
228

229
    @type queue: L{JobQueue}
230
    @param queue: to which queue the restored job belongs
231
    @type state: dict
232
    @param state: the serialized state
233
    @rtype: _JobQueue
234
    @return: the restored _JobQueue instance
235

236
    """
237
    obj = _QueuedJob.__new__(cls)
238
    obj.queue = queue
239
    obj.id = state["id"]
240
    obj.received_timestamp = state.get("received_timestamp", None)
241
    obj.start_timestamp = state.get("start_timestamp", None)
242
    obj.end_timestamp = state.get("end_timestamp", None)
243

    
244
    obj.ops = []
245
    obj.log_serial = 0
246
    for op_state in state["ops"]:
247
      op = _QueuedOpCode.Restore(op_state)
248
      for log_entry in op.log:
249
        obj.log_serial = max(obj.log_serial, log_entry[0])
250
      obj.ops.append(op)
251

    
252
    cls._InitInMemory(obj)
253

    
254
    return obj
255

    
256
  def Serialize(self):
257
    """Serialize the _JobQueue instance.
258

259
    @rtype: dict
260
    @return: the serialized state
261

262
    """
263
    return {
264
      "id": self.id,
265
      "ops": [op.Serialize() for op in self.ops],
266
      "start_timestamp": self.start_timestamp,
267
      "end_timestamp": self.end_timestamp,
268
      "received_timestamp": self.received_timestamp,
269
      }
270

    
271
  def CalcStatus(self):
272
    """Compute the status of this job.
273

274
    This function iterates over all the _QueuedOpCodes in the job and
275
    based on their status, computes the job status.
276

277
    The algorithm is:
278
      - if we find a cancelled, or finished with error, the job
279
        status will be the same
280
      - otherwise, the last opcode with the status one of:
281
          - waitlock
282
          - canceling
283
          - running
284

285
        will determine the job status
286

287
      - otherwise, it means either all opcodes are queued, or success,
288
        and the job status will be the same
289

290
    @return: the job status
291

292
    """
293
    status = constants.JOB_STATUS_QUEUED
294

    
295
    all_success = True
296
    for op in self.ops:
297
      if op.status == constants.OP_STATUS_SUCCESS:
298
        continue
299

    
300
      all_success = False
301

    
302
      if op.status == constants.OP_STATUS_QUEUED:
303
        pass
304
      elif op.status == constants.OP_STATUS_WAITLOCK:
305
        status = constants.JOB_STATUS_WAITLOCK
306
      elif op.status == constants.OP_STATUS_RUNNING:
307
        status = constants.JOB_STATUS_RUNNING
308
      elif op.status == constants.OP_STATUS_CANCELING:
309
        status = constants.JOB_STATUS_CANCELING
310
        break
311
      elif op.status == constants.OP_STATUS_ERROR:
312
        status = constants.JOB_STATUS_ERROR
313
        # The whole job fails if one opcode failed
314
        break
315
      elif op.status == constants.OP_STATUS_CANCELED:
316
        status = constants.OP_STATUS_CANCELED
317
        break
318

    
319
    if all_success:
320
      status = constants.JOB_STATUS_SUCCESS
321

    
322
    return status
323

    
324
  def CalcPriority(self):
325
    """Gets the current priority for this job.
326

327
    Only unfinished opcodes are considered. When all are done, the default
328
    priority is used.
329

330
    @rtype: int
331

332
    """
333
    priorities = [op.priority for op in self.ops
334
                  if op.status not in constants.OPS_FINALIZED]
335

    
336
    if not priorities:
337
      # All opcodes are done, assume default priority
338
      return constants.OP_PRIO_DEFAULT
339

    
340
    return min(priorities)
341

    
342
  def GetLogEntries(self, newer_than):
343
    """Selectively returns the log entries.
344

345
    @type newer_than: None or int
346
    @param newer_than: if this is None, return all log entries,
347
        otherwise return only the log entries with serial higher
348
        than this value
349
    @rtype: list
350
    @return: the list of the log entries selected
351

352
    """
353
    if newer_than is None:
354
      serial = -1
355
    else:
356
      serial = newer_than
357

    
358
    entries = []
359
    for op in self.ops:
360
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
361

    
362
    return entries
363

    
364
  def GetInfo(self, fields):
365
    """Returns information about a job.
366

367
    @type fields: list
368
    @param fields: names of fields to return
369
    @rtype: list
370
    @return: list with one element for each field
371
    @raise errors.OpExecError: when an invalid field
372
        has been passed
373

374
    """
375
    row = []
376
    for fname in fields:
377
      if fname == "id":
378
        row.append(self.id)
379
      elif fname == "status":
380
        row.append(self.CalcStatus())
381
      elif fname == "priority":
382
        row.append(self.CalcPriority())
383
      elif fname == "ops":
384
        row.append([op.input.__getstate__() for op in self.ops])
385
      elif fname == "opresult":
386
        row.append([op.result for op in self.ops])
387
      elif fname == "opstatus":
388
        row.append([op.status for op in self.ops])
389
      elif fname == "oplog":
390
        row.append([op.log for op in self.ops])
391
      elif fname == "opstart":
392
        row.append([op.start_timestamp for op in self.ops])
393
      elif fname == "opexec":
394
        row.append([op.exec_timestamp for op in self.ops])
395
      elif fname == "opend":
396
        row.append([op.end_timestamp for op in self.ops])
397
      elif fname == "oppriority":
398
        row.append([op.priority for op in self.ops])
399
      elif fname == "received_ts":
400
        row.append(self.received_timestamp)
401
      elif fname == "start_ts":
402
        row.append(self.start_timestamp)
403
      elif fname == "end_ts":
404
        row.append(self.end_timestamp)
405
      elif fname == "summary":
406
        row.append([op.input.Summary() for op in self.ops])
407
      else:
408
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
409
    return row
410

    
411
  def MarkUnfinishedOps(self, status, result):
412
    """Mark unfinished opcodes with a given status and result.
413

414
    This is an utility function for marking all running or waiting to
415
    be run opcodes with a given status. Opcodes which are already
416
    finalised are not changed.
417

418
    @param status: a given opcode status
419
    @param result: the opcode result
420

421
    """
422
    not_marked = True
423
    for op in self.ops:
424
      if op.status in constants.OPS_FINALIZED:
425
        assert not_marked, "Finalized opcodes found after non-finalized ones"
426
        continue
427
      op.status = status
428
      op.result = result
429
      not_marked = False
430

    
431
  def Finalize(self):
432
    """Marks the job as finalized.
433

434
    """
435
    self.end_timestamp = TimeStampNow()
436

    
437
  def Cancel(self):
438
    """Marks job as canceled/-ing if possible.
439

440
    @rtype: tuple; (bool, string)
441
    @return: Boolean describing whether job was successfully canceled or marked
442
      as canceling and a text message
443

444
    """
445
    status = self.CalcStatus()
446

    
447
    if status == constants.JOB_STATUS_QUEUED:
448
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
449
                             "Job canceled by request")
450
      self.Finalize()
451
      return (True, "Job %s canceled" % self.id)
452

    
453
    elif status == constants.JOB_STATUS_WAITLOCK:
454
      # The worker will notice the new status and cancel the job
455
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
456
      return (True, "Job %s will be canceled" % self.id)
457

    
458
    else:
459
      logging.debug("Job %s is no longer waiting in the queue", self.id)
460
      return (False, "Job %s is no longer waiting in the queue" % self.id)
461

    
462

    
463
class _OpExecCallbacks(mcpu.OpExecCbBase):
464
  def __init__(self, queue, job, op):
465
    """Initializes this class.
466

467
    @type queue: L{JobQueue}
468
    @param queue: Job queue
469
    @type job: L{_QueuedJob}
470
    @param job: Job object
471
    @type op: L{_QueuedOpCode}
472
    @param op: OpCode
473

474
    """
475
    assert queue, "Queue is missing"
476
    assert job, "Job is missing"
477
    assert op, "Opcode is missing"
478

    
479
    self._queue = queue
480
    self._job = job
481
    self._op = op
482

    
483
  def _CheckCancel(self):
484
    """Raises an exception to cancel the job if asked to.
485

486
    """
487
    # Cancel here if we were asked to
488
    if self._op.status == constants.OP_STATUS_CANCELING:
489
      logging.debug("Canceling opcode")
490
      raise CancelJob()
491

    
492
  @locking.ssynchronized(_QUEUE, shared=1)
493
  def NotifyStart(self):
494
    """Mark the opcode as running, not lock-waiting.
495

496
    This is called from the mcpu code as a notifier function, when the LU is
497
    finally about to start the Exec() method. Of course, to have end-user
498
    visible results, the opcode must be initially (before calling into
499
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
500

501
    """
502
    assert self._op in self._job.ops
503
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
504
                               constants.OP_STATUS_CANCELING)
505

    
506
    # Cancel here if we were asked to
507
    self._CheckCancel()
508

    
509
    logging.debug("Opcode is now running")
510

    
511
    self._op.status = constants.OP_STATUS_RUNNING
512
    self._op.exec_timestamp = TimeStampNow()
513

    
514
    # And finally replicate the job status
515
    self._queue.UpdateJobUnlocked(self._job)
516

    
517
  @locking.ssynchronized(_QUEUE, shared=1)
518
  def _AppendFeedback(self, timestamp, log_type, log_msg):
519
    """Internal feedback append function, with locks
520

521
    """
522
    self._job.log_serial += 1
523
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
524
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
525

    
526
  def Feedback(self, *args):
527
    """Append a log entry.
528

529
    """
530
    assert len(args) < 3
531

    
532
    if len(args) == 1:
533
      log_type = constants.ELOG_MESSAGE
534
      log_msg = args[0]
535
    else:
536
      (log_type, log_msg) = args
537

    
538
    # The time is split to make serialization easier and not lose
539
    # precision.
540
    timestamp = utils.SplitTime(time.time())
541
    self._AppendFeedback(timestamp, log_type, log_msg)
542

    
543
  def CheckCancel(self):
544
    """Check whether job has been cancelled.
545

546
    """
547
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
548
                               constants.OP_STATUS_CANCELING)
549

    
550
    # Cancel here if we were asked to
551
    self._CheckCancel()
552

    
553
  def SubmitManyJobs(self, jobs):
554
    """Submits jobs for processing.
555

556
    See L{JobQueue.SubmitManyJobs}.
557

558
    """
559
    # Locking is done in job queue
560
    return self._queue.SubmitManyJobs(jobs)
561

    
562

    
563
class _JobChangesChecker(object):
564
  def __init__(self, fields, prev_job_info, prev_log_serial):
565
    """Initializes this class.
566

567
    @type fields: list of strings
568
    @param fields: Fields requested by LUXI client
569
    @type prev_job_info: string
570
    @param prev_job_info: previous job info, as passed by the LUXI client
571
    @type prev_log_serial: string
572
    @param prev_log_serial: previous job serial, as passed by the LUXI client
573

574
    """
575
    self._fields = fields
576
    self._prev_job_info = prev_job_info
577
    self._prev_log_serial = prev_log_serial
578

    
579
  def __call__(self, job):
580
    """Checks whether job has changed.
581

582
    @type job: L{_QueuedJob}
583
    @param job: Job object
584

585
    """
586
    status = job.CalcStatus()
587
    job_info = job.GetInfo(self._fields)
588
    log_entries = job.GetLogEntries(self._prev_log_serial)
589

    
590
    # Serializing and deserializing data can cause type changes (e.g. from
591
    # tuple to list) or precision loss. We're doing it here so that we get
592
    # the same modifications as the data received from the client. Without
593
    # this, the comparison afterwards might fail without the data being
594
    # significantly different.
595
    # TODO: we just deserialized from disk, investigate how to make sure that
596
    # the job info and log entries are compatible to avoid this further step.
597
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
598
    # efficient, though floats will be tricky
599
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
600
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
601

    
602
    # Don't even try to wait if the job is no longer running, there will be
603
    # no changes.
604
    if (status not in (constants.JOB_STATUS_QUEUED,
605
                       constants.JOB_STATUS_RUNNING,
606
                       constants.JOB_STATUS_WAITLOCK) or
607
        job_info != self._prev_job_info or
608
        (log_entries and self._prev_log_serial != log_entries[0][0])):
609
      logging.debug("Job %s changed", job.id)
610
      return (job_info, log_entries)
611

    
612
    return None
613

    
614

    
615
class _JobFileChangesWaiter(object):
616
  def __init__(self, filename):
617
    """Initializes this class.
618

619
    @type filename: string
620
    @param filename: Path to job file
621
    @raises errors.InotifyError: if the notifier cannot be setup
622

623
    """
624
    self._wm = pyinotify.WatchManager()
625
    self._inotify_handler = \
626
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
627
    self._notifier = \
628
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
629
    try:
630
      self._inotify_handler.enable()
631
    except Exception:
632
      # pyinotify doesn't close file descriptors automatically
633
      self._notifier.stop()
634
      raise
635

    
636
  def _OnInotify(self, notifier_enabled):
637
    """Callback for inotify.
638

639
    """
640
    if not notifier_enabled:
641
      self._inotify_handler.enable()
642

    
643
  def Wait(self, timeout):
644
    """Waits for the job file to change.
645

646
    @type timeout: float
647
    @param timeout: Timeout in seconds
648
    @return: Whether there have been events
649

650
    """
651
    assert timeout >= 0
652
    have_events = self._notifier.check_events(timeout * 1000)
653
    if have_events:
654
      self._notifier.read_events()
655
    self._notifier.process_events()
656
    return have_events
657

    
658
  def Close(self):
659
    """Closes underlying notifier and its file descriptor.
660

661
    """
662
    self._notifier.stop()
663

    
664

    
665
class _JobChangesWaiter(object):
666
  def __init__(self, filename):
667
    """Initializes this class.
668

669
    @type filename: string
670
    @param filename: Path to job file
671

672
    """
673
    self._filewaiter = None
674
    self._filename = filename
675

    
676
  def Wait(self, timeout):
677
    """Waits for a job to change.
678

679
    @type timeout: float
680
    @param timeout: Timeout in seconds
681
    @return: Whether there have been events
682

683
    """
684
    if self._filewaiter:
685
      return self._filewaiter.Wait(timeout)
686

    
687
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
688
    # If this point is reached, return immediately and let caller check the job
689
    # file again in case there were changes since the last check. This avoids a
690
    # race condition.
691
    self._filewaiter = _JobFileChangesWaiter(self._filename)
692

    
693
    return True
694

    
695
  def Close(self):
696
    """Closes underlying waiter.
697

698
    """
699
    if self._filewaiter:
700
      self._filewaiter.Close()
701

    
702

    
703
class _WaitForJobChangesHelper(object):
704
  """Helper class using inotify to wait for changes in a job file.
705

706
  This class takes a previous job status and serial, and alerts the client when
707
  the current job status has changed.
708

709
  """
710
  @staticmethod
711
  def _CheckForChanges(job_load_fn, check_fn):
712
    job = job_load_fn()
713
    if not job:
714
      raise errors.JobLost()
715

    
716
    result = check_fn(job)
717
    if result is None:
718
      raise utils.RetryAgain()
719

    
720
    return result
721

    
722
  def __call__(self, filename, job_load_fn,
723
               fields, prev_job_info, prev_log_serial, timeout):
724
    """Waits for changes on a job.
725

726
    @type filename: string
727
    @param filename: File on which to wait for changes
728
    @type job_load_fn: callable
729
    @param job_load_fn: Function to load job
730
    @type fields: list of strings
731
    @param fields: Which fields to check for changes
732
    @type prev_job_info: list or None
733
    @param prev_job_info: Last job information returned
734
    @type prev_log_serial: int
735
    @param prev_log_serial: Last job message serial number
736
    @type timeout: float
737
    @param timeout: maximum time to wait in seconds
738

739
    """
740
    try:
741
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
742
      waiter = _JobChangesWaiter(filename)
743
      try:
744
        return utils.Retry(compat.partial(self._CheckForChanges,
745
                                          job_load_fn, check_fn),
746
                           utils.RETRY_REMAINING_TIME, timeout,
747
                           wait_fn=waiter.Wait)
748
      finally:
749
        waiter.Close()
750
    except (errors.InotifyError, errors.JobLost):
751
      return None
752
    except utils.RetryTimeout:
753
      return constants.JOB_NOTCHANGED
754

    
755

    
756
def _EncodeOpError(err):
757
  """Encodes an error which occurred while processing an opcode.
758

759
  """
760
  if isinstance(err, errors.GenericError):
761
    to_encode = err
762
  else:
763
    to_encode = errors.OpExecError(str(err))
764

    
765
  return errors.EncodeException(to_encode)
766

    
767

    
768
class _TimeoutStrategyWrapper:
769
  def __init__(self, fn):
770
    """Initializes this class.
771

772
    """
773
    self._fn = fn
774
    self._next = None
775

    
776
  def _Advance(self):
777
    """Gets the next timeout if necessary.
778

779
    """
780
    if self._next is None:
781
      self._next = self._fn()
782

    
783
  def Peek(self):
784
    """Returns the next timeout.
785

786
    """
787
    self._Advance()
788
    return self._next
789

    
790
  def Next(self):
791
    """Returns the current timeout and advances the internal state.
792

793
    """
794
    self._Advance()
795
    result = self._next
796
    self._next = None
797
    return result
798

    
799

    
800
class _OpExecContext:
801
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
802
    """Initializes this class.
803

804
    """
805
    self.op = op
806
    self.index = index
807
    self.log_prefix = log_prefix
808
    self.summary = op.input.Summary()
809

    
810
    # Create local copy to modify
811
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
812
      self.jobdeps = op.input.depends[:]
813
    else:
814
      self.jobdeps = None
815

    
816
    self._timeout_strategy_factory = timeout_strategy_factory
817
    self._ResetTimeoutStrategy()
818

    
819
  def _ResetTimeoutStrategy(self):
820
    """Creates a new timeout strategy.
821

822
    """
823
    self._timeout_strategy = \
824
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
825

    
826
  def CheckPriorityIncrease(self):
827
    """Checks whether priority can and should be increased.
828

829
    Called when locks couldn't be acquired.
830

831
    """
832
    op = self.op
833

    
834
    # Exhausted all retries and next round should not use blocking acquire
835
    # for locks?
836
    if (self._timeout_strategy.Peek() is None and
837
        op.priority > constants.OP_PRIO_HIGHEST):
838
      logging.debug("Increasing priority")
839
      op.priority -= 1
840
      self._ResetTimeoutStrategy()
841
      return True
842

    
843
    return False
844

    
845
  def GetNextLockTimeout(self):
846
    """Returns the next lock acquire timeout.
847

848
    """
849
    return self._timeout_strategy.Next()
850

    
851

    
852
class _JobProcessor(object):
853
  def __init__(self, queue, opexec_fn, job,
854
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
855
    """Initializes this class.
856

857
    """
858
    self.queue = queue
859
    self.opexec_fn = opexec_fn
860
    self.job = job
861
    self._timeout_strategy_factory = _timeout_strategy_factory
862

    
863
  @staticmethod
864
  def _FindNextOpcode(job, timeout_strategy_factory):
865
    """Locates the next opcode to run.
866

867
    @type job: L{_QueuedJob}
868
    @param job: Job object
869
    @param timeout_strategy_factory: Callable to create new timeout strategy
870

871
    """
872
    # Create some sort of a cache to speed up locating next opcode for future
873
    # lookups
874
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
875
    # pending and one for processed ops.
876
    if job.ops_iter is None:
877
      job.ops_iter = enumerate(job.ops)
878

    
879
    # Find next opcode to run
880
    while True:
881
      try:
882
        (idx, op) = job.ops_iter.next()
883
      except StopIteration:
884
        raise errors.ProgrammerError("Called for a finished job")
885

    
886
      if op.status == constants.OP_STATUS_RUNNING:
887
        # Found an opcode already marked as running
888
        raise errors.ProgrammerError("Called for job marked as running")
889

    
890
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
891
                             timeout_strategy_factory)
892

    
893
      if op.status not in constants.OPS_FINALIZED:
894
        return opctx
895

    
896
      # This is a job that was partially completed before master daemon
897
      # shutdown, so it can be expected that some opcodes are already
898
      # completed successfully (if any did error out, then the whole job
899
      # should have been aborted and not resubmitted for processing).
900
      logging.info("%s: opcode %s already processed, skipping",
901
                   opctx.log_prefix, opctx.summary)
902

    
903
  @staticmethod
904
  def _MarkWaitlock(job, op):
905
    """Marks an opcode as waiting for locks.
906

907
    The job's start timestamp is also set if necessary.
908

909
    @type job: L{_QueuedJob}
910
    @param job: Job object
911
    @type op: L{_QueuedOpCode}
912
    @param op: Opcode object
913

914
    """
915
    assert op in job.ops
916
    assert op.status in (constants.OP_STATUS_QUEUED,
917
                         constants.OP_STATUS_WAITLOCK)
918

    
919
    update = False
920

    
921
    op.result = None
922

    
923
    if op.status == constants.OP_STATUS_QUEUED:
924
      op.status = constants.OP_STATUS_WAITLOCK
925
      update = True
926

    
927
    if op.start_timestamp is None:
928
      op.start_timestamp = TimeStampNow()
929
      update = True
930

    
931
    if job.start_timestamp is None:
932
      job.start_timestamp = op.start_timestamp
933
      update = True
934

    
935
    assert op.status == constants.OP_STATUS_WAITLOCK
936

    
937
    return update
938

    
939
  @staticmethod
940
  def _CheckDependencies(queue, job, opctx):
941
    """Checks if an opcode has dependencies and if so, processes them.
942

943
    @type queue: L{JobQueue}
944
    @param queue: Queue object
945
    @type job: L{_QueuedJob}
946
    @param job: Job object
947
    @type opctx: L{_OpExecContext}
948
    @param opctx: Opcode execution context
949
    @rtype: bool
950
    @return: Whether opcode will be re-scheduled by dependency tracker
951

952
    """
953
    op = opctx.op
954

    
955
    result = False
956

    
957
    while opctx.jobdeps:
958
      (dep_job_id, dep_status) = opctx.jobdeps[0]
959

    
960
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
961
                                                          dep_status)
962
      assert ht.TNonEmptyString(depmsg), "No dependency message"
963

    
964
      logging.info("%s: %s", opctx.log_prefix, depmsg)
965

    
966
      if depresult == _JobDependencyManager.CONTINUE:
967
        # Remove dependency and continue
968
        opctx.jobdeps.pop(0)
969

    
970
      elif depresult == _JobDependencyManager.WAIT:
971
        # Need to wait for notification, dependency tracker will re-add job
972
        # to workerpool
973
        result = True
974
        break
975

    
976
      elif depresult == _JobDependencyManager.CANCEL:
977
        # Job was cancelled, cancel this job as well
978
        job.Cancel()
979
        assert op.status == constants.OP_STATUS_CANCELING
980
        break
981

    
982
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
983
                         _JobDependencyManager.ERROR):
984
        # Job failed or there was an error, this job must fail
985
        op.status = constants.OP_STATUS_ERROR
986
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
987
        break
988

    
989
      else:
990
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
991
                                     depresult)
992

    
993
    return result
994

    
995
  def _ExecOpCodeUnlocked(self, opctx):
996
    """Processes one opcode and returns the result.
997

998
    """
999
    op = opctx.op
1000

    
1001
    assert op.status == constants.OP_STATUS_WAITLOCK
1002

    
1003
    timeout = opctx.GetNextLockTimeout()
1004

    
1005
    try:
1006
      # Make sure not to hold queue lock while calling ExecOpCode
1007
      result = self.opexec_fn(op.input,
1008
                              _OpExecCallbacks(self.queue, self.job, op),
1009
                              timeout=timeout, priority=op.priority)
1010
    except mcpu.LockAcquireTimeout:
1011
      assert timeout is not None, "Received timeout for blocking acquire"
1012
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1013

    
1014
      assert op.status in (constants.OP_STATUS_WAITLOCK,
1015
                           constants.OP_STATUS_CANCELING)
1016

    
1017
      # Was job cancelled while we were waiting for the lock?
1018
      if op.status == constants.OP_STATUS_CANCELING:
1019
        return (constants.OP_STATUS_CANCELING, None)
1020

    
1021
      # Stay in waitlock while trying to re-acquire lock
1022
      return (constants.OP_STATUS_WAITLOCK, None)
1023
    except CancelJob:
1024
      logging.exception("%s: Canceling job", opctx.log_prefix)
1025
      assert op.status == constants.OP_STATUS_CANCELING
1026
      return (constants.OP_STATUS_CANCELING, None)
1027
    except Exception, err: # pylint: disable-msg=W0703
1028
      logging.exception("%s: Caught exception in %s",
1029
                        opctx.log_prefix, opctx.summary)
1030
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1031
    else:
1032
      logging.debug("%s: %s successful",
1033
                    opctx.log_prefix, opctx.summary)
1034
      return (constants.OP_STATUS_SUCCESS, result)
1035

    
1036
  def __call__(self, _nextop_fn=None):
1037
    """Continues execution of a job.
1038

1039
    @param _nextop_fn: Callback function for tests
1040
    @rtype: bool
1041
    @return: True if job is finished, False if processor needs to be called
1042
             again
1043

1044
    """
1045
    queue = self.queue
1046
    job = self.job
1047

    
1048
    logging.debug("Processing job %s", job.id)
1049

    
1050
    queue.acquire(shared=1)
1051
    try:
1052
      opcount = len(job.ops)
1053

    
1054
      # Don't do anything for finalized jobs
1055
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1056
        return True
1057

    
1058
      # Is a previous opcode still pending?
1059
      if job.cur_opctx:
1060
        opctx = job.cur_opctx
1061
        job.cur_opctx = None
1062
      else:
1063
        if __debug__ and _nextop_fn:
1064
          _nextop_fn()
1065
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1066

    
1067
      op = opctx.op
1068

    
1069
      # Consistency check
1070
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1071
                                     constants.OP_STATUS_CANCELING)
1072
                        for i in job.ops[opctx.index + 1:])
1073

    
1074
      assert op.status in (constants.OP_STATUS_QUEUED,
1075
                           constants.OP_STATUS_WAITLOCK,
1076
                           constants.OP_STATUS_CANCELING)
1077

    
1078
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1079
              op.priority >= constants.OP_PRIO_HIGHEST)
1080

    
1081
      waitjob = None
1082

    
1083
      if op.status != constants.OP_STATUS_CANCELING:
1084
        assert op.status in (constants.OP_STATUS_QUEUED,
1085
                             constants.OP_STATUS_WAITLOCK)
1086

    
1087
        # Prepare to start opcode
1088
        if self._MarkWaitlock(job, op):
1089
          # Write to disk
1090
          queue.UpdateJobUnlocked(job)
1091

    
1092
        assert op.status == constants.OP_STATUS_WAITLOCK
1093
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1094
        assert job.start_timestamp and op.start_timestamp
1095
        assert waitjob is None
1096

    
1097
        # Check if waiting for a job is necessary
1098
        waitjob = self._CheckDependencies(queue, job, opctx)
1099

    
1100
        assert op.status in (constants.OP_STATUS_WAITLOCK,
1101
                             constants.OP_STATUS_CANCELING,
1102
                             constants.OP_STATUS_ERROR)
1103

    
1104
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1105
                                         constants.OP_STATUS_ERROR)):
1106
          logging.info("%s: opcode %s waiting for locks",
1107
                       opctx.log_prefix, opctx.summary)
1108

    
1109
          assert not opctx.jobdeps, "Not all dependencies were removed"
1110

    
1111
          queue.release()
1112
          try:
1113
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1114
          finally:
1115
            queue.acquire(shared=1)
1116

    
1117
          op.status = op_status
1118
          op.result = op_result
1119

    
1120
          assert not waitjob
1121

    
1122
        if op.status == constants.OP_STATUS_WAITLOCK:
1123
          # Couldn't get locks in time
1124
          assert not op.end_timestamp
1125
        else:
1126
          # Finalize opcode
1127
          op.end_timestamp = TimeStampNow()
1128

    
1129
          if op.status == constants.OP_STATUS_CANCELING:
1130
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1131
                                  for i in job.ops[opctx.index:])
1132
          else:
1133
            assert op.status in constants.OPS_FINALIZED
1134

    
1135
      if op.status == constants.OP_STATUS_WAITLOCK or waitjob:
1136
        finalize = False
1137

    
1138
        if not waitjob and opctx.CheckPriorityIncrease():
1139
          # Priority was changed, need to update on-disk file
1140
          queue.UpdateJobUnlocked(job)
1141

    
1142
        # Keep around for another round
1143
        job.cur_opctx = opctx
1144

    
1145
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1146
                op.priority >= constants.OP_PRIO_HIGHEST)
1147

    
1148
        # In no case must the status be finalized here
1149
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1150

    
1151
      else:
1152
        # Ensure all opcodes so far have been successful
1153
        assert (opctx.index == 0 or
1154
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1155
                           for i in job.ops[:opctx.index]))
1156

    
1157
        # Reset context
1158
        job.cur_opctx = None
1159

    
1160
        if op.status == constants.OP_STATUS_SUCCESS:
1161
          finalize = False
1162

    
1163
        elif op.status == constants.OP_STATUS_ERROR:
1164
          # Ensure failed opcode has an exception as its result
1165
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1166

    
1167
          to_encode = errors.OpExecError("Preceding opcode failed")
1168
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1169
                                _EncodeOpError(to_encode))
1170
          finalize = True
1171

    
1172
          # Consistency check
1173
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1174
                            errors.GetEncodedError(i.result)
1175
                            for i in job.ops[opctx.index:])
1176

    
1177
        elif op.status == constants.OP_STATUS_CANCELING:
1178
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1179
                                "Job canceled by request")
1180
          finalize = True
1181

    
1182
        else:
1183
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1184

    
1185
        if opctx.index == (opcount - 1):
1186
          # Finalize on last opcode
1187
          finalize = True
1188

    
1189
        if finalize:
1190
          # All opcodes have been run, finalize job
1191
          job.Finalize()
1192

    
1193
        # Write to disk. If the job status is final, this is the final write
1194
        # allowed. Once the file has been written, it can be archived anytime.
1195
        queue.UpdateJobUnlocked(job)
1196

    
1197
        assert not waitjob
1198

    
1199
        if finalize:
1200
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1201
          # TODO: Check locking
1202
          queue.depmgr.NotifyWaiters(job.id)
1203
          return True
1204

    
1205
      assert not waitjob or queue.depmgr.JobWaiting(job)
1206

    
1207
      return bool(waitjob)
1208
    finally:
1209
      queue.release()
1210

    
1211

    
1212
class _JobQueueWorker(workerpool.BaseWorker):
1213
  """The actual job workers.
1214

1215
  """
1216
  def RunTask(self, job): # pylint: disable-msg=W0221
1217
    """Job executor.
1218

1219
    @type job: L{_QueuedJob}
1220
    @param job: the job to be processed
1221

1222
    """
1223
    # Ensure only one worker is active on a single job. If a job registers for
1224
    # a dependency job, and the other job notifies before the first worker is
1225
    # done, the job can end up in the tasklist more than once.
1226
    job.processor_lock.acquire()
1227
    try:
1228
      return self._RunTaskInner(job)
1229
    finally:
1230
      job.processor_lock.release()
1231

    
1232
  def _RunTaskInner(self, job):
1233
    """Executes a job.
1234

1235
    Must be called with per-job lock acquired.
1236

1237
    """
1238
    queue = job.queue
1239
    assert queue == self.pool.queue
1240

    
1241
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1242
    setname_fn(None)
1243

    
1244
    proc = mcpu.Processor(queue.context, job.id)
1245

    
1246
    # Create wrapper for setting thread name
1247
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1248
                                    proc.ExecOpCode)
1249

    
1250
    if not _JobProcessor(queue, wrap_execop_fn, job)():
1251
      # Schedule again
1252
      raise workerpool.DeferTask(priority=job.CalcPriority())
1253

    
1254
  @staticmethod
1255
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1256
    """Updates the worker thread name to include a short summary of the opcode.
1257

1258
    @param setname_fn: Callable setting worker thread name
1259
    @param execop_fn: Callable for executing opcode (usually
1260
                      L{mcpu.Processor.ExecOpCode})
1261

1262
    """
1263
    setname_fn(op)
1264
    try:
1265
      return execop_fn(op, *args, **kwargs)
1266
    finally:
1267
      setname_fn(None)
1268

    
1269
  @staticmethod
1270
  def _GetWorkerName(job, op):
1271
    """Sets the worker thread name.
1272

1273
    @type job: L{_QueuedJob}
1274
    @type op: L{opcodes.OpCode}
1275

1276
    """
1277
    parts = ["Job%s" % job.id]
1278

    
1279
    if op:
1280
      parts.append(op.TinySummary())
1281

    
1282
    return "/".join(parts)
1283

    
1284

    
1285
class _JobQueueWorkerPool(workerpool.WorkerPool):
1286
  """Simple class implementing a job-processing workerpool.
1287

1288
  """
1289
  def __init__(self, queue):
1290
    super(_JobQueueWorkerPool, self).__init__("Jq",
1291
                                              JOBQUEUE_THREADS,
1292
                                              _JobQueueWorker)
1293
    self.queue = queue
1294

    
1295

    
1296
class _JobDependencyManager:
1297
  """Keeps track of job dependencies.
1298

1299
  """
1300
  (WAIT,
1301
   ERROR,
1302
   CANCEL,
1303
   CONTINUE,
1304
   WRONGSTATUS) = range(1, 6)
1305

    
1306
  # TODO: Export waiter information to lock monitor
1307

    
1308
  def __init__(self, getstatus_fn, enqueue_fn):
1309
    """Initializes this class.
1310

1311
    """
1312
    self._getstatus_fn = getstatus_fn
1313
    self._enqueue_fn = enqueue_fn
1314

    
1315
    self._waiters = {}
1316
    self._lock = locking.SharedLock("JobDepMgr")
1317

    
1318
  @locking.ssynchronized(_LOCK, shared=1)
1319
  def JobWaiting(self, job):
1320
    """Checks if a job is waiting.
1321

1322
    """
1323
    return compat.any(job in jobs
1324
                      for jobs in self._waiters.values())
1325

    
1326
  @locking.ssynchronized(_LOCK)
1327
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1328
    """Checks if a dependency job has the requested status.
1329

1330
    If the other job is not yet in a finalized status, the calling job will be
1331
    notified (re-added to the workerpool) at a later point.
1332

1333
    @type job: L{_QueuedJob}
1334
    @param job: Job object
1335
    @type dep_job_id: string
1336
    @param dep_job_id: ID of dependency job
1337
    @type dep_status: list
1338
    @param dep_status: Required status
1339

1340
    """
1341
    assert ht.TString(job.id)
1342
    assert ht.TString(dep_job_id)
1343
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1344

    
1345
    if job.id == dep_job_id:
1346
      return (self.ERROR, "Job can't depend on itself")
1347

    
1348
    # Get status of dependency job
1349
    try:
1350
      status = self._getstatus_fn(dep_job_id)
1351
    except errors.JobLost, err:
1352
      return (self.ERROR, "Dependency error: %s" % err)
1353

    
1354
    assert status in constants.JOB_STATUS_ALL
1355

    
1356
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1357

    
1358
    if status not in constants.JOBS_FINALIZED:
1359
      # Register for notification and wait for job to finish
1360
      job_id_waiters.add(job)
1361
      return (self.WAIT,
1362
              "Need to wait for job %s, wanted status '%s'" %
1363
              (dep_job_id, dep_status))
1364

    
1365
    # Remove from waiters list
1366
    if job in job_id_waiters:
1367
      job_id_waiters.remove(job)
1368

    
1369
    if (status == constants.JOB_STATUS_CANCELED and
1370
        constants.JOB_STATUS_CANCELED not in dep_status):
1371
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1372

    
1373
    elif not dep_status or status in dep_status:
1374
      return (self.CONTINUE,
1375
              "Dependency job %s finished with status '%s'" %
1376
              (dep_job_id, status))
1377

    
1378
    else:
1379
      return (self.WRONGSTATUS,
1380
              "Dependency job %s finished with status '%s',"
1381
              " not one of '%s' as required" %
1382
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1383

    
1384
  @locking.ssynchronized(_LOCK)
1385
  def NotifyWaiters(self, job_id):
1386
    """Notifies all jobs waiting for a certain job ID.
1387

1388
    @type job_id: string
1389
    @param job_id: Job ID
1390

1391
    """
1392
    assert ht.TString(job_id)
1393

    
1394
    jobs = self._waiters.pop(job_id, None)
1395
    if jobs:
1396
      # Re-add jobs to workerpool
1397
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1398
                    len(jobs), job_id)
1399
      self._enqueue_fn(jobs)
1400

    
1401
    # Remove all jobs without actual waiters
1402
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1403
                   if not waiters]:
1404
      del self._waiters[job_id]
1405

    
1406

    
1407
def _RequireOpenQueue(fn):
1408
  """Decorator for "public" functions.
1409

1410
  This function should be used for all 'public' functions. That is,
1411
  functions usually called from other classes. Note that this should
1412
  be applied only to methods (not plain functions), since it expects
1413
  that the decorated function is called with a first argument that has
1414
  a '_queue_filelock' argument.
1415

1416
  @warning: Use this decorator only after locking.ssynchronized
1417

1418
  Example::
1419
    @locking.ssynchronized(_LOCK)
1420
    @_RequireOpenQueue
1421
    def Example(self):
1422
      pass
1423

1424
  """
1425
  def wrapper(self, *args, **kwargs):
1426
    # pylint: disable-msg=W0212
1427
    assert self._queue_filelock is not None, "Queue should be open"
1428
    return fn(self, *args, **kwargs)
1429
  return wrapper
1430

    
1431

    
1432
class JobQueue(object):
1433
  """Queue used to manage the jobs.
1434

1435
  @cvar _RE_JOB_FILE: regex matching the valid job file names
1436

1437
  """
1438
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1439

    
1440
  def __init__(self, context):
1441
    """Constructor for JobQueue.
1442

1443
    The constructor will initialize the job queue object and then
1444
    start loading the current jobs from disk, either for starting them
1445
    (if they were queue) or for aborting them (if they were already
1446
    running).
1447

1448
    @type context: GanetiContext
1449
    @param context: the context object for access to the configuration
1450
        data and other ganeti objects
1451

1452
    """
1453
    self.context = context
1454
    self._memcache = weakref.WeakValueDictionary()
1455
    self._my_hostname = netutils.Hostname.GetSysName()
1456

    
1457
    # The Big JobQueue lock. If a code block or method acquires it in shared
1458
    # mode safe it must guarantee concurrency with all the code acquiring it in
1459
    # shared mode, including itself. In order not to acquire it at all
1460
    # concurrency must be guaranteed with all code acquiring it in shared mode
1461
    # and all code acquiring it exclusively.
1462
    self._lock = locking.SharedLock("JobQueue")
1463

    
1464
    self.acquire = self._lock.acquire
1465
    self.release = self._lock.release
1466

    
1467
    # Initialize the queue, and acquire the filelock.
1468
    # This ensures no other process is working on the job queue.
1469
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1470

    
1471
    # Read serial file
1472
    self._last_serial = jstore.ReadSerial()
1473
    assert self._last_serial is not None, ("Serial file was modified between"
1474
                                           " check in jstore and here")
1475

    
1476
    # Get initial list of nodes
1477
    self._nodes = dict((n.name, n.primary_ip)
1478
                       for n in self.context.cfg.GetAllNodesInfo().values()
1479
                       if n.master_candidate)
1480

    
1481
    # Remove master node
1482
    self._nodes.pop(self._my_hostname, None)
1483

    
1484
    # TODO: Check consistency across nodes
1485

    
1486
    self._queue_size = 0
1487
    self._UpdateQueueSizeUnlocked()
1488
    self._drained = jstore.CheckDrainFlag()
1489

    
1490
    # Job dependencies
1491
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1492
                                        self._EnqueueJobs)
1493

    
1494
    # Setup worker pool
1495
    self._wpool = _JobQueueWorkerPool(self)
1496
    try:
1497
      self._InspectQueue()
1498
    except:
1499
      self._wpool.TerminateWorkers()
1500
      raise
1501

    
1502
  @locking.ssynchronized(_LOCK)
1503
  @_RequireOpenQueue
1504
  def _InspectQueue(self):
1505
    """Loads the whole job queue and resumes unfinished jobs.
1506

1507
    This function needs the lock here because WorkerPool.AddTask() may start a
1508
    job while we're still doing our work.
1509

1510
    """
1511
    logging.info("Inspecting job queue")
1512

    
1513
    restartjobs = []
1514

    
1515
    all_job_ids = self._GetJobIDsUnlocked()
1516
    jobs_count = len(all_job_ids)
1517
    lastinfo = time.time()
1518
    for idx, job_id in enumerate(all_job_ids):
1519
      # Give an update every 1000 jobs or 10 seconds
1520
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1521
          idx == (jobs_count - 1)):
1522
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1523
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1524
        lastinfo = time.time()
1525

    
1526
      job = self._LoadJobUnlocked(job_id)
1527

    
1528
      # a failure in loading the job can cause 'None' to be returned
1529
      if job is None:
1530
        continue
1531

    
1532
      status = job.CalcStatus()
1533

    
1534
      if status == constants.JOB_STATUS_QUEUED:
1535
        restartjobs.append(job)
1536

    
1537
      elif status in (constants.JOB_STATUS_RUNNING,
1538
                      constants.JOB_STATUS_WAITLOCK,
1539
                      constants.JOB_STATUS_CANCELING):
1540
        logging.warning("Unfinished job %s found: %s", job.id, job)
1541

    
1542
        if status == constants.JOB_STATUS_WAITLOCK:
1543
          # Restart job
1544
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1545
          restartjobs.append(job)
1546
        else:
1547
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1548
                                "Unclean master daemon shutdown")
1549
          job.Finalize()
1550

    
1551
        self.UpdateJobUnlocked(job)
1552

    
1553
    if restartjobs:
1554
      logging.info("Restarting %s jobs", len(restartjobs))
1555
      self._EnqueueJobs(restartjobs)
1556

    
1557
    logging.info("Job queue inspection finished")
1558

    
1559
  @locking.ssynchronized(_LOCK)
1560
  @_RequireOpenQueue
1561
  def AddNode(self, node):
1562
    """Register a new node with the queue.
1563

1564
    @type node: L{objects.Node}
1565
    @param node: the node object to be added
1566

1567
    """
1568
    node_name = node.name
1569
    assert node_name != self._my_hostname
1570

    
1571
    # Clean queue directory on added node
1572
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1573
    msg = result.fail_msg
1574
    if msg:
1575
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1576
                      node_name, msg)
1577

    
1578
    if not node.master_candidate:
1579
      # remove if existing, ignoring errors
1580
      self._nodes.pop(node_name, None)
1581
      # and skip the replication of the job ids
1582
      return
1583

    
1584
    # Upload the whole queue excluding archived jobs
1585
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1586

    
1587
    # Upload current serial file
1588
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
1589

    
1590
    for file_name in files:
1591
      # Read file content
1592
      content = utils.ReadFile(file_name)
1593

    
1594
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
1595
                                                  [node.primary_ip],
1596
                                                  file_name, content)
1597
      msg = result[node_name].fail_msg
1598
      if msg:
1599
        logging.error("Failed to upload file %s to node %s: %s",
1600
                      file_name, node_name, msg)
1601

    
1602
    self._nodes[node_name] = node.primary_ip
1603

    
1604
  @locking.ssynchronized(_LOCK)
1605
  @_RequireOpenQueue
1606
  def RemoveNode(self, node_name):
1607
    """Callback called when removing nodes from the cluster.
1608

1609
    @type node_name: str
1610
    @param node_name: the name of the node to remove
1611

1612
    """
1613
    self._nodes.pop(node_name, None)
1614

    
1615
  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
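    # Illustrative example, added during editing (not in the original file):
    # with five nodes called of which four failed and one succeeded,
    # (1 + 1) < 4 holds and the error below is logged; the master itself is
    # counted as the extra implicit success.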
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of strings representing the new job identifiers.

    """
    assert count > 0
    # New number
    serial = self._last_serial + count
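    # Illustrative example, added during editing (not in the original file):
    # if self._last_serial is 41 and count is 3, serial becomes 44, "44\n" is
    # written to the serial file and ["42", "43", "44"] is returned.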

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [self._GetJobPath]

    if try_archived:
      path_functions.append(self._GetArchivedJobPath)

    raw_data = None

    for fn in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    If the job cannot be read or parsed, None is returned and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = ("%s; opcodes %s" %
                (err, utils.CommaJoin(op.Summary() for op in ops)))
        status = False
      results.append((status, data))

    self._EnqueueJobs(added_jobs)
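
    # Illustrative example, added during editing (not in the original file):
    # results is a list of (success, data) pairs in submission order, e.g.
    # [(True, "42"), (False, "<error text>; opcodes <opcode summaries>")].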
    return results

  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: string
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    if not isinstance(job_id, basestring):
      job_id = self._FormatJobID(job_id)

    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True)

    if job:
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
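      # Added comment (not in the original file): the assertion below checks
      # that a job carries an end timestamp if and only if it has reached a
      # finalized status.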
      assert (finalized ^ (job.end_timestamp is None))

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @param timeout: the maximum time (in seconds) to spend archiving jobs

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)
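
    # Added comment (not in the original file): the value returned below is a
    # pair of (number of jobs archived, number of job IDs that were never
    # examined because the timeout expired).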
    return (archived_count, len(all_job_ids) - last_touched)

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
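        # Added comment (not in the original file): when specific job IDs were
        # requested, keep a placeholder so the result list still lines up with
        # the requested IDs; when listing all jobs, unreadable jobs are
        # simply skipped.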
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None