lib/jqueue.py @ 3c88bf36

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import errno
35
import re
36
import time
37
import weakref
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import runtime
57
from ganeti import netutils
58
from ganeti import compat
59

    
60

    
61
JOBQUEUE_THREADS = 25
62
JOBS_PER_ARCHIVE_DIRECTORY = 10000
63

    
64
# member lock names to be passed to @ssynchronized decorator
65
_LOCK = "_lock"
66
_QUEUE = "_queue"
67

    
68

    
69
class CancelJob(Exception):
70
  """Special exception to cancel a job.
71

72
  """
73

    
74

    
75
def TimeStampNow():
76
  """Returns the current timestamp.
77

78
  @rtype: tuple
79
  @return: the current time in the (seconds, microseconds) format
80

81
  """
82
  return utils.SplitTime(time.time())
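  # Illustrative example (assumes utils.SplitTime splits a float into whole
  # seconds and microseconds, per the format documented above):
  #   utils.SplitTime(1234567890.25) is expected to yield (1234567890, 250000)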
83

    
84

    
85
class _QueuedOpCode(object):
86
  """Encapsulates an opcode object.
87

88
  @ivar log: holds the execution log and consists of tuples
89
  of the form C{(log_serial, timestamp, level, message)}
90
  @ivar input: the OpCode we encapsulate
91
  @ivar status: the current status
92
  @ivar result: the result of the LU execution
93
  @ivar start_timestamp: timestamp for the start of the execution
94
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
95
  @ivar end_timestamp: timestamp for the end of the execution
96

97
  """
98
  __slots__ = ["input", "status", "result", "log", "priority",
99
               "start_timestamp", "exec_timestamp", "end_timestamp",
100
               "__weakref__"]
101

    
102
  def __init__(self, op):
103
    """Constructor for the _QuededOpCode.
104

105
    @type op: L{opcodes.OpCode}
106
    @param op: the opcode we encapsulate
107

108
    """
109
    self.input = op
110
    self.status = constants.OP_STATUS_QUEUED
111
    self.result = None
112
    self.log = []
113
    self.start_timestamp = None
114
    self.exec_timestamp = None
115
    self.end_timestamp = None
116

    
117
    # Get initial priority (it might change during the lifetime of this opcode)
118
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
119

    
120
  @classmethod
121
  def Restore(cls, state):
122
    """Restore the _QueuedOpCode from the serialized form.
123

124
    @type state: dict
125
    @param state: the serialized state
126
    @rtype: _QueuedOpCode
127
    @return: a new _QueuedOpCode instance
128

129
    """
130
    obj = _QueuedOpCode.__new__(cls)
131
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
132
    obj.status = state["status"]
133
    obj.result = state["result"]
134
    obj.log = state["log"]
135
    obj.start_timestamp = state.get("start_timestamp", None)
136
    obj.exec_timestamp = state.get("exec_timestamp", None)
137
    obj.end_timestamp = state.get("end_timestamp", None)
138
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
139
    return obj
140

    
141
  def Serialize(self):
142
    """Serializes this _QueuedOpCode.
143

144
    @rtype: dict
145
    @return: the dictionary holding the serialized state
146

147
    """
148
    return {
149
      "input": self.input.__getstate__(),
150
      "status": self.status,
151
      "result": self.result,
152
      "log": self.log,
153
      "start_timestamp": self.start_timestamp,
154
      "exec_timestamp": self.exec_timestamp,
155
      "end_timestamp": self.end_timestamp,
156
      "priority": self.priority,
157
      }
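  # Illustrative round-trip sketch (not part of the original code):
  # Serialize() emits a plain dict that Restore() accepts back, e.g.
  #   state = qop.Serialize()
  #   copy = _QueuedOpCode.Restore(state)
  # where "qop" stands for some existing _QueuedOpCode instance.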
158

    
159

    
160
class _QueuedJob(object):
161
  """In-memory job representation.
162

163
  This is what we use to track the user-submitted jobs. Locking must
164
  be taken care of by users of this class.
165

166
  @type queue: L{JobQueue}
167
  @ivar queue: the parent queue
168
  @ivar id: the job ID
169
  @type ops: list
170
  @ivar ops: the list of _QueuedOpCode that constitute the job
171
  @type log_serial: int
172
  @ivar log_serial: holds the serial of the last appended log entry
173
  @ivar received_timestamp: the timestamp for when the job was received
174
  @ivar start_timestmap: the timestamp for start of execution
175
  @ivar end_timestamp: the timestamp for end of execution
176

177
  """
178
  # pylint: disable-msg=W0212
179
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
180
               "received_timestamp", "start_timestamp", "end_timestamp",
181
               "__weakref__"]
182

    
183
  def __init__(self, queue, job_id, ops):
184
    """Constructor for the _QueuedJob.
185

186
    @type queue: L{JobQueue}
187
    @param queue: our parent queue
188
    @type job_id: job_id
189
    @param job_id: our job id
190
    @type ops: list
191
    @param ops: the list of opcodes we hold, which will be encapsulated
192
        in _QueuedOpCodes
193

194
    """
195
    if not ops:
196
      raise errors.GenericError("A job needs at least one opcode")
197

    
198
    self.queue = queue
199
    self.id = job_id
200
    self.ops = [_QueuedOpCode(op) for op in ops]
201
    self.log_serial = 0
202
    self.received_timestamp = TimeStampNow()
203
    self.start_timestamp = None
204
    self.end_timestamp = None
205

    
206
    self._InitInMemory(self)
207

    
208
  @staticmethod
209
  def _InitInMemory(obj):
210
    """Initializes in-memory variables.
211

212
    """
213
    obj.ops_iter = None
214
    obj.cur_opctx = None
215

    
216
  def __repr__(self):
217
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
218
              "id=%s" % self.id,
219
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
220

    
221
    return "<%s at %#x>" % (" ".join(status), id(self))
222

    
223
  @classmethod
224
  def Restore(cls, queue, state):
225
    """Restore a _QueuedJob from serialized state:
226

227
    @type queue: L{JobQueue}
228
    @param queue: to which queue the restored job belongs
229
    @type state: dict
230
    @param state: the serialized state
231
    @rtype: _QueuedJob
232
    @return: the restored _QueuedJob instance
233

234
    """
235
    obj = _QueuedJob.__new__(cls)
236
    obj.queue = queue
237
    obj.id = state["id"]
238
    obj.received_timestamp = state.get("received_timestamp", None)
239
    obj.start_timestamp = state.get("start_timestamp", None)
240
    obj.end_timestamp = state.get("end_timestamp", None)
241

    
242
    obj.ops = []
243
    obj.log_serial = 0
244
    for op_state in state["ops"]:
245
      op = _QueuedOpCode.Restore(op_state)
246
      for log_entry in op.log:
247
        obj.log_serial = max(obj.log_serial, log_entry[0])
248
      obj.ops.append(op)
249

    
250
    cls._InitInMemory(obj)
251

    
252
    return obj
253

    
254
  def Serialize(self):
255
    """Serialize the _JobQueue instance.
256

257
    @rtype: dict
258
    @return: the serialized state
259

260
    """
261
    return {
262
      "id": self.id,
263
      "ops": [op.Serialize() for op in self.ops],
264
      "start_timestamp": self.start_timestamp,
265
      "end_timestamp": self.end_timestamp,
266
      "received_timestamp": self.received_timestamp,
267
      }
268

    
269
  def CalcStatus(self):
270
    """Compute the status of this job.
271

272
    This function iterates over all the _QueuedOpCodes in the job and
273
    based on their status, computes the job status.
274

275
    The algorithm is:
276
      - if we find a cancelled opcode, or one finished with error, the job
277
        status will be the same
278
      - otherwise, the last opcode with the status one of:
279
          - waitlock
280
          - canceling
281
          - running
282

283
        will determine the job status
284

285
      - otherwise, it means either all opcodes are queued, or success,
286
        and the job status will be the same
287

288
    @return: the job status
289

290
    """
291
    status = constants.JOB_STATUS_QUEUED
292

    
293
    all_success = True
294
    for op in self.ops:
295
      if op.status == constants.OP_STATUS_SUCCESS:
296
        continue
297

    
298
      all_success = False
299

    
300
      if op.status == constants.OP_STATUS_QUEUED:
301
        pass
302
      elif op.status == constants.OP_STATUS_WAITLOCK:
303
        status = constants.JOB_STATUS_WAITLOCK
304
      elif op.status == constants.OP_STATUS_RUNNING:
305
        status = constants.JOB_STATUS_RUNNING
306
      elif op.status == constants.OP_STATUS_CANCELING:
307
        status = constants.JOB_STATUS_CANCELING
308
        break
309
      elif op.status == constants.OP_STATUS_ERROR:
310
        status = constants.JOB_STATUS_ERROR
311
        # The whole job fails if one opcode failed
312
        break
313
      elif op.status == constants.OP_STATUS_CANCELED:
314
        status = constants.JOB_STATUS_CANCELED
315
        break
316

    
317
    if all_success:
318
      status = constants.JOB_STATUS_SUCCESS
319

    
320
    return status
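    # For example (derived from the loop above): opcode statuses
    # (SUCCESS, RUNNING, QUEUED) compute to JOB_STATUS_RUNNING, while
    # (SUCCESS, ERROR, QUEUED) compute to JOB_STATUS_ERROR, since an error
    # or cancellation short-circuits the loop.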
321

    
322
  def CalcPriority(self):
323
    """Gets the current priority for this job.
324

325
    Only unfinished opcodes are considered. When all are done, the default
326
    priority is used.
327

328
    @rtype: int
329

330
    """
331
    priorities = [op.priority for op in self.ops
332
                  if op.status not in constants.OPS_FINALIZED]
333

    
334
    if not priorities:
335
      # All opcodes are done, assume default priority
336
      return constants.OP_PRIO_DEFAULT
337

    
338
    return min(priorities)
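    # Note: numerically smaller values denote higher priority
    # (OP_PRIO_HIGHEST is the smallest allowed value), so min() picks the
    # priority of the most urgent pending opcode.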
339

    
340
  def GetLogEntries(self, newer_than):
341
    """Selectively returns the log entries.
342

343
    @type newer_than: None or int
344
    @param newer_than: if this is None, return all log entries,
345
        otherwise return only the log entries with serial higher
346
        than this value
347
    @rtype: list
348
    @return: the list of the log entries selected
349

350
    """
351
    if newer_than is None:
352
      serial = -1
353
    else:
354
      serial = newer_than
355

    
356
    entries = []
357
    for op in self.ops:
358
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
359

    
360
    return entries
361

    
362
  def GetInfo(self, fields):
363
    """Returns information about a job.
364

365
    @type fields: list
366
    @param fields: names of fields to return
367
    @rtype: list
368
    @return: list with one element for each field
369
    @raise errors.OpExecError: when an invalid field
370
        has been passed
371

372
    """
373
    row = []
374
    for fname in fields:
375
      if fname == "id":
376
        row.append(self.id)
377
      elif fname == "status":
378
        row.append(self.CalcStatus())
379
      elif fname == "priority":
380
        row.append(self.CalcPriority())
381
      elif fname == "ops":
382
        row.append([op.input.__getstate__() for op in self.ops])
383
      elif fname == "opresult":
384
        row.append([op.result for op in self.ops])
385
      elif fname == "opstatus":
386
        row.append([op.status for op in self.ops])
387
      elif fname == "oplog":
388
        row.append([op.log for op in self.ops])
389
      elif fname == "opstart":
390
        row.append([op.start_timestamp for op in self.ops])
391
      elif fname == "opexec":
392
        row.append([op.exec_timestamp for op in self.ops])
393
      elif fname == "opend":
394
        row.append([op.end_timestamp for op in self.ops])
395
      elif fname == "oppriority":
396
        row.append([op.priority for op in self.ops])
397
      elif fname == "received_ts":
398
        row.append(self.received_timestamp)
399
      elif fname == "start_ts":
400
        row.append(self.start_timestamp)
401
      elif fname == "end_ts":
402
        row.append(self.end_timestamp)
403
      elif fname == "summary":
404
        row.append([op.input.Summary() for op in self.ops])
405
      else:
406
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
407
    return row
408

    
409
  def MarkUnfinishedOps(self, status, result):
410
    """Mark unfinished opcodes with a given status and result.
411

412
    This is a utility function for marking all running or waiting to
413
    be run opcodes with a given status. Opcodes which are already
414
    finalised are not changed.
415

416
    @param status: a given opcode status
417
    @param result: the opcode result
418

419
    """
420
    not_marked = True
421
    for op in self.ops:
422
      if op.status in constants.OPS_FINALIZED:
423
        assert not_marked, "Finalized opcodes found after non-finalized ones"
424
        continue
425
      op.status = status
426
      op.result = result
427
      not_marked = False
428

    
429
  def Finalize(self):
430
    """Marks the job as finalized.
431

432
    """
433
    self.end_timestamp = TimeStampNow()
434

    
435
  def Cancel(self):
436
    """Marks job as canceled/-ing if possible.
437

438
    @rtype: tuple; (bool, string)
439
    @return: Boolean describing whether job was successfully canceled or marked
440
      as canceling and a text message
441

442
    """
443
    status = self.CalcStatus()
444

    
445
    if status == constants.JOB_STATUS_QUEUED:
446
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
447
                             "Job canceled by request")
448
      self.Finalize()
449
      return (True, "Job %s canceled" % self.id)
450

    
451
    elif status == constants.JOB_STATUS_WAITLOCK:
452
      # The worker will notice the new status and cancel the job
453
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
454
      return (True, "Job %s will be canceled" % self.id)
455

    
456
    else:
457
      logging.debug("Job %s is no longer waiting in the queue", self.id)
458
      return (False, "Job %s is no longer waiting in the queue" % self.id)
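  # Summary of the cases above: a still-queued job is cancelled right away,
  # a job waiting for locks is marked as canceling (the worker completes the
  # cancellation), and a job in any other state can no longer be cancelled.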
459

    
460

    
461
class _OpExecCallbacks(mcpu.OpExecCbBase):
462
  def __init__(self, queue, job, op):
463
    """Initializes this class.
464

465
    @type queue: L{JobQueue}
466
    @param queue: Job queue
467
    @type job: L{_QueuedJob}
468
    @param job: Job object
469
    @type op: L{_QueuedOpCode}
470
    @param op: OpCode
471

472
    """
473
    assert queue, "Queue is missing"
474
    assert job, "Job is missing"
475
    assert op, "Opcode is missing"
476

    
477
    self._queue = queue
478
    self._job = job
479
    self._op = op
480

    
481
  def _CheckCancel(self):
482
    """Raises an exception to cancel the job if asked to.
483

484
    """
485
    # Cancel here if we were asked to
486
    if self._op.status == constants.OP_STATUS_CANCELING:
487
      logging.debug("Canceling opcode")
488
      raise CancelJob()
489

    
490
  @locking.ssynchronized(_QUEUE, shared=1)
491
  def NotifyStart(self):
492
    """Mark the opcode as running, not lock-waiting.
493

494
    This is called from the mcpu code as a notifier function, when the LU is
495
    finally about to start the Exec() method. Of course, to have end-user
496
    visible results, the opcode must be initially (before calling into
497
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
498

499
    """
500
    assert self._op in self._job.ops
501
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
502
                               constants.OP_STATUS_CANCELING)
503

    
504
    # Cancel here if we were asked to
505
    self._CheckCancel()
506

    
507
    logging.debug("Opcode is now running")
508

    
509
    self._op.status = constants.OP_STATUS_RUNNING
510
    self._op.exec_timestamp = TimeStampNow()
511

    
512
    # And finally replicate the job status
513
    self._queue.UpdateJobUnlocked(self._job)
514

    
515
  @locking.ssynchronized(_QUEUE, shared=1)
516
  def _AppendFeedback(self, timestamp, log_type, log_msg):
517
    """Internal feedback append function, with locks
518

519
    """
520
    self._job.log_serial += 1
521
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
522
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
523

    
524
  def Feedback(self, *args):
525
    """Append a log entry.
526

527
    """
528
    assert len(args) < 3
529

    
530
    if len(args) == 1:
531
      log_type = constants.ELOG_MESSAGE
532
      log_msg = args[0]
533
    else:
534
      (log_type, log_msg) = args
535

    
536
    # The time is split to make serialization easier and not lose
537
    # precision.
538
    timestamp = utils.SplitTime(time.time())
539
    self._AppendFeedback(timestamp, log_type, log_msg)
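    # Supported call forms, as implemented above:
    #   Feedback("a message")                          - uses ELOG_MESSAGE
    #   Feedback(constants.ELOG_MESSAGE, "a message")  - explicit log type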
540

    
541
  def CheckCancel(self):
542
    """Check whether job has been cancelled.
543

544
    """
545
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
546
                               constants.OP_STATUS_CANCELING)
547

    
548
    # Cancel here if we were asked to
549
    self._CheckCancel()
550

    
551

    
552
class _JobChangesChecker(object):
553
  def __init__(self, fields, prev_job_info, prev_log_serial):
554
    """Initializes this class.
555

556
    @type fields: list of strings
557
    @param fields: Fields requested by LUXI client
558
    @type prev_job_info: string
559
    @param prev_job_info: previous job info, as passed by the LUXI client
560
    @type prev_log_serial: string
561
    @param prev_log_serial: previous job serial, as passed by the LUXI client
562

563
    """
564
    self._fields = fields
565
    self._prev_job_info = prev_job_info
566
    self._prev_log_serial = prev_log_serial
567

    
568
  def __call__(self, job):
569
    """Checks whether job has changed.
570

571
    @type job: L{_QueuedJob}
572
    @param job: Job object
573

574
    """
575
    status = job.CalcStatus()
576
    job_info = job.GetInfo(self._fields)
577
    log_entries = job.GetLogEntries(self._prev_log_serial)
578

    
579
    # Serializing and deserializing data can cause type changes (e.g. from
580
    # tuple to list) or precision loss. We're doing it here so that we get
581
    # the same modifications as the data received from the client. Without
582
    # this, the comparison afterwards might fail without the data being
583
    # significantly different.
584
    # TODO: we just deserialized from disk, investigate how to make sure that
585
    # the job info and log entries are compatible to avoid this further step.
586
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
587
    # efficient, though floats will be tricky
588
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
589
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
590

    
591
    # Don't even try to wait if the job is no longer running, there will be
592
    # no changes.
593
    if (status not in (constants.JOB_STATUS_QUEUED,
594
                       constants.JOB_STATUS_RUNNING,
595
                       constants.JOB_STATUS_WAITLOCK) or
596
        job_info != self._prev_job_info or
597
        (log_entries and self._prev_log_serial != log_entries[0][0])):
598
      logging.debug("Job %s changed", job.id)
599
      return (job_info, log_entries)
600

    
601
    return None
602

    
603

    
604
class _JobFileChangesWaiter(object):
605
  def __init__(self, filename):
606
    """Initializes this class.
607

608
    @type filename: string
609
    @param filename: Path to job file
610
    @raise errors.InotifyError: if the notifier cannot be set up
611

612
    """
613
    self._wm = pyinotify.WatchManager()
614
    self._inotify_handler = \
615
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
616
    self._notifier = \
617
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
618
    try:
619
      self._inotify_handler.enable()
620
    except Exception:
621
      # pyinotify doesn't close file descriptors automatically
622
      self._notifier.stop()
623
      raise
624

    
625
  def _OnInotify(self, notifier_enabled):
626
    """Callback for inotify.
627

628
    """
629
    if not notifier_enabled:
630
      self._inotify_handler.enable()
631

    
632
  def Wait(self, timeout):
633
    """Waits for the job file to change.
634

635
    @type timeout: float
636
    @param timeout: Timeout in seconds
637
    @return: Whether there have been events
638

639
    """
640
    assert timeout >= 0
641
    have_events = self._notifier.check_events(timeout * 1000)
642
    if have_events:
643
      self._notifier.read_events()
644
    self._notifier.process_events()
645
    return have_events
646

    
647
  def Close(self):
648
    """Closes underlying notifier and its file descriptor.
649

650
    """
651
    self._notifier.stop()
652

    
653

    
654
class _JobChangesWaiter(object):
655
  def __init__(self, filename):
656
    """Initializes this class.
657

658
    @type filename: string
659
    @param filename: Path to job file
660

661
    """
662
    self._filewaiter = None
663
    self._filename = filename
664

    
665
  def Wait(self, timeout):
666
    """Waits for a job to change.
667

668
    @type timeout: float
669
    @param timeout: Timeout in seconds
670
    @return: Whether there have been events
671

672
    """
673
    if self._filewaiter:
674
      return self._filewaiter.Wait(timeout)
675

    
676
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
677
    # If this point is reached, return immediately and let caller check the job
678
    # file again in case there were changes since the last check. This avoids a
679
    # race condition.
680
    self._filewaiter = _JobFileChangesWaiter(self._filename)
681

    
682
    return True
683

    
684
  def Close(self):
685
    """Closes underlying waiter.
686

687
    """
688
    if self._filewaiter:
689
      self._filewaiter.Close()
690

    
691

    
692
class _WaitForJobChangesHelper(object):
693
  """Helper class using inotify to wait for changes in a job file.
694

695
  This class takes a previous job status and serial, and alerts the client when
696
  the current job status has changed.
697

698
  """
699
  @staticmethod
700
  def _CheckForChanges(job_load_fn, check_fn):
701
    job = job_load_fn()
702
    if not job:
703
      raise errors.JobLost()
704

    
705
    result = check_fn(job)
706
    if result is None:
707
      raise utils.RetryAgain()
708

    
709
    return result
710

    
711
  def __call__(self, filename, job_load_fn,
712
               fields, prev_job_info, prev_log_serial, timeout):
713
    """Waits for changes on a job.
714

715
    @type filename: string
716
    @param filename: File on which to wait for changes
717
    @type job_load_fn: callable
718
    @param job_load_fn: Function to load job
719
    @type fields: list of strings
720
    @param fields: Which fields to check for changes
721
    @type prev_job_info: list or None
722
    @param prev_job_info: Last job information returned
723
    @type prev_log_serial: int
724
    @param prev_log_serial: Last job message serial number
725
    @type timeout: float
726
    @param timeout: maximum time to wait in seconds
727

728
    """
729
    try:
730
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
731
      waiter = _JobChangesWaiter(filename)
732
      try:
733
        return utils.Retry(compat.partial(self._CheckForChanges,
734
                                          job_load_fn, check_fn),
735
                           utils.RETRY_REMAINING_TIME, timeout,
736
                           wait_fn=waiter.Wait)
737
      finally:
738
        waiter.Close()
739
    except (errors.InotifyError, errors.JobLost):
740
      return None
741
    except utils.RetryTimeout:
742
      return constants.JOB_NOTCHANGED
743

    
744

    
745
def _EncodeOpError(err):
746
  """Encodes an error which occurred while processing an opcode.
747

748
  """
749
  if isinstance(err, errors.GenericError):
750
    to_encode = err
751
  else:
752
    to_encode = errors.OpExecError(str(err))
753

    
754
  return errors.EncodeException(to_encode)
755

    
756

    
757
class _TimeoutStrategyWrapper:
758
  def __init__(self, fn):
759
    """Initializes this class.
760

761
    """
762
    self._fn = fn
763
    self._next = None
764

    
765
  def _Advance(self):
766
    """Gets the next timeout if necessary.
767

768
    """
769
    if self._next is None:
770
      self._next = self._fn()
771

    
772
  def Peek(self):
773
    """Returns the next timeout.
774

775
    """
776
    self._Advance()
777
    return self._next
778

    
779
  def Next(self):
780
    """Returns the current timeout and advances the internal state.
781

782
    """
783
    self._Advance()
784
    result = self._next
785
    self._next = None
786
    return result
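  # Note: Peek() returns the upcoming timeout without consuming it, while
  # Next() consumes it, so the following call fetches a fresh value from the
  # wrapped strategy function.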
787

    
788

    
789
class _OpExecContext:
790
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
791
    """Initializes this class.
792

793
    """
794
    self.op = op
795
    self.index = index
796
    self.log_prefix = log_prefix
797
    self.summary = op.input.Summary()
798

    
799
    self._timeout_strategy_factory = timeout_strategy_factory
800
    self._ResetTimeoutStrategy()
801

    
802
  def _ResetTimeoutStrategy(self):
803
    """Creates a new timeout strategy.
804

805
    """
806
    self._timeout_strategy = \
807
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
808

    
809
  def CheckPriorityIncrease(self):
810
    """Checks whether priority can and should be increased.
811

812
    Called when locks couldn't be acquired.
813

814
    """
815
    op = self.op
816

    
817
    # Exhausted all retries and next round should not use blocking acquire
818
    # for locks?
819
    if (self._timeout_strategy.Peek() is None and
820
        op.priority > constants.OP_PRIO_HIGHEST):
821
      logging.debug("Increasing priority")
822
      op.priority -= 1
823
      self._ResetTimeoutStrategy()
824
      return True
825

    
826
    return False
827

    
828
  def GetNextLockTimeout(self):
829
    """Returns the next lock acquire timeout.
830

831
    """
832
    return self._timeout_strategy.Next()
833

    
834

    
835
class _JobProcessor(object):
836
  def __init__(self, queue, opexec_fn, job,
837
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
838
    """Initializes this class.
839

840
    """
841
    self.queue = queue
842
    self.opexec_fn = opexec_fn
843
    self.job = job
844
    self._timeout_strategy_factory = _timeout_strategy_factory
845

    
846
  @staticmethod
847
  def _FindNextOpcode(job, timeout_strategy_factory):
848
    """Locates the next opcode to run.
849

850
    @type job: L{_QueuedJob}
851
    @param job: Job object
852
    @param timeout_strategy_factory: Callable to create new timeout strategy
853

854
    """
855
    # Create some sort of a cache to speed up locating next opcode for future
856
    # lookups
857
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
858
    # pending and one for processed ops.
859
    if job.ops_iter is None:
860
      job.ops_iter = enumerate(job.ops)
861

    
862
    # Find next opcode to run
863
    while True:
864
      try:
865
        (idx, op) = job.ops_iter.next()
866
      except StopIteration:
867
        raise errors.ProgrammerError("Called for a finished job")
868

    
869
      if op.status == constants.OP_STATUS_RUNNING:
870
        # Found an opcode already marked as running
871
        raise errors.ProgrammerError("Called for job marked as running")
872

    
873
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
874
                             timeout_strategy_factory)
875

    
876
      if op.status not in constants.OPS_FINALIZED:
877
        return opctx
878

    
879
      # This is a job that was partially completed before master daemon
880
      # shutdown, so it can be expected that some opcodes are already
881
      # completed successfully (if any did error out, then the whole job
882
      # should have been aborted and not resubmitted for processing).
883
      logging.info("%s: opcode %s already processed, skipping",
884
                   opctx.log_prefix, opctx.summary)
885

    
886
  @staticmethod
887
  def _MarkWaitlock(job, op):
888
    """Marks an opcode as waiting for locks.
889

890
    The job's start timestamp is also set if necessary.
891

892
    @type job: L{_QueuedJob}
893
    @param job: Job object
894
    @type op: L{_QueuedOpCode}
895
    @param op: Opcode object
896

897
    """
898
    assert op in job.ops
899
    assert op.status in (constants.OP_STATUS_QUEUED,
900
                         constants.OP_STATUS_WAITLOCK)
901

    
902
    update = False
903

    
904
    op.result = None
905

    
906
    if op.status == constants.OP_STATUS_QUEUED:
907
      op.status = constants.OP_STATUS_WAITLOCK
908
      update = True
909

    
910
    if op.start_timestamp is None:
911
      op.start_timestamp = TimeStampNow()
912
      update = True
913

    
914
    if job.start_timestamp is None:
915
      job.start_timestamp = op.start_timestamp
916
      update = True
917

    
918
    assert op.status == constants.OP_STATUS_WAITLOCK
919

    
920
    return update
921

    
922
  def _ExecOpCodeUnlocked(self, opctx):
923
    """Processes one opcode and returns the result.
924

925
    """
926
    op = opctx.op
927

    
928
    assert op.status == constants.OP_STATUS_WAITLOCK
929

    
930
    timeout = opctx.GetNextLockTimeout()
931

    
932
    try:
933
      # Make sure not to hold queue lock while calling ExecOpCode
934
      result = self.opexec_fn(op.input,
935
                              _OpExecCallbacks(self.queue, self.job, op),
936
                              timeout=timeout, priority=op.priority)
937
    except mcpu.LockAcquireTimeout:
938
      assert timeout is not None, "Received timeout for blocking acquire"
939
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
940

    
941
      assert op.status in (constants.OP_STATUS_WAITLOCK,
942
                           constants.OP_STATUS_CANCELING)
943

    
944
      # Was job cancelled while we were waiting for the lock?
945
      if op.status == constants.OP_STATUS_CANCELING:
946
        return (constants.OP_STATUS_CANCELING, None)
947

    
948
      # Stay in waitlock while trying to re-acquire lock
949
      return (constants.OP_STATUS_WAITLOCK, None)
950
    except CancelJob:
951
      logging.exception("%s: Canceling job", opctx.log_prefix)
952
      assert op.status == constants.OP_STATUS_CANCELING
953
      return (constants.OP_STATUS_CANCELING, None)
954
    except Exception, err: # pylint: disable-msg=W0703
955
      logging.exception("%s: Caught exception in %s",
956
                        opctx.log_prefix, opctx.summary)
957
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
958
    else:
959
      logging.debug("%s: %s successful",
960
                    opctx.log_prefix, opctx.summary)
961
      return (constants.OP_STATUS_SUCCESS, result)
962

    
963
  def __call__(self, _nextop_fn=None):
964
    """Continues execution of a job.
965

966
    @param _nextop_fn: Callback function for tests
967
    @rtype: bool
968
    @return: True if job is finished, False if processor needs to be called
969
             again
970

971
    """
972
    queue = self.queue
973
    job = self.job
974

    
975
    logging.debug("Processing job %s", job.id)
976

    
977
    queue.acquire(shared=1)
978
    try:
979
      opcount = len(job.ops)
980

    
981
      # Don't do anything for finalized jobs
982
      if job.CalcStatus() in constants.JOBS_FINALIZED:
983
        return True
984

    
985
      # Is a previous opcode still pending?
986
      if job.cur_opctx:
987
        opctx = job.cur_opctx
988
        job.cur_opctx = None
989
      else:
990
        if __debug__ and _nextop_fn:
991
          _nextop_fn()
992
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
993

    
994
      op = opctx.op
995

    
996
      # Consistency check
997
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
998
                                     constants.OP_STATUS_CANCELING)
999
                        for i in job.ops[opctx.index + 1:])
1000

    
1001
      assert op.status in (constants.OP_STATUS_QUEUED,
1002
                           constants.OP_STATUS_WAITLOCK,
1003
                           constants.OP_STATUS_CANCELING)
1004

    
1005
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1006
              op.priority >= constants.OP_PRIO_HIGHEST)
1007

    
1008
      if op.status != constants.OP_STATUS_CANCELING:
1009
        assert op.status in (constants.OP_STATUS_QUEUED,
1010
                             constants.OP_STATUS_WAITLOCK)
1011

    
1012
        # Prepare to start opcode
1013
        if self._MarkWaitlock(job, op):
1014
          # Write to disk
1015
          queue.UpdateJobUnlocked(job)
1016

    
1017
        assert op.status == constants.OP_STATUS_WAITLOCK
1018
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1019
        assert job.start_timestamp and op.start_timestamp
1020

    
1021
        logging.info("%s: opcode %s waiting for locks",
1022
                     opctx.log_prefix, opctx.summary)
1023

    
1024
        queue.release()
1025
        try:
1026
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1027
        finally:
1028
          queue.acquire(shared=1)
1029

    
1030
        op.status = op_status
1031
        op.result = op_result
1032

    
1033
        if op.status == constants.OP_STATUS_WAITLOCK:
1034
          # Couldn't get locks in time
1035
          assert not op.end_timestamp
1036
        else:
1037
          # Finalize opcode
1038
          op.end_timestamp = TimeStampNow()
1039

    
1040
          if op.status == constants.OP_STATUS_CANCELING:
1041
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1042
                                  for i in job.ops[opctx.index:])
1043
          else:
1044
            assert op.status in constants.OPS_FINALIZED
1045

    
1046
      if op.status == constants.OP_STATUS_WAITLOCK:
1047
        finalize = False
1048

    
1049
        if opctx.CheckPriorityIncrease():
1050
          # Priority was changed, need to update on-disk file
1051
          queue.UpdateJobUnlocked(job)
1052

    
1053
        # Keep around for another round
1054
        job.cur_opctx = opctx
1055

    
1056
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1057
                op.priority >= constants.OP_PRIO_HIGHEST)
1058

    
1059
        # In no case must the status be finalized here
1060
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1061

    
1062
      else:
1063
        # Ensure all opcodes so far have been successful
1064
        assert (opctx.index == 0 or
1065
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1066
                           for i in job.ops[:opctx.index]))
1067

    
1068
        # Reset context
1069
        job.cur_opctx = None
1070

    
1071
        if op.status == constants.OP_STATUS_SUCCESS:
1072
          finalize = False
1073

    
1074
        elif op.status == constants.OP_STATUS_ERROR:
1075
          # Ensure failed opcode has an exception as its result
1076
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1077

    
1078
          to_encode = errors.OpExecError("Preceding opcode failed")
1079
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1080
                                _EncodeOpError(to_encode))
1081
          finalize = True
1082

    
1083
          # Consistency check
1084
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1085
                            errors.GetEncodedError(i.result)
1086
                            for i in job.ops[opctx.index:])
1087

    
1088
        elif op.status == constants.OP_STATUS_CANCELING:
1089
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1090
                                "Job canceled by request")
1091
          finalize = True
1092

    
1093
        else:
1094
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1095

    
1096
        if opctx.index == (opcount - 1):
1097
          # Finalize on last opcode
1098
          finalize = True
1099

    
1100
        if finalize:
1101
          # All opcodes have been run, finalize job
1102
          job.Finalize()
1103

    
1104
        # Write to disk. If the job status is final, this is the final write
1105
        # allowed. Once the file has been written, it can be archived anytime.
1106
        queue.UpdateJobUnlocked(job)
1107

    
1108
        if finalize:
1109
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1110
          return True
1111

    
1112
      return False
1113
    finally:
1114
      queue.release()
1115

    
1116

    
1117
class _JobQueueWorker(workerpool.BaseWorker):
1118
  """The actual job workers.
1119

1120
  """
1121
  def RunTask(self, job): # pylint: disable-msg=W0221
1122
    """Job executor.
1123

1124
    This function processes a job. It is closely tied to the L{_QueuedJob} and
1125
    L{_QueuedOpCode} classes.
1126

1127
    @type job: L{_QueuedJob}
1128
    @param job: the job to be processed
1129

1130
    """
1131
    queue = job.queue
1132
    assert queue == self.pool.queue
1133

    
1134
    self.SetTaskName("Job%s" % job.id)
1135

    
1136
    proc = mcpu.Processor(queue.context, job.id)
1137

    
1138
    if not _JobProcessor(queue, proc.ExecOpCode, job)():
1139
      # Schedule again
1140
      raise workerpool.DeferTask(priority=job.CalcPriority())
1141

    
1142

    
1143
class _JobQueueWorkerPool(workerpool.WorkerPool):
1144
  """Simple class implementing a job-processing workerpool.
1145

1146
  """
1147
  def __init__(self, queue):
1148
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
1149
                                              JOBQUEUE_THREADS,
1150
                                              _JobQueueWorker)
1151
    self.queue = queue
1152

    
1153

    
1154
def _RequireOpenQueue(fn):
1155
  """Decorator for "public" functions.
1156

1157
  This function should be used for all 'public' functions. That is,
1158
  functions usually called from other classes. Note that this should
1159
  be applied only to methods (not plain functions), since it expects
1160
  that the decorated function is called with a first argument that has
1161
  a '_queue_filelock' attribute.
1162

1163
  @warning: Use this decorator only after locking.ssynchronized
1164

1165
  Example::
1166
    @locking.ssynchronized(_LOCK)
1167
    @_RequireOpenQueue
1168
    def Example(self):
1169
      pass
1170

1171
  """
1172
  def wrapper(self, *args, **kwargs):
1173
    # pylint: disable-msg=W0212
1174
    assert self._queue_filelock is not None, "Queue should be open"
1175
    return fn(self, *args, **kwargs)
1176
  return wrapper
1177

    
1178

    
1179
class JobQueue(object):
1180
  """Queue used to manage the jobs.
1181

1182
  @cvar _RE_JOB_FILE: regex matching the valid job file names
1183

1184
  """
1185
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1186

    
1187
  def __init__(self, context):
1188
    """Constructor for JobQueue.
1189

1190
    The constructor will initialize the job queue object and then
1191
    start loading the current jobs from disk, either for starting them
1192
    (if they were queued) or for aborting them (if they were already
1193
    running).
1194

1195
    @type context: GanetiContext
1196
    @param context: the context object for access to the configuration
1197
        data and other ganeti objects
1198

1199
    """
1200
    self.context = context
1201
    self._memcache = weakref.WeakValueDictionary()
1202
    self._my_hostname = netutils.Hostname.GetSysName()
1203

    
1204
    # The Big JobQueue lock. If a code block or method acquires it in shared
1205
    # mode, it must guarantee concurrency with all the code acquiring it in
1206
    # shared mode, including itself. In order not to acquire it at all
1207
    # concurrency must be guaranteed with all code acquiring it in shared mode
1208
    # and all code acquiring it exclusively.
1209
    self._lock = locking.SharedLock("JobQueue")
1210

    
1211
    self.acquire = self._lock.acquire
1212
    self.release = self._lock.release
1213

    
1214
    # Initialize the queue, and acquire the filelock.
1215
    # This ensures no other process is working on the job queue.
1216
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1217

    
1218
    # Read serial file
1219
    self._last_serial = jstore.ReadSerial()
1220
    assert self._last_serial is not None, ("Serial file was modified between"
1221
                                           " check in jstore and here")
1222

    
1223
    # Get initial list of nodes
1224
    self._nodes = dict((n.name, n.primary_ip)
1225
                       for n in self.context.cfg.GetAllNodesInfo().values()
1226
                       if n.master_candidate)
1227

    
1228
    # Remove master node
1229
    self._nodes.pop(self._my_hostname, None)
1230

    
1231
    # TODO: Check consistency across nodes
1232

    
1233
    self._queue_size = 0
1234
    self._UpdateQueueSizeUnlocked()
1235
    self._drained = self._IsQueueMarkedDrain()
1236

    
1237
    # Setup worker pool
1238
    self._wpool = _JobQueueWorkerPool(self)
1239
    try:
1240
      self._InspectQueue()
1241
    except:
1242
      self._wpool.TerminateWorkers()
1243
      raise
1244

    
1245
  @locking.ssynchronized(_LOCK)
1246
  @_RequireOpenQueue
1247
  def _InspectQueue(self):
1248
    """Loads the whole job queue and resumes unfinished jobs.
1249

1250
    This function needs the lock here because WorkerPool.AddTask() may start a
1251
    job while we're still doing our work.
1252

1253
    """
1254
    logging.info("Inspecting job queue")
1255

    
1256
    restartjobs = []
1257

    
1258
    all_job_ids = self._GetJobIDsUnlocked()
1259
    jobs_count = len(all_job_ids)
1260
    lastinfo = time.time()
1261
    for idx, job_id in enumerate(all_job_ids):
1262
      # Give an update every 1000 jobs or 10 seconds
1263
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1264
          idx == (jobs_count - 1)):
1265
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1266
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1267
        lastinfo = time.time()
1268

    
1269
      job = self._LoadJobUnlocked(job_id)
1270

    
1271
      # a failure in loading the job can cause 'None' to be returned
1272
      if job is None:
1273
        continue
1274

    
1275
      status = job.CalcStatus()
1276

    
1277
      if status == constants.JOB_STATUS_QUEUED:
1278
        restartjobs.append(job)
1279

    
1280
      elif status in (constants.JOB_STATUS_RUNNING,
1281
                      constants.JOB_STATUS_WAITLOCK,
1282
                      constants.JOB_STATUS_CANCELING):
1283
        logging.warning("Unfinished job %s found: %s", job.id, job)
1284

    
1285
        if status == constants.JOB_STATUS_WAITLOCK:
1286
          # Restart job
1287
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1288
          restartjobs.append(job)
1289
        else:
1290
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1291
                                "Unclean master daemon shutdown")
1292

    
1293
        self.UpdateJobUnlocked(job)
1294

    
1295
    if restartjobs:
1296
      logging.info("Restarting %s jobs", len(restartjobs))
1297
      self._EnqueueJobs(restartjobs)
1298

    
1299
    logging.info("Job queue inspection finished")
1300

    
1301
  @locking.ssynchronized(_LOCK)
1302
  @_RequireOpenQueue
1303
  def AddNode(self, node):
1304
    """Register a new node with the queue.
1305

1306
    @type node: L{objects.Node}
1307
    @param node: the node object to be added
1308

1309
    """
1310
    node_name = node.name
1311
    assert node_name != self._my_hostname
1312

    
1313
    # Clean queue directory on added node
1314
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1315
    msg = result.fail_msg
1316
    if msg:
1317
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1318
                      node_name, msg)
1319

    
1320
    if not node.master_candidate:
1321
      # remove if existing, ignoring errors
1322
      self._nodes.pop(node_name, None)
1323
      # and skip the replication of the job ids
1324
      return
1325

    
1326
    # Upload the whole queue excluding archived jobs
1327
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1328

    
1329
    # Upload current serial file
1330
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
1331

    
1332
    for file_name in files:
1333
      # Read file content
1334
      content = utils.ReadFile(file_name)
1335

    
1336
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
1337
                                                  [node.primary_ip],
1338
                                                  file_name, content)
1339
      msg = result[node_name].fail_msg
1340
      if msg:
1341
        logging.error("Failed to upload file %s to node %s: %s",
1342
                      file_name, node_name, msg)
1343

    
1344
    self._nodes[node_name] = node.primary_ip
1345

    
1346
  @locking.ssynchronized(_LOCK)
1347
  @_RequireOpenQueue
1348
  def RemoveNode(self, node_name):
1349
    """Callback called when removing nodes from the cluster.
1350

1351
    @type node_name: str
1352
    @param node_name: the name of the node to remove
1353

1354
    """
1355
    self._nodes.pop(node_name, None)
1356

    
1357
  @staticmethod
1358
  def _CheckRpcResult(result, nodes, failmsg):
1359
    """Verifies the status of an RPC call.
1360

1361
    Since we aim to keep consistency should this node (the current
1362
    master) fail, we will log errors if our rpc calls fail, and especially
1363
    log the case when more than half of the nodes fail.
1364

1365
    @param result: the data as returned from the rpc call
1366
    @type nodes: list
1367
    @param nodes: the list of nodes we made the call to
1368
    @type failmsg: str
1369
    @param failmsg: the identifier to be used for logging
1370

1371
    """
1372
    failed = []
1373
    success = []
1374

    
1375
    for node in nodes:
1376
      msg = result[node].fail_msg
1377
      if msg:
1378
        failed.append(node)
1379
        logging.error("RPC call %s (%s) failed on node %s: %s",
1380
                      result[node].call, failmsg, node, msg)
1381
      else:
1382
        success.append(node)
1383

    
1384
    # +1 for the master node
1385
    if (len(success) + 1) < len(failed):
1386
      # TODO: Handle failing nodes
1387
      logging.error("More than half of the nodes failed")
1388

    
1389
  def _GetNodeIp(self):
1390
    """Helper for returning the node name/ip list.
1391

1392
    @rtype: (list, list)
1393
    @return: a tuple of two lists, the first one with the node
1394
        names and the second one with the node addresses
1395

1396
    """
1397
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1398
    name_list = self._nodes.keys()
1399
    addr_list = [self._nodes[name] for name in name_list]
1400
    return name_list, addr_list
1401

    
1402
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1403
    """Writes a file locally and then replicates it to all nodes.
1404

1405
    This function will replace the contents of a file on the local
1406
    node and then replicate it to all the other nodes we have.
1407

1408
    @type file_name: str
1409
    @param file_name: the path of the file to be replicated
1410
    @type data: str
1411
    @param data: the new contents of the file
1412
    @type replicate: boolean
1413
    @param replicate: whether to spread the changes to the remote nodes
1414

1415
    """
1416
    getents = runtime.GetEnts()
1417
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1418
                    gid=getents.masterd_gid)
1419

    
1420
    if replicate:
1421
      names, addrs = self._GetNodeIp()
1422
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1423
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1424

    
1425
  def _RenameFilesUnlocked(self, rename):
1426
    """Renames a file locally and then replicate the change.
1427

1428
    This function will rename a file in the local queue directory
1429
    and then replicate this rename to all the other nodes we have.
1430

1431
    @type rename: list of (old, new)
1432
    @param rename: List containing tuples mapping old to new names
1433

1434
    """
1435
    # Rename them locally
1436
    for old, new in rename:
1437
      utils.RenameFile(old, new, mkdir=True)
1438

    
1439
    # ... and on all nodes
1440
    names, addrs = self._GetNodeIp()
1441
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1442
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1443

    
1444
  @staticmethod
1445
  def _FormatJobID(job_id):
1446
    """Convert a job ID to string format.
1447

1448
    Currently this just does C{str(job_id)} after performing some
1449
    checks, but if we want to change the job id format this will
1450
    abstract this change.
1451

1452
    @type job_id: int or long
1453
    @param job_id: the numeric job id
1454
    @rtype: str
1455
    @return: the formatted job id
1456

1457
    """
1458
    if not isinstance(job_id, (int, long)):
1459
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1460
    if job_id < 0:
1461
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1462

    
1463
    return str(job_id)
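    # Example: _FormatJobID(123) returns "123"; non-numeric or negative IDs
    # raise ProgrammerError (see the checks above).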
1464

    
1465
  @classmethod
1466
  def _GetArchiveDirectory(cls, job_id):
1467
    """Returns the archive directory for a job.
1468

1469
    @type job_id: str
1470
    @param job_id: Job identifier
1471
    @rtype: str
1472
    @return: Directory name
1473

1474
    """
1475
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
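    # Example: with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job "12345" is
    # archived under directory "1" and job "123" under directory "0".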
1476

    
1477
  def _NewSerialsUnlocked(self, count):
1478
    """Generates a new job identifier.
1479

1480
    Job identifiers are unique during the lifetime of a cluster.
1481

1482
    @type count: integer
1483
    @param count: how many serials to return
1484
    @rtype: list of str
1485
    @return: a list of strings representing the job identifiers.
1486

1487
    """
1488
    assert count > 0
1489
    # New number
1490
    serial = self._last_serial + count
1491

    
1492
    # Write to file
1493
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1494
                             "%s\n" % serial, True)
1495

    
1496
    result = [self._FormatJobID(v)
1497
              for v in range(self._last_serial + 1, serial + 1)]
1498

    
1499
    # Keep it only if we were able to write the file
1500
    self._last_serial = serial
1501

    
1502
    assert len(result) == count
1503

    
1504
    return result
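    # Example: if the last serial was 5, _NewSerialsUnlocked(3) writes "8" to
    # the serial file and returns ["6", "7", "8"].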
1505

    
1506
  @staticmethod
1507
  def _GetJobPath(job_id):
1508
    """Returns the job file for a given job id.
1509

1510
    @type job_id: str
1511
    @param job_id: the job identifier
1512
    @rtype: str
1513
    @return: the path to the job file
1514

1515
    """
1516
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1517

    
1518
  @classmethod
1519
  def _GetArchivedJobPath(cls, job_id):
1520
    """Returns the archived job file for a give job id.
1521

1522
    @type job_id: str
1523
    @param job_id: the job identifier
1524
    @rtype: str
1525
    @return: the path to the archived job file
1526

1527
    """
1528
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1529
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1530

    
1531
  def _GetJobIDsUnlocked(self, sort=True):
1532
    """Return all known job IDs.
1533

1534
    The method only looks at disk because it's a requirement that all
1535
    jobs are present on disk (so in the _memcache we don't have any
1536
    extra IDs).
1537

1538
    @type sort: boolean
1539
    @param sort: perform sorting on the returned job ids
1540
    @rtype: list
1541
    @return: the list of job IDs
1542

1543
    """
1544
    jlist = []
1545
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1546
      m = self._RE_JOB_FILE.match(filename)
1547
      if m:
1548
        jlist.append(m.group(1))
1549
    if sort:
1550
      jlist = utils.NiceSort(jlist)
1551
    return jlist
1552

    
1553
  def _LoadJobUnlocked(self, job_id):
1554
    """Loads a job from the disk or memory.
1555

1556
    Given a job id, this will return the cached job object if
1557
    existing, or try to load the job from the disk. If loading from
1558
    disk, it will also add the job to the cache.
1559

1560
    @param job_id: the job id
1561
    @rtype: L{_QueuedJob} or None
1562
    @return: either None or the job object
1563

1564
    """
1565
    job = self._memcache.get(job_id, None)
1566
    if job:
1567
      logging.debug("Found job %s in memcache", job_id)
1568
      return job
1569

    
1570
    try:
1571
      job = self._LoadJobFromDisk(job_id)
1572
      if job is None:
1573
        return job
1574
    except errors.JobFileCorrupted:
1575
      old_path = self._GetJobPath(job_id)
1576
      new_path = self._GetArchivedJobPath(job_id)
1577
      if old_path == new_path:
1578
        # job already archived (future case)
1579
        logging.exception("Can't parse job %s", job_id)
1580
      else:
1581
        # non-archived case
1582
        logging.exception("Can't parse job %s, will archive.", job_id)
1583
        self._RenameFilesUnlocked([(old_path, new_path)])
1584
      return None
1585

    
1586
    self._memcache[job_id] = job
1587
    logging.debug("Added job %s to the cache", job_id)
1588
    return job
1589

    
1590
  def _LoadJobFromDisk(self, job_id):
1591
    """Load the given job file from disk.
1592

1593
    Given a job file, read, load and restore it in a _QueuedJob format.
1594

1595
    @type job_id: string
1596
    @param job_id: job identifier
1597
    @rtype: L{_QueuedJob} or None
1598
    @return: either None or the job object
1599

1600
    """
1601
    filepath = self._GetJobPath(job_id)
1602
    logging.debug("Loading job from %s", filepath)
1603
    try:
1604
      raw_data = utils.ReadFile(filepath)
1605
    except EnvironmentError, err:
1606
      if err.errno in (errno.ENOENT, ):
1607
        return None
1608
      raise
1609

    
1610
    try:
1611
      data = serializer.LoadJson(raw_data)
1612
      job = _QueuedJob.Restore(self, data)
1613
    except Exception, err: # pylint: disable-msg=W0703
1614
      raise errors.JobFileCorrupted(err)
1615

    
1616
    return job
1617

    
1618
  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for draining.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

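  # Illustrative note (not part of the original module): the drain flag is
  # simply the presence of constants.JOB_QUEUE_DRAIN_FILE on the node, so a
  # check equivalent to this method would be e.g.
  #
  #   os.path.exists("/var/lib/ganeti/queue/drain")
  #
  # (the exact path depends on the build-time configuration; see
  # SetDrainFlag() below for how the file is created and removed).
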
  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    getents = runtime.GetEnts()

    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
                      uid=getents.masterd_uid, gid=getents.masterd_gid)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True

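  # Illustrative usage of SetDrainFlag() (not part of the original module):
  # a caller holding the JobQueue instance "queue" would temporarily refuse
  # new submissions roughly like this:
  #
  #   queue.SetDrainFlag(True)    # new jobs are rejected from now on
  #   # ... perform maintenance ...
  #   queue.SetDrainFlag(False)   # accept submissions again
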
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

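  # Illustrative note on the priority check in _SubmitJobUnlocked() (not part
  # of the original module): constants.OP_PRIO_SUBMIT_VALID only contains the
  # priorities a client may submit, so, as a hypothetical example,
  #
  #   op = opcodes.OpTestDelay(duration=0)
  #   op.priority = constants.OP_PRIO_HIGH      # accepted
  #   op.priority = constants.OP_PRIO_LOWEST    # would raise GenericError
  #
  # the default OP_PRIO_DEFAULT is always accepted unchanged.
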
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

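  # Illustrative usage of SubmitJob() (not part of the original module; the
  # opcode is chosen purely as an example): a caller with a JobQueue instance
  # "queue" submits a single-opcode job and receives its id, e.g.
  #
  #   job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=5)])
  #   logging.info("Submitted job %s", job_id)
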
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    self._EnqueueJobs(added_jobs)

    return results

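  # Illustrative note on SubmitManyJobs() (not part of the original module):
  # the result is one (status, data) tuple per requested job, in submission
  # order, where data is the job id on success or the error message on
  # failure, e.g.
  #
  #   [(True, "142"), (True, "143"), (False, "Opcode 0 has invalid ...")]
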
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        If the job has not changed and the timeout has expired,
        the special value L{constants.JOB_NOTCHANGED} is returned
        instead, which should be interpreted as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

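  # Illustrative usage of WaitForJobChanges() (not part of the original
  # module): a client polling the status of a freshly submitted job might
  # call, with "queue" being the JobQueue instance,
  #
  #   result = queue.WaitForJobChanges(job_id, ["status"], None, None, 10.0)
  #   if result == constants.JOB_NOTCHANGED:
  #     pass  # timeout expired without any change
  #   else:
  #     (job_info, log_entries) = result
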
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

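  # Illustrative note on CancelJob() (not part of the original module): the
  # return value is a (success, message) pair; for a missing job this is
  # (False, "Job <id> not found"), while for an existing job both values come
  # from _QueuedJob.Cancel(), with success being True only if the job could
  # still be cancelled (i.e. it had not finished or started running yet).
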
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum time to spend archiving jobs, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

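  # Illustrative note on AutoArchiveJobs() (not part of the original module):
  # the return value is an (archived, remaining) pair, e.g.
  #
  #   (archived, remaining) = queue.AutoArchiveJobs(7 * 24 * 3600, 30.0)
  #
  # archives finalized jobs older than one week, giving up after roughly 30
  # seconds; "remaining" is the number of job ids that were not examined
  # before the timeout was hit.
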
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a
        list with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

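  # Illustrative usage of QueryJobs() (not part of the original module):
  # querying two jobs for their id and status might look like
  #
  #   queue.QueryJobs(["142", "999"], ["id", "status"])
  #   # -> [["142", "success"], None]   (assuming job 999 does not exist)
  #
  # whereas passing None as job_ids returns one entry per job on disk and
  # silently skips jobs that fail to load.
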
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None