#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import errno
import re
import time
import weakref

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }
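
  # Illustrative sketch (editor's addition, not part of the upstream module):
  # the dict produced by Serialize() above, and consumed by Restore(), looks
  # roughly like the following for a freshly queued opcode; the OP_ID and the
  # numeric values are assumed examples only:
  #
  #   {"input": {"OP_ID": "OP_TEST_DELAY", "duration": 10},
  #    "status": constants.OP_STATUS_QUEUED, "result": None, "log": [],
  #    "start_timestamp": None, "exec_timestamp": None, "end_timestamp": None,
  #    "priority": constants.OP_PRIO_DEFAULT}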


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one finished with error, the
        job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.OP_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status not in (constants.JOB_STATUS_QUEUED,
                      constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      msg = "Job %s canceled" % self.id

    elif status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      msg = "Job %s will be canceled" % self.id

    return (True, msg)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    self.SetTaskName("Job%s" % job.id)

    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire(shared=1)
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                logging.debug("Canceling opcode")
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              logging.debug("Opcode %s/%s waiting for locks",
                            idx + 1, count)
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire(shared=1)
            try:
              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              if idx == count - 1:
                job.end_timestamp = TimeStampNow()

                # Consistency check
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops)

              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire(shared=1)
            try:
              try:
                logging.debug("Opcode %s/%s failed", idx + 1, count)
                op.status = constants.OP_STATUS_ERROR
                op.result = _EncodeOpError(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)

                to_encode = errors.OpExecError("Preceding opcode failed")
                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                      _EncodeOpError(to_encode))

                # Consistency check
                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
                                  for i in job.ops[:idx])
                assert compat.all(i.status == constants.OP_STATUS_ERROR and
                                  errors.GetEncodedError(i.result)
                                  for i in job.ops[idx:])
              finally:
                job.end_timestamp = TimeStampNow()
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire(shared=1)
        try:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      status = job.CalcStatus()
      logging.info("Finished job %s, status = %s", job.id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status in (constants.JOB_STATUS_QUEUED, ):
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITLOCK,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              "Unclean master daemon shutdown")
        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobs(restartjobs)

    logging.info("Job queue inspection finished")

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicate the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    getents = runtime.GetEnts()

    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
                      uid=getents.masterd_uid, gid=getents.masterd_gid)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    self._EnqueueJobs(added_jobs)

    return results

  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
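
  # Illustrative sketch (editor's addition): a LUXI-level client typically
  # polls WaitForJobChanges above in a loop, feeding back the last values it
  # received; the loop below is an assumed example, not upstream code.
  #
  #   prev_info, prev_serial = None, 0
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 30.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue          # timeout, ask again
  #     if result is None:
  #       break             # job no longer exists
  #     (prev_info, log_entries) = result
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]
  #     if prev_info[0] in constants.JOBS_FINALIZED:
  #       break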

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    (success, msg) = job.Cancel()

    if success:
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time the archival run may take, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None