
lib/jqueue.py @ e35344b4


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import errno
35
import re
36
import time
37
import weakref
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import netutils
57
from ganeti import compat
58

    
59

    
60
JOBQUEUE_THREADS = 25
61
JOBS_PER_ARCHIVE_DIRECTORY = 10000
62

    
63
# member lock names to be passed to @ssynchronized decorator
64
_LOCK = "_lock"
65
_QUEUE = "_queue"
66

    
67

    
68
class CancelJob(Exception):
69
  """Special exception to cancel a job.
70

71
  """
72

    
73

    
74
def TimeStampNow():
75
  """Returns the current timestamp.
76

77
  @rtype: tuple
78
  @return: the current time in the (seconds, microseconds) format
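      (for example, a time of C{1291480539.25} would be returned as
      C{(1291480539, 250000)})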
79

80
  """
81
  return utils.SplitTime(time.time())
82

    
83

    
84
class _QueuedOpCode(object):
85
  """Encapsulates an opcode object.
86

87
  @ivar log: holds the execution log and consists of tuples
88
  of the form C{(log_serial, timestamp, level, message)}
89
  @ivar input: the OpCode we encapsulate
90
  @ivar status: the current status
91
  @ivar result: the result of the LU execution
92
  @ivar start_timestamp: timestamp for the start of the execution
93
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94
  @ivar end_timestamp: timestamp for the end of the execution
95

96
  """
97
  __slots__ = ["input", "status", "result", "log",
98
               "start_timestamp", "exec_timestamp", "end_timestamp",
99
               "__weakref__"]
100

    
101
  def __init__(self, op):
102
    """Constructor for the _QuededOpCode.
103

104
    @type op: L{opcodes.OpCode}
105
    @param op: the opcode we encapsulate
106

107
    """
108
    self.input = op
109
    self.status = constants.OP_STATUS_QUEUED
110
    self.result = None
111
    self.log = []
112
    self.start_timestamp = None
113
    self.exec_timestamp = None
114
    self.end_timestamp = None
115

    
116
  @classmethod
117
  def Restore(cls, state):
118
    """Restore the _QueuedOpCode from the serialized form.
119

120
    @type state: dict
121
    @param state: the serialized state
122
    @rtype: _QueuedOpCode
123
    @return: a new _QueuedOpCode instance
124

125
    """
126
    obj = _QueuedOpCode.__new__(cls)
127
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
128
    obj.status = state["status"]
129
    obj.result = state["result"]
130
    obj.log = state["log"]
131
    obj.start_timestamp = state.get("start_timestamp", None)
132
    obj.exec_timestamp = state.get("exec_timestamp", None)
133
    obj.end_timestamp = state.get("end_timestamp", None)
134
    return obj
135

    
136
  def Serialize(self):
137
    """Serializes this _QueuedOpCode.
138

139
    @rtype: dict
140
    @return: the dictionary holding the serialized state
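        (the result can be passed back to L{Restore} to recreate an
        equivalent object, e.g. C{_QueuedOpCode.Restore(op.Serialize())})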
141

142
    """
143
    return {
144
      "input": self.input.__getstate__(),
145
      "status": self.status,
146
      "result": self.result,
147
      "log": self.log,
148
      "start_timestamp": self.start_timestamp,
149
      "exec_timestamp": self.exec_timestamp,
150
      "end_timestamp": self.end_timestamp,
151
      }
152

    
153

    
154
class _QueuedJob(object):
155
  """In-memory job representation.
156

157
  This is what we use to track the user-submitted jobs. Locking must
158
  be taken care of by users of this class.
159

160
  @type queue: L{JobQueue}
161
  @ivar queue: the parent queue
162
  @ivar id: the job ID
163
  @type ops: list
164
  @ivar ops: the list of _QueuedOpCode that constitute the job
165
  @type log_serial: int
166
  @ivar log_serial: holds the index for the next log entry
167
  @ivar received_timestamp: the timestamp for when the job was received
168
  @ivar start_timestamp: the timestamp for start of execution
169
  @ivar end_timestamp: the timestamp for end of execution
170
  @ivar lock_status: In-memory locking information for debugging
171

172
  """
173
  # pylint: disable-msg=W0212
174
  __slots__ = ["queue", "id", "ops", "log_serial",
175
               "received_timestamp", "start_timestamp", "end_timestamp",
176
               "lock_status", "change",
177
               "__weakref__"]
178

    
179
  def __init__(self, queue, job_id, ops):
180
    """Constructor for the _QueuedJob.
181

182
    @type queue: L{JobQueue}
183
    @param queue: our parent queue
184
    @type job_id: job_id
185
    @param job_id: our job id
186
    @type ops: list
187
    @param ops: the list of opcodes we hold, which will be encapsulated
188
        in _QueuedOpCodes
189

190
    """
191
    if not ops:
192
      raise errors.GenericError("A job needs at least one opcode")
193

    
194
    self.queue = queue
195
    self.id = job_id
196
    self.ops = [_QueuedOpCode(op) for op in ops]
197
    self.log_serial = 0
198
    self.received_timestamp = TimeStampNow()
199
    self.start_timestamp = None
200
    self.end_timestamp = None
201

    
202
    # In-memory attributes
203
    self.lock_status = None
204

    
205
  def __repr__(self):
206
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
207
              "id=%s" % self.id,
208
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
209

    
210
    return "<%s at %#x>" % (" ".join(status), id(self))
211

    
212
  @classmethod
213
  def Restore(cls, queue, state):
214
    """Restore a _QueuedJob from serialized state:
215

216
    @type queue: L{JobQueue}
217
    @param queue: to which queue the restored job belongs
218
    @type state: dict
219
    @param state: the serialized state
220
    @rtype: _QueuedJob
221
    @return: the restored _QueuedJob instance
222

223
    """
224
    obj = _QueuedJob.__new__(cls)
225
    obj.queue = queue
226
    obj.id = state["id"]
227
    obj.received_timestamp = state.get("received_timestamp", None)
228
    obj.start_timestamp = state.get("start_timestamp", None)
229
    obj.end_timestamp = state.get("end_timestamp", None)
230

    
231
    # In-memory attributes
232
    obj.lock_status = None
233

    
234
    obj.ops = []
235
    obj.log_serial = 0
236
    for op_state in state["ops"]:
237
      op = _QueuedOpCode.Restore(op_state)
238
      for log_entry in op.log:
239
        obj.log_serial = max(obj.log_serial, log_entry[0])
240
      obj.ops.append(op)
241

    
242
    return obj
243

    
244
  def Serialize(self):
245
    """Serialize the _JobQueue instance.
246

247
    @rtype: dict
248
    @return: the serialized state
249

250
    """
251
    return {
252
      "id": self.id,
253
      "ops": [op.Serialize() for op in self.ops],
254
      "start_timestamp": self.start_timestamp,
255
      "end_timestamp": self.end_timestamp,
256
      "received_timestamp": self.received_timestamp,
257
      }
258

    
259
  def CalcStatus(self):
260
    """Compute the status of this job.
261

262
    This function iterates over all the _QueuedOpCodes in the job and
263
    based on their status, computes the job status.
264

265
    The algorithm is:
266
      - if we find a cancelled, or finished-with-error opcode, the job
267
        status will be the same
268
      - otherwise, the last opcode with one of the statuses:
269
          - waitlock
270
          - canceling
271
          - running
272

273
        will determine the job status
274

275
      - otherwise, it means either all opcodes are queued, or success,
276
        and the job status will be the same
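
    For illustration (statuses given by name only): a job whose opcode
    statuses are C{[success, success]} is C{success}, one with
    C{[success, running, queued]} is C{running}, and one with
    C{[success, error, queued]} is C{error}.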
277

278
    @return: the job status
279

280
    """
281
    status = constants.JOB_STATUS_QUEUED
282

    
283
    all_success = True
284
    for op in self.ops:
285
      if op.status == constants.OP_STATUS_SUCCESS:
286
        continue
287

    
288
      all_success = False
289

    
290
      if op.status == constants.OP_STATUS_QUEUED:
291
        pass
292
      elif op.status == constants.OP_STATUS_WAITLOCK:
293
        status = constants.JOB_STATUS_WAITLOCK
294
      elif op.status == constants.OP_STATUS_RUNNING:
295
        status = constants.JOB_STATUS_RUNNING
296
      elif op.status == constants.OP_STATUS_CANCELING:
297
        status = constants.JOB_STATUS_CANCELING
298
        break
299
      elif op.status == constants.OP_STATUS_ERROR:
300
        status = constants.JOB_STATUS_ERROR
301
        # The whole job fails if one opcode failed
302
        break
303
      elif op.status == constants.OP_STATUS_CANCELED:
304
        status = constants.JOB_STATUS_CANCELED
305
        break
306

    
307
    if all_success:
308
      status = constants.JOB_STATUS_SUCCESS
309

    
310
    return status
311

    
312
  def GetLogEntries(self, newer_than):
313
    """Selectively returns the log entries.
314

315
    @type newer_than: None or int
316
    @param newer_than: if this is None, return all log entries,
317
        otherwise return only the log entries with serial higher
318
        than this value
319
    @rtype: list
320
    @return: the list of the log entries selected
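        (for example, with C{newer_than=3} only entries whose serial is
        4 or greater are returned)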
321

322
    """
323
    if newer_than is None:
324
      serial = -1
325
    else:
326
      serial = newer_than
327

    
328
    entries = []
329
    for op in self.ops:
330
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
331

    
332
    return entries
333

    
334
  def GetInfo(self, fields):
335
    """Returns information about a job.
336

337
    @type fields: list
338
    @param fields: names of fields to return
339
    @rtype: list
340
    @return: list with one element for each field
341
    @raise errors.OpExecError: when an invalid field
342
        has been passed
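
    Example (illustrative values only)::

      job.GetInfo(["id", "status"])
      # could return something like ["1234", "running"]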
343

344
    """
345
    row = []
346
    for fname in fields:
347
      if fname == "id":
348
        row.append(self.id)
349
      elif fname == "status":
350
        row.append(self.CalcStatus())
351
      elif fname == "ops":
352
        row.append([op.input.__getstate__() for op in self.ops])
353
      elif fname == "opresult":
354
        row.append([op.result for op in self.ops])
355
      elif fname == "opstatus":
356
        row.append([op.status for op in self.ops])
357
      elif fname == "oplog":
358
        row.append([op.log for op in self.ops])
359
      elif fname == "opstart":
360
        row.append([op.start_timestamp for op in self.ops])
361
      elif fname == "opexec":
362
        row.append([op.exec_timestamp for op in self.ops])
363
      elif fname == "opend":
364
        row.append([op.end_timestamp for op in self.ops])
365
      elif fname == "received_ts":
366
        row.append(self.received_timestamp)
367
      elif fname == "start_ts":
368
        row.append(self.start_timestamp)
369
      elif fname == "end_ts":
370
        row.append(self.end_timestamp)
371
      elif fname == "lock_status":
372
        row.append(self.lock_status)
373
      elif fname == "summary":
374
        row.append([op.input.Summary() for op in self.ops])
375
      else:
376
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
377
    return row
378

    
379
  def MarkUnfinishedOps(self, status, result):
380
    """Mark unfinished opcodes with a given status and result.
381

382
    This is a utility function for marking all running or waiting to
383
    be run opcodes with a given status. Opcodes which are already
384
    finalised are not changed.
385

386
    @param status: a given opcode status
387
    @param result: the opcode result
388

389
    """
390
    try:
391
      not_marked = True
392
      for op in self.ops:
393
        if op.status in constants.OPS_FINALIZED:
394
          assert not_marked, "Finalized opcodes found after non-finalized ones"
395
          continue
396
        op.status = status
397
        op.result = result
398
        not_marked = False
399
    finally:
400
      self.queue.UpdateJobUnlocked(self)
401

    
402

    
403
class _OpExecCallbacks(mcpu.OpExecCbBase):
404
  def __init__(self, queue, job, op):
405
    """Initializes this class.
406

407
    @type queue: L{JobQueue}
408
    @param queue: Job queue
409
    @type job: L{_QueuedJob}
410
    @param job: Job object
411
    @type op: L{_QueuedOpCode}
412
    @param op: OpCode
413

414
    """
415
    assert queue, "Queue is missing"
416
    assert job, "Job is missing"
417
    assert op, "Opcode is missing"
418

    
419
    self._queue = queue
420
    self._job = job
421
    self._op = op
422

    
423
  @locking.ssynchronized(_QUEUE, shared=1)
424
  def NotifyStart(self):
425
    """Mark the opcode as running, not lock-waiting.
426

427
    This is called from the mcpu code as a notifier function, when the LU is
428
    finally about to start the Exec() method. Of course, to have end-user
429
    visible results, the opcode must be initially (before calling into
430
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
431

432
    """
433
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
434
                               constants.OP_STATUS_CANCELING)
435

    
436
    # All locks are acquired by now
437
    self._job.lock_status = None
438

    
439
    # Cancel here if we were asked to
440
    if self._op.status == constants.OP_STATUS_CANCELING:
441
      logging.debug("Canceling opcode")
442
      raise CancelJob()
443

    
444
    logging.debug("Opcode is now running")
445
    self._op.status = constants.OP_STATUS_RUNNING
446
    self._op.exec_timestamp = TimeStampNow()
447

    
448
    # And finally replicate the job status
449
    self._queue.UpdateJobUnlocked(self._job)
450

    
451
  @locking.ssynchronized(_QUEUE, shared=1)
452
  def _AppendFeedback(self, timestamp, log_type, log_msg):
453
    """Internal feedback append function, with locks
454

455
    """
456
    self._job.log_serial += 1
457
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
458
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
459

    
460
  def Feedback(self, *args):
461
    """Append a log entry.
462

463
    """
464
    assert len(args) < 3
465

    
466
    if len(args) == 1:
467
      log_type = constants.ELOG_MESSAGE
468
      log_msg = args[0]
469
    else:
470
      (log_type, log_msg) = args
471

    
472
    # The time is split to make serialization easier and not lose
473
    # precision.
474
    timestamp = utils.SplitTime(time.time())
475
    self._AppendFeedback(timestamp, log_type, log_msg)
476

    
477
  def ReportLocks(self, msg):
478
    """Write locking information to the job.
479

480
    Called whenever the LU processor is waiting for a lock or has acquired one.
481

482
    """
483
    # Not getting the queue lock because this is a single assignment
484
    self._job.lock_status = msg
485

    
486

    
487
class _JobChangesChecker(object):
488
  def __init__(self, fields, prev_job_info, prev_log_serial):
489
    """Initializes this class.
490

491
    @type fields: list of strings
492
    @param fields: Fields requested by LUXI client
493
    @type prev_job_info: list or None
494
    @param prev_job_info: previous job info, as passed by the LUXI client
495
    @type prev_log_serial: int
496
    @param prev_log_serial: previous job serial, as passed by the LUXI client
497

498
    """
499
    self._fields = fields
500
    self._prev_job_info = prev_job_info
501
    self._prev_log_serial = prev_log_serial
502

    
503
  def __call__(self, job):
504
    """Checks whether job has changed.
505

506
    @type job: L{_QueuedJob}
507
    @param job: Job object
508

509
    """
510
    status = job.CalcStatus()
511
    job_info = job.GetInfo(self._fields)
512
    log_entries = job.GetLogEntries(self._prev_log_serial)
513

    
514
    # Serializing and deserializing data can cause type changes (e.g. from
515
    # tuple to list) or precision loss. We're doing it here so that we get
516
    # the same modifications as the data received from the client. Without
517
    # this, the comparison afterwards might fail without the data being
518
    # significantly different.
519
    # TODO: we just deserialized from disk, investigate how to make sure that
520
    # the job info and log entries are compatible to avoid this further step.
521
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
522
    # efficient, though floats will be tricky
523
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
524
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
525

    
526
    # Don't even try to wait if the job is no longer running, there will be
527
    # no changes.
528
    if (status not in (constants.JOB_STATUS_QUEUED,
529
                       constants.JOB_STATUS_RUNNING,
530
                       constants.JOB_STATUS_WAITLOCK) or
531
        job_info != self._prev_job_info or
532
        (log_entries and self._prev_log_serial != log_entries[0][0])):
533
      logging.debug("Job %s changed", job.id)
534
      return (job_info, log_entries)
535

    
536
    return None
537

    
538

    
539
class _JobFileChangesWaiter(object):
540
  def __init__(self, filename):
541
    """Initializes this class.
542

543
    @type filename: string
544
    @param filename: Path to job file
545
    @raise errors.InotifyError: if the notifier cannot be set up
546

547
    """
548
    self._wm = pyinotify.WatchManager()
549
    self._inotify_handler = \
550
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
551
    self._notifier = \
552
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
553
    try:
554
      self._inotify_handler.enable()
555
    except Exception:
556
      # pyinotify doesn't close file descriptors automatically
557
      self._notifier.stop()
558
      raise
559

    
560
  def _OnInotify(self, notifier_enabled):
561
    """Callback for inotify.
562

563
    """
564
    if not notifier_enabled:
565
      self._inotify_handler.enable()
566

    
567
  def Wait(self, timeout):
568
    """Waits for the job file to change.
569

570
    @type timeout: float
571
    @param timeout: Timeout in seconds
572
    @return: Whether there have been events
573

574
    """
575
    assert timeout >= 0
576
    have_events = self._notifier.check_events(timeout * 1000)
577
    if have_events:
578
      self._notifier.read_events()
579
    self._notifier.process_events()
580
    return have_events
581

    
582
  def Close(self):
583
    """Closes underlying notifier and its file descriptor.
584

585
    """
586
    self._notifier.stop()
587

    
588

    
589
class _JobChangesWaiter(object):
590
  def __init__(self, filename):
591
    """Initializes this class.
592

593
    @type filename: string
594
    @param filename: Path to job file
595

596
    """
597
    self._filewaiter = None
598
    self._filename = filename
599

    
600
  def Wait(self, timeout):
601
    """Waits for a job to change.
602

603
    @type timeout: float
604
    @param timeout: Timeout in seconds
605
    @return: Whether there have been events
606

607
    """
608
    if self._filewaiter:
609
      return self._filewaiter.Wait(timeout)
610

    
611
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
612
    # If this point is reached, return immediately and let caller check the job
613
    # file again in case there were changes since the last check. This avoids a
614
    # race condition.
615
    self._filewaiter = _JobFileChangesWaiter(self._filename)
616

    
617
    return True
618

    
619
  def Close(self):
620
    """Closes underlying waiter.
621

622
    """
623
    if self._filewaiter:
624
      self._filewaiter.Close()
625

    
626

    
627
class _WaitForJobChangesHelper(object):
628
  """Helper class using inotify to wait for changes in a job file.
629

630
  This class takes a previous job status and serial, and alerts the client when
631
  the current job status has changed.
632

633
  """
634
  @staticmethod
635
  def _CheckForChanges(job_load_fn, check_fn):
636
    job = job_load_fn()
637
    if not job:
638
      raise errors.JobLost()
639

    
640
    result = check_fn(job)
641
    if result is None:
642
      raise utils.RetryAgain()
643

    
644
    return result
645

    
646
  def __call__(self, filename, job_load_fn,
647
               fields, prev_job_info, prev_log_serial, timeout):
648
    """Waits for changes on a job.
649

650
    @type filename: string
651
    @param filename: File on which to wait for changes
652
    @type job_load_fn: callable
653
    @param job_load_fn: Function to load job
654
    @type fields: list of strings
655
    @param fields: Which fields to check for changes
656
    @type prev_job_info: list or None
657
    @param prev_job_info: Last job information returned
658
    @type prev_log_serial: int
659
    @param prev_log_serial: Last job message serial number
660
    @type timeout: float
661
    @param timeout: maximum time to wait in seconds
662

663
    """
664
    try:
665
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
666
      waiter = _JobChangesWaiter(filename)
667
      try:
668
        return utils.Retry(compat.partial(self._CheckForChanges,
669
                                          job_load_fn, check_fn),
670
                           utils.RETRY_REMAINING_TIME, timeout,
671
                           wait_fn=waiter.Wait)
672
      finally:
673
        waiter.Close()
674
    except (errors.InotifyError, errors.JobLost):
675
      return None
676
    except utils.RetryTimeout:
677
      return constants.JOB_NOTCHANGED
678

    
679

    
680
class _JobQueueWorker(workerpool.BaseWorker):
681
  """The actual job workers.
682

683
  """
684
  def RunTask(self, job): # pylint: disable-msg=W0221
685
    """Job executor.
686

687
    This function processes a job. It is closely tied to the _QueuedJob and
688
    _QueuedOpCode classes.
689

690
    @type job: L{_QueuedJob}
691
    @param job: the job to be processed
692

693
    """
694
    logging.info("Processing job %s", job.id)
695
    proc = mcpu.Processor(self.pool.queue.context, job.id)
696
    queue = job.queue
697
    try:
698
      try:
699
        count = len(job.ops)
700
        for idx, op in enumerate(job.ops):
701
          op_summary = op.input.Summary()
702
          if op.status == constants.OP_STATUS_SUCCESS:
703
            # this is a job that was partially completed before master
704
            # daemon shutdown, so it can be expected that some opcodes
705
            # are already completed successfully (if any did error
706
            # out, then the whole job should have been aborted and not
707
            # resubmitted for processing)
708
            logging.info("Op %s/%s: opcode %s already processed, skipping",
709
                         idx + 1, count, op_summary)
710
            continue
711
          try:
712
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
713
                         op_summary)
714

    
715
            queue.acquire(shared=1)
716
            try:
717
              if op.status == constants.OP_STATUS_CANCELED:
718
                logging.debug("Canceling opcode")
719
                raise CancelJob()
720
              assert op.status == constants.OP_STATUS_QUEUED
721
              logging.debug("Opcode %s/%s waiting for locks",
722
                            idx + 1, count)
723
              op.status = constants.OP_STATUS_WAITLOCK
724
              op.result = None
725
              op.start_timestamp = TimeStampNow()
726
              if idx == 0: # first opcode
727
                job.start_timestamp = op.start_timestamp
728
              queue.UpdateJobUnlocked(job)
729

    
730
              input_opcode = op.input
731
            finally:
732
              queue.release()
733

    
734
            # Make sure not to hold queue lock while calling ExecOpCode
735
            result = proc.ExecOpCode(input_opcode,
736
                                     _OpExecCallbacks(queue, job, op))
737

    
738
            queue.acquire(shared=1)
739
            try:
740
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
741
              op.status = constants.OP_STATUS_SUCCESS
742
              op.result = result
743
              op.end_timestamp = TimeStampNow()
744
              queue.UpdateJobUnlocked(job)
745
            finally:
746
              queue.release()
747

    
748
            logging.info("Op %s/%s: Successfully finished opcode %s",
749
                         idx + 1, count, op_summary)
750
          except CancelJob:
751
            # Will be handled further up
752
            raise
753
          except Exception, err:
754
            queue.acquire(shared=1)
755
            try:
756
              try:
757
                logging.debug("Opcode %s/%s failed", idx + 1, count)
758
                op.status = constants.OP_STATUS_ERROR
759
                if isinstance(err, errors.GenericError):
760
                  to_encode = err
761
                else:
762
                  to_encode = errors.OpExecError(str(err))
763
                op.result = errors.EncodeException(to_encode)
764
                op.end_timestamp = TimeStampNow()
765
                logging.info("Op %s/%s: Error in opcode %s: %s",
766
                             idx + 1, count, op_summary, err)
767
              finally:
768
                queue.UpdateJobUnlocked(job)
769
            finally:
770
              queue.release()
771
            raise
772

    
773
      except CancelJob:
774
        queue.acquire(shared=1)
775
        try:
776
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
777
                                "Job canceled by request")
778
        finally:
779
          queue.release()
780
      except errors.GenericError, err:
781
        logging.exception("Ganeti exception")
782
      except:
783
        logging.exception("Unhandled exception")
784
    finally:
785
      queue.acquire(shared=1)
786
      try:
787
        try:
788
          job.lock_status = None
789
          job.end_timestamp = TimeStampNow()
790
          queue.UpdateJobUnlocked(job)
791
        finally:
792
          job_id = job.id
793
          status = job.CalcStatus()
794
      finally:
795
        queue.release()
796

    
797
      logging.info("Finished job %s, status = %s", job_id, status)
798

    
799

    
800
class _JobQueueWorkerPool(workerpool.WorkerPool):
801
  """Simple class implementing a job-processing workerpool.
802

803
  """
804
  def __init__(self, queue):
805
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
806
                                              JOBQUEUE_THREADS,
807
                                              _JobQueueWorker)
808
    self.queue = queue
809

    
810

    
811
def _RequireOpenQueue(fn):
812
  """Decorator for "public" functions.
813

814
  This function should be used for all 'public' functions. That is,
815
  functions usually called from other classes. Note that this should
816
  be applied only to methods (not plain functions), since it expects
817
  that the decorated function is called with a first argument that has
818
  a '_queue_filelock' attribute.
819

820
  @warning: Use this decorator only after locking.ssynchronized
821

822
  Example::
823
    @locking.ssynchronized(_LOCK)
824
    @_RequireOpenQueue
825
    def Example(self):
826
      pass
827

828
  """
829
  def wrapper(self, *args, **kwargs):
830
    # pylint: disable-msg=W0212
831
    assert self._queue_filelock is not None, "Queue should be open"
832
    return fn(self, *args, **kwargs)
833
  return wrapper
834

    
835

    
836
class JobQueue(object):
837
  """Queue used to manage the jobs.
838

839
  @cvar _RE_JOB_FILE: regex matching the valid job file names
840

841
  """
842
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
843

    
844
  def __init__(self, context):
845
    """Constructor for JobQueue.
846

847
    The constructor will initialize the job queue object and then
848
    start loading the current jobs from disk, either for starting them
849
    (if they were queued) or for aborting them (if they were already
850
    running).
851

852
    @type context: GanetiContext
853
    @param context: the context object for access to the configuration
854
        data and other ganeti objects
855

856
    """
857
    self.context = context
858
    self._memcache = weakref.WeakValueDictionary()
859
    self._my_hostname = netutils.HostInfo().name
860

    
861
    # The Big JobQueue lock. If a code block or method acquires it in shared
862
    # mode, it must guarantee concurrency with all the code acquiring it in
863
    # shared mode, including itself. In order not to acquire it at all,
864
    # concurrency must be guaranteed with all code acquiring it in shared mode
865
    # and all code acquiring it exclusively.
866
    self._lock = locking.SharedLock("JobQueue")
867

    
868
    self.acquire = self._lock.acquire
869
    self.release = self._lock.release
870

    
871
    # Initialize the queue, and acquire the filelock.
872
    # This ensures no other process is working on the job queue.
873
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
874

    
875
    # Read serial file
876
    self._last_serial = jstore.ReadSerial()
877
    assert self._last_serial is not None, ("Serial file was modified between"
878
                                           " check in jstore and here")
879

    
880
    # Get initial list of nodes
881
    self._nodes = dict((n.name, n.primary_ip)
882
                       for n in self.context.cfg.GetAllNodesInfo().values()
883
                       if n.master_candidate)
884

    
885
    # Remove master node
886
    self._nodes.pop(self._my_hostname, None)
887

    
888
    # TODO: Check consistency across nodes
889

    
890
    self._queue_size = 0
891
    self._UpdateQueueSizeUnlocked()
892
    self._drained = self._IsQueueMarkedDrain()
893

    
894
    # Setup worker pool
895
    self._wpool = _JobQueueWorkerPool(self)
896
    try:
897
      # We need to lock here because WorkerPool.AddTask() may start a job while
898
      # we're still doing our work.
899
      self.acquire()
900
      try:
901
        logging.info("Inspecting job queue")
902

    
903
        all_job_ids = self._GetJobIDsUnlocked()
904
        jobs_count = len(all_job_ids)
905
        lastinfo = time.time()
906
        for idx, job_id in enumerate(all_job_ids):
907
          # Give an update every 1000 jobs or 10 seconds
908
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
909
              idx == (jobs_count - 1)):
910
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
911
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
912
            lastinfo = time.time()
913

    
914
          job = self._LoadJobUnlocked(job_id)
915

    
916
          # a failure in loading the job can cause 'None' to be returned
917
          if job is None:
918
            continue
919

    
920
          status = job.CalcStatus()
921

    
922
          if status in (constants.JOB_STATUS_QUEUED, ):
923
            self._wpool.AddTask((job, ))
924

    
925
          elif status in (constants.JOB_STATUS_RUNNING,
926
                          constants.JOB_STATUS_WAITLOCK,
927
                          constants.JOB_STATUS_CANCELING):
928
            logging.warning("Unfinished job %s found: %s", job.id, job)
929
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
930
                                  "Unclean master daemon shutdown")
931

    
932
        logging.info("Job queue inspection finished")
933
      finally:
934
        self.release()
935
    except:
936
      self._wpool.TerminateWorkers()
937
      raise
938

    
939
  @locking.ssynchronized(_LOCK)
940
  @_RequireOpenQueue
941
  def AddNode(self, node):
942
    """Register a new node with the queue.
943

944
    @type node: L{objects.Node}
945
    @param node: the node object to be added
946

947
    """
948
    node_name = node.name
949
    assert node_name != self._my_hostname
950

    
951
    # Clean queue directory on added node
952
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
953
    msg = result.fail_msg
954
    if msg:
955
      logging.warning("Cannot cleanup queue directory on node %s: %s",
956
                      node_name, msg)
957

    
958
    if not node.master_candidate:
959
      # remove if existing, ignoring errors
960
      self._nodes.pop(node_name, None)
961
      # and skip the replication of the job ids
962
      return
963

    
964
    # Upload the whole queue excluding archived jobs
965
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
966

    
967
    # Upload current serial file
968
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
969

    
970
    for file_name in files:
971
      # Read file content
972
      content = utils.ReadFile(file_name)
973

    
974
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
975
                                                  [node.primary_ip],
976
                                                  file_name, content)
977
      msg = result[node_name].fail_msg
978
      if msg:
979
        logging.error("Failed to upload file %s to node %s: %s",
980
                      file_name, node_name, msg)
981

    
982
    self._nodes[node_name] = node.primary_ip
983

    
984
  @locking.ssynchronized(_LOCK)
985
  @_RequireOpenQueue
986
  def RemoveNode(self, node_name):
987
    """Callback called when removing nodes from the cluster.
988

989
    @type node_name: str
990
    @param node_name: the name of the node to remove
991

992
    """
993
    self._nodes.pop(node_name, None)
994

    
995
  @staticmethod
996
  def _CheckRpcResult(result, nodes, failmsg):
997
    """Verifies the status of an RPC call.
998

999
    Since we aim to keep consistency should this node (the current
1000
    master) fail, we will log errors if our rpc calls fail, and especially
1001
    log the case when more than half of the nodes fail.
1002

1003
    @param result: the data as returned from the rpc call
1004
    @type nodes: list
1005
    @param nodes: the list of nodes we made the call to
1006
    @type failmsg: str
1007
    @param failmsg: the identifier to be used for logging
1008

1009
    """
1010
    failed = []
1011
    success = []
1012

    
1013
    for node in nodes:
1014
      msg = result[node].fail_msg
1015
      if msg:
1016
        failed.append(node)
1017
        logging.error("RPC call %s (%s) failed on node %s: %s",
1018
                      result[node].call, failmsg, node, msg)
1019
      else:
1020
        success.append(node)
1021

    
1022
    # +1 for the master node
1023
    if (len(success) + 1) < len(failed):
1024
      # TODO: Handle failing nodes
1025
      logging.error("More than half of the nodes failed")
1026

    
1027
  def _GetNodeIp(self):
1028
    """Helper for returning the node name/ip list.
1029

1030
    @rtype: (list, list)
1031
    @return: a tuple of two lists, the first one with the node
1032
        names and the second one with the node addresses
1033

1034
    """
1035
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1036
    name_list = self._nodes.keys()
1037
    addr_list = [self._nodes[name] for name in name_list]
1038
    return name_list, addr_list
1039

    
1040
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1041
    """Writes a file locally and then replicates it to all nodes.
1042

1043
    This function will replace the contents of a file on the local
1044
    node and then replicate it to all the other nodes we have.
1045

1046
    @type file_name: str
1047
    @param file_name: the path of the file to be replicated
1048
    @type data: str
1049
    @param data: the new contents of the file
1050
    @type replicate: boolean
1051
    @param replicate: whether to spread the changes to the remote nodes
1052

1053
    """
1054
    utils.WriteFile(file_name, data=data)
1055

    
1056
    if replicate:
1057
      names, addrs = self._GetNodeIp()
1058
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1059
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1060

    
1061
  def _RenameFilesUnlocked(self, rename):
1062
    """Renames a file locally and then replicate the change.
1063

1064
    This function will rename a file in the local queue directory
1065
    and then replicate this rename to all the other nodes we have.
1066

1067
    @type rename: list of (old, new)
1068
    @param rename: List containing tuples mapping old to new names
1069

1070
    """
1071
    # Rename them locally
1072
    for old, new in rename:
1073
      utils.RenameFile(old, new, mkdir=True)
1074

    
1075
    # ... and on all nodes
1076
    names, addrs = self._GetNodeIp()
1077
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1078
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1079

    
1080
  @staticmethod
1081
  def _FormatJobID(job_id):
1082
    """Convert a job ID to string format.
1083

1084
    Currently this just does C{str(job_id)} after performing some
1085
    checks, but if we want to change the job id format this will
1086
    abstract this change.
1087

1088
    @type job_id: int or long
1089
    @param job_id: the numeric job id
1090
    @rtype: str
1091
    @return: the formatted job id
1092

1093
    """
1094
    if not isinstance(job_id, (int, long)):
1095
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1096
    if job_id < 0:
1097
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1098

    
1099
    return str(job_id)
1100

    
1101
  @classmethod
1102
  def _GetArchiveDirectory(cls, job_id):
1103
    """Returns the archive directory for a job.
1104

1105
    @type job_id: str
1106
    @param job_id: Job identifier
1107
    @rtype: str
1108
    @return: Directory name
1109

1110
    """
1111
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1112

    
1113
  def _NewSerialsUnlocked(self, count):
1114
    """Generates a new job identifier.
1115

1116
    Job identifiers are unique during the lifetime of a cluster.
1117

1118
    @type count: integer
1119
    @param count: how many serials to return
1120
    @rtype: list of str
1121
    @return: a list of job identifiers (as strings)
1122

1123
    """
1124
    assert count > 0
1125
    # New number
1126
    serial = self._last_serial + count
1127

    
1128
    # Write to file
1129
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1130
                             "%s\n" % serial, True)
1131

    
1132
    result = [self._FormatJobID(v)
1133
              for v in range(self._last_serial, serial + 1)]
1134
    # Keep it only if we were able to write the file
1135
    self._last_serial = serial
1136

    
1137
    return result
1138

    
1139
  @staticmethod
1140
  def _GetJobPath(job_id):
1141
    """Returns the job file for a given job id.
1142

1143
    @type job_id: str
1144
    @param job_id: the job identifier
1145
    @rtype: str
1146
    @return: the path to the job file
1147

1148
    """
1149
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1150

    
1151
  @classmethod
1152
  def _GetArchivedJobPath(cls, job_id):
1153
    """Returns the archived job file for a give job id.
1154

1155
    @type job_id: str
1156
    @param job_id: the job identifier
1157
    @rtype: str
1158
    @return: the path to the archived job file
1159

1160
    """
1161
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1162
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1163

    
1164
  def _GetJobIDsUnlocked(self, sort=True):
1165
    """Return all known job IDs.
1166

1167
    The method only looks at disk because it's a requirement that all
1168
    jobs are present on disk (so in the _memcache we don't have any
1169
    extra IDs).
1170

1171
    @type sort: boolean
1172
    @param sort: perform sorting on the returned job ids
1173
    @rtype: list
1174
    @return: the list of job IDs
1175

1176
    """
1177
    jlist = []
1178
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1179
      m = self._RE_JOB_FILE.match(filename)
1180
      if m:
1181
        jlist.append(m.group(1))
1182
    if sort:
1183
      jlist = utils.NiceSort(jlist)
1184
    return jlist
1185

    
1186
  def _LoadJobUnlocked(self, job_id):
1187
    """Loads a job from the disk or memory.
1188

1189
    Given a job id, this will return the cached job object if
1190
    existing, or try to load the job from the disk. If loading from
1191
    disk, it will also add the job to the cache.
1192

1193
    @param job_id: the job id
1194
    @rtype: L{_QueuedJob} or None
1195
    @return: either None or the job object
1196

1197
    """
1198
    job = self._memcache.get(job_id, None)
1199
    if job:
1200
      logging.debug("Found job %s in memcache", job_id)
1201
      return job
1202

    
1203
    try:
1204
      job = self._LoadJobFromDisk(job_id)
1205
      if job is None:
1206
        return job
1207
    except errors.JobFileCorrupted:
1208
      old_path = self._GetJobPath(job_id)
1209
      new_path = self._GetArchivedJobPath(job_id)
1210
      if old_path == new_path:
1211
        # job already archived (future case)
1212
        logging.exception("Can't parse job %s", job_id)
1213
      else:
1214
        # non-archived case
1215
        logging.exception("Can't parse job %s, will archive.", job_id)
1216
        self._RenameFilesUnlocked([(old_path, new_path)])
1217
      return None
1218

    
1219
    self._memcache[job_id] = job
1220
    logging.debug("Added job %s to the cache", job_id)
1221
    return job
1222

    
1223
  def _LoadJobFromDisk(self, job_id):
1224
    """Load the given job file from disk.
1225

1226
    Given a job file, read, load and restore it in a _QueuedJob format.
1227

1228
    @type job_id: string
1229
    @param job_id: job identifier
1230
    @rtype: L{_QueuedJob} or None
1231
    @return: either None or the job object
1232

1233
    """
1234
    filepath = self._GetJobPath(job_id)
1235
    logging.debug("Loading job from %s", filepath)
1236
    try:
1237
      raw_data = utils.ReadFile(filepath)
1238
    except EnvironmentError, err:
1239
      if err.errno in (errno.ENOENT, ):
1240
        return None
1241
      raise
1242

    
1243
    try:
1244
      data = serializer.LoadJson(raw_data)
1245
      job = _QueuedJob.Restore(self, data)
1246
    except Exception, err: # pylint: disable-msg=W0703
1247
      raise errors.JobFileCorrupted(err)
1248

    
1249
    return job
1250

    
1251
  def SafeLoadJobFromDisk(self, job_id):
1252
    """Load the given job file from disk.
1253

1254
    Given a job file, read, load and restore it in a _QueuedJob format.
1255
    In case of error reading the job, it gets returned as None, and the
1256
    exception is logged.
1257

1258
    @type job_id: string
1259
    @param job_id: job identifier
1260
    @rtype: L{_QueuedJob} or None
1261
    @return: either None or the job object
1262

1263
    """
1264
    try:
1265
      return self._LoadJobFromDisk(job_id)
1266
    except (errors.JobFileCorrupted, EnvironmentError):
1267
      logging.exception("Can't load/parse job %s", job_id)
1268
      return None
1269

    
1270
  @staticmethod
1271
  def _IsQueueMarkedDrain():
1272
    """Check if the queue is marked from drain.
1273

1274
    This currently uses the queue drain file, which makes it a
1275
    per-node flag. In the future this can be moved to the config file.
1276

1277
    @rtype: boolean
1278
    @return: True if the job queue is marked for draining
1279

1280
    """
1281
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1282

    
1283
  def _UpdateQueueSizeUnlocked(self):
1284
    """Update the queue size.
1285

1286
    """
1287
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1288

    
1289
  @locking.ssynchronized(_LOCK)
1290
  @_RequireOpenQueue
1291
  def SetDrainFlag(self, drain_flag):
1292
    """Sets the drain flag for the queue.
1293

1294
    @type drain_flag: boolean
1295
    @param drain_flag: Whether to set or unset the drain flag
1296

1297
    """
1298
    if drain_flag:
1299
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1300
    else:
1301
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1302

    
1303
    self._drained = drain_flag
1304

    
1305
    return True
1306

    
1307
  @_RequireOpenQueue
1308
  def _SubmitJobUnlocked(self, job_id, ops):
1309
    """Create and store a new job.
1310

1311
    This enters the job into our job queue and also puts it on the new
1312
    queue, in order for it to be picked up by the queue processors.
1313

1314
    @type job_id: job ID
1315
    @param job_id: the job ID for the new job
1316
    @type ops: list
1317
    @param ops: The list of OpCodes that will become the new job.
1318
    @rtype: L{_QueuedJob}
1319
    @return: the job object to be queued
1320
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1321
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1322

1323
    """
1324
    # Ok when sharing the big job queue lock, as the drain file is created when
1325
    # the lock is exclusive.
1326
    if self._drained:
1327
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1328

    
1329
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1330
      raise errors.JobQueueFull()
1331

    
1332
    job = _QueuedJob(self, job_id, ops)
1333

    
1334
    # Write to disk
1335
    self.UpdateJobUnlocked(job)
1336

    
1337
    self._queue_size += 1
1338

    
1339
    logging.debug("Adding new job %s to the cache", job_id)
1340
    self._memcache[job_id] = job
1341

    
1342
    return job
1343

    
1344
  @locking.ssynchronized(_LOCK)
1345
  @_RequireOpenQueue
1346
  def SubmitJob(self, ops):
1347
    """Create and store a new job.
1348

1349
    @see: L{_SubmitJobUnlocked}
1350

1351
    """
1352
    job_id = self._NewSerialsUnlocked(1)[0]
1353
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
1354
    return job_id
1355

    
1356
  @locking.ssynchronized(_LOCK)
1357
  @_RequireOpenQueue
1358
  def SubmitManyJobs(self, jobs):
1359
    """Create and store multiple jobs.
1360

1361
    @see: L{_SubmitJobUnlocked}
1362

1363
    """
1364
    results = []
1365
    tasks = []
1366
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1367
    for job_id, ops in zip(all_job_ids, jobs):
1368
      try:
1369
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1370
        status = True
1371
        data = job_id
1372
      except errors.GenericError, err:
1373
        data = str(err)
1374
        status = False
1375
      results.append((status, data))
1376
    self._wpool.AddManyTasks(tasks)
1377

    
1378
    return results
1379

    
1380
  @_RequireOpenQueue
1381
  def UpdateJobUnlocked(self, job, replicate=True):
1382
    """Update a job's on disk storage.
1383

1384
    After a job has been modified, this function needs to be called in
1385
    order to write the changes to disk and replicate them to the other
1386
    nodes.
1387

1388
    @type job: L{_QueuedJob}
1389
    @param job: the changed job
1390
    @type replicate: boolean
1391
    @param replicate: whether to replicate the change to remote nodes
1392

1393
    """
1394
    filename = self._GetJobPath(job.id)
1395
    data = serializer.DumpJson(job.Serialize(), indent=False)
1396
    logging.debug("Writing job %s to %s", job.id, filename)
1397
    self._UpdateJobQueueFile(filename, data, replicate)
1398

    
1399
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1400
                        timeout):
1401
    """Waits for changes in a job.
1402

1403
    @type job_id: string
1404
    @param job_id: Job identifier
1405
    @type fields: list of strings
1406
    @param fields: Which fields to check for changes
1407
    @type prev_job_info: list or None
1408
    @param prev_job_info: Last job information returned
1409
    @type prev_log_serial: int
1410
    @param prev_log_serial: Last job message serial number
1411
    @type timeout: float
1412
    @param timeout: maximum time to wait in seconds
1413
    @rtype: tuple (job info, log entries)
1414
    @return: a tuple of the job information as required via
1415
        the fields parameter, and the log entries as a list
1416

1417
        if the job has not changed and the timeout has expired,
1418
        we instead return a special value,
1419
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1420
        as such by the clients
1421

1422
    """
1423
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1424

    
1425
    helper = _WaitForJobChangesHelper()
1426

    
1427
    return helper(self._GetJobPath(job_id), load_fn,
1428
                  fields, prev_job_info, prev_log_serial, timeout)
1429

    
1430
  @locking.ssynchronized(_LOCK)
1431
  @_RequireOpenQueue
1432
  def CancelJob(self, job_id):
1433
    """Cancels a job.
1434

1435
    This will only succeed if the job has not started yet.
1436

1437
    @type job_id: string
1438
    @param job_id: job ID of job to be cancelled.
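    @rtype: tuple (boolean, string)
    @return: whether the cancellation succeeded, plus a message for the
        user (for example C{(False, "Job 1234 not found")})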
1439

1440
    """
1441
    logging.info("Cancelling job %s", job_id)
1442

    
1443
    job = self._LoadJobUnlocked(job_id)
1444
    if not job:
1445
      logging.debug("Job %s not found", job_id)
1446
      return (False, "Job %s not found" % job_id)
1447

    
1448
    job_status = job.CalcStatus()
1449

    
1450
    if job_status not in (constants.JOB_STATUS_QUEUED,
1451
                          constants.JOB_STATUS_WAITLOCK):
1452
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1453
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1454

    
1455
    if job_status == constants.JOB_STATUS_QUEUED:
1456
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1457
                            "Job canceled by request")
1458
      return (True, "Job %s canceled" % job.id)
1459

    
1460
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1461
      # The worker will notice the new status and cancel the job
1462
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1463
      return (True, "Job %s will be canceled" % job.id)
1464

    
1465
  @_RequireOpenQueue
1466
  def _ArchiveJobsUnlocked(self, jobs):
1467
    """Archives jobs.
1468

1469
    @type jobs: list of L{_QueuedJob}
1470
    @param jobs: Job objects
1471
    @rtype: int
1472
    @return: Number of archived jobs
1473

1474
    """
1475
    archive_jobs = []
1476
    rename_files = []
1477
    for job in jobs:
1478
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
1479
        logging.debug("Job %s is not yet done", job.id)
1480
        continue
1481

    
1482
      archive_jobs.append(job)
1483

    
1484
      old = self._GetJobPath(job.id)
1485
      new = self._GetArchivedJobPath(job.id)
1486
      rename_files.append((old, new))
1487

    
1488
    # TODO: What if 1..n files fail to rename?
1489
    self._RenameFilesUnlocked(rename_files)
1490

    
1491
    logging.debug("Successfully archived job(s) %s",
1492
                  utils.CommaJoin(job.id for job in archive_jobs))
1493

    
1494
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1495
    # the files, we update the cached queue size from the filesystem. When we
1496
    # get around to fixing the TODO above, we can use the number of actually
1497
    # archived jobs to fix this.
1498
    self._UpdateQueueSizeUnlocked()
1499
    return len(archive_jobs)
1500

    
1501
  @locking.ssynchronized(_LOCK)
1502
  @_RequireOpenQueue
1503
  def ArchiveJob(self, job_id):
1504
    """Archives a job.
1505

1506
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1507

1508
    @type job_id: string
1509
    @param job_id: Job ID of job to be archived.
1510
    @rtype: bool
1511
    @return: Whether job was archived
1512

1513
    """
1514
    logging.info("Archiving job %s", job_id)
1515

    
1516
    job = self._LoadJobUnlocked(job_id)
1517
    if not job:
1518
      logging.debug("Job %s not found", job_id)
1519
      return False
1520

    
1521
    return self._ArchiveJobsUnlocked([job]) == 1
1522

    
1523
  @locking.ssynchronized(_LOCK)
1524
  @_RequireOpenQueue
1525
  def AutoArchiveJobs(self, age, timeout):
1526
    """Archives all jobs based on age.
1527

1528
    The method will archive all jobs which are older than the age
1529
    parameter. For jobs that don't have an end timestamp, the start
1530
    timestamp will be considered. The special '-1' age will cause
1531
    archival of all jobs (that are not running or queued).
1532

1533
    @type age: int
1534
    @param age: the minimum age in seconds
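    @type timeout: float
    @param timeout: the maximum amount of time (in seconds) this call
        is allowed to spend archiving jobs
    @rtype: tuple
    @return: a tuple of the number of jobs archived and the number of
        jobs not considered before the timeout expired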
1535

1536
    """
1537
    logging.info("Archiving jobs with age more than %s seconds", age)
1538

    
1539
    now = time.time()
1540
    end_time = now + timeout
1541
    archived_count = 0
1542
    last_touched = 0
1543

    
1544
    all_job_ids = self._GetJobIDsUnlocked()
1545
    pending = []
1546
    for idx, job_id in enumerate(all_job_ids):
1547
      last_touched = idx + 1
1548

    
1549
      # Not optimal because jobs could be pending
1550
      # TODO: Measure average duration for job archival and take number of
1551
      # pending jobs into account.
1552
      if time.time() > end_time:
1553
        break
1554

    
1555
      # Returns None if the job failed to load
1556
      job = self._LoadJobUnlocked(job_id)
1557
      if job:
1558
        if job.end_timestamp is None:
1559
          if job.start_timestamp is None:
1560
            job_age = job.received_timestamp
1561
          else:
1562
            job_age = job.start_timestamp
1563
        else:
1564
          job_age = job.end_timestamp
1565

    
1566
        if age == -1 or now - job_age[0] > age:
1567
          pending.append(job)
1568

    
1569
          # Archive 10 jobs at a time
1570
          if len(pending) >= 10:
1571
            archived_count += self._ArchiveJobsUnlocked(pending)
1572
            pending = []
1573

    
1574
    if pending:
1575
      archived_count += self._ArchiveJobsUnlocked(pending)
1576

    
1577
    return (archived_count, len(all_job_ids) - last_touched)
1578

    
1579
  def QueryJobs(self, job_ids, fields):
1580
    """Returns a list of jobs in queue.
1581

1582
    @type job_ids: list
1583
    @param job_ids: sequence of job identifiers or None for all
1584
    @type fields: list
1585
    @param fields: names of fields to return
1586
    @rtype: list
1587
    @return: a list with one element per job, each element being a list with
1588
        the requested fields
1589

1590
    """
1591
    jobs = []
1592
    list_all = False
1593
    if not job_ids:
1594
      # Since files are added to/removed from the queue atomically, there's no
1595
      # risk of getting the job ids in an inconsistent state.
1596
      job_ids = self._GetJobIDsUnlocked()
1597
      list_all = True
1598

    
1599
    for job_id in job_ids:
1600
      job = self.SafeLoadJobFromDisk(job_id)
1601
      if job is not None:
1602
        jobs.append(job.GetInfo(fields))
1603
      elif not list_all:
1604
        jobs.append(None)
1605

    
1606
    return jobs
1607

    
1608
  @locking.ssynchronized(_LOCK)
1609
  @_RequireOpenQueue
1610
  def Shutdown(self):
1611
    """Stops the job queue.
1612

1613
    This shuts down all the worker threads and closes the queue.
1614

1615
    """
1616
    self._wpool.TerminateWorkers()
1617

    
1618
    self._queue_filelock.Close()
1619
    self._queue_filelock = None