# lib/jqueue.py @ 099b2870

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import errno
import re
import time
import weakref

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }
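  # Serialize() and Restore() are inverses: the dict returned here is what
  # ends up, JSON-encoded, inside the "ops" list of a job file, and Restore()
  # rebuilds an equivalent _QueuedOpCode from it. A rough sketch of the
  # round-trip (illustrative only, not executed by this module):
  #
  #   state = op1.Serialize()
  #   op2 = _QueuedOpCode.Restore(state)
  #   assert op2.status == op1.status and op2.priority == op1.priority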


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an error,
        the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
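  # Worked example of the algorithm above (illustrative only): for opcode
  # statuses [SUCCESS, RUNNING, QUEUED] the loop leaves the status at
  # JOB_STATUS_RUNNING (all_success is False and nothing breaks); if the
  # second opcode were ERROR instead, the loop would break early with
  # JOB_STATUS_ERROR; and if every opcode were SUCCESS, the final check
  # would return JOB_STATUS_SUCCESS.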

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
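  # Priorities behave like nice values: numerically lower means more
  # important, so min() picks the priority of the most important opcode that
  # still has to run. E.g. (illustrative only) unfinished priorities [0, -20]
  # yield -20, assuming the usual OP_PRIO_* constants.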

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
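  # For example (illustrative only): if the opcode logs contain entries with
  # serials 1, 2 and 3, GetLogEntries(None) returns all of them while
  # GetLogEntries(2) returns only the entry with serial 3; callers pass the
  # highest serial they have already seen.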

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Cancel(self):
    """Marks the job as canceled if possible.

    Queued jobs are canceled right away; jobs waiting for locks are only
    marked as canceling, and the worker will abort them later. Jobs in any
    other state can no longer be canceled.

    @rtype: tuple; (bool, string)
    @return: whether cancellation was (or will be) performed, and a
        status message

    """
    status = self.CalcStatus()

    if status not in (constants.JOB_STATUS_QUEUED,
                      constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      msg = "Job %s canceled" % self.id

    elif status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      msg = "Job %s will be canceled" % self.id

    return (True, msg)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
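  # Feedback() accepts either a bare message or an explicit (type, message)
  # pair, e.g. (illustrative only):
  #
  #   cb.Feedback("copying disks")            # implies ELOG_MESSAGE
  #   cb.Feedback(constants.ELOG_MESSAGE, "copying disks")
  #
  # Either way the entry is stored as a (serial, (sec, usec), type, message)
  # tuple in the opcode's log.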

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: list or None
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: int
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    self.SetTaskName("Job%s" % job.id)

    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire(shared=1)
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                logging.debug("Canceling opcode")
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              logging.debug("Opcode %s/%s waiting for locks",
                            idx + 1, count)
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire(shared=1)
            try:
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              if idx == count - 1:
                job.end_timestamp = TimeStampNow()

                # Consistency check
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops)

              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire(shared=1)
            try:
              try:
                logging.debug("Opcode %s/%s failed", idx + 1, count)
                op.status = constants.OP_STATUS_ERROR
                op.result = _EncodeOpError(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)

                to_encode = errors.OpExecError("Preceding opcode failed")
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                      _EncodeOpError(to_encode))

                # Consistency check
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops[:idx])
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
                                  errors.GetEncodedError(i.result)
                                  for i in job.ops[idx:])
              finally:
                job.end_timestamp = TimeStampNow()
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire(shared=1)
        try:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      status = job.CalcStatus()
      logging.info("Finished job %s, status = %s", job.id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status in (constants.JOB_STATUS_QUEUED, ):
        self._wpool.AddTask((job, ))

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITLOCK,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              "Unclean master daemon shutdown")
        self.UpdateJobUnlocked(job)

    logging.info("Job queue inspection finished")

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of formatted job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
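  # For example (illustrative only): with self._last_serial == 3,
  # _NewSerialsUnlocked(2) writes "5\n" to the serial file, returns
  # ["4", "5"] and leaves self._last_serial at 5.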

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
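  # For example (illustrative only): with the default
  # JOBS_PER_ARCHIVE_DIRECTORY of 10000, job "12345" lives in QUEUE_DIR as
  # "job-12345" while active and is archived under
  # JOB_QUEUE_ARCHIVE_DIR/1/job-12345, since 12345 / 10000 == 1.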

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for draining.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    getents = runtime.GetEnts()

    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
                      uid=getents.masterd_uid, gid=getents.masterd_gid)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    tasks = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))
    self._wpool.AddManyTasks(tasks)

    return results
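  # The result list is parallel to the submitted jobs, e.g. (illustrative
  # only) [(True, "42"), (False, "A job needs at least one opcode")]: a
  # success flag followed by either the new job ID or the error message.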

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    (success, msg) = job.Cancel()

    if success:
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @param timeout: the maximum amount of time to spend archiving, in seconds
    @return: a tuple with the number of jobs archived and the number of
        jobs not yet inspected

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None