# Copyright (C) 2006, 2007 Google Inc.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Module implementing the logic behind the cluster operations
This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units
import logging
import random
import time
from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import rpc
from ganeti import cmdlib
from ganeti import locking
class _LockAcquireTimeout(Exception):
  """Internal exception to report timeouts on acquiring locks.
46 407339d0 Michael Hanselmann
def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.
53 e3200b18 Michael Hanselmann
54 e3200b18 Michael Hanselmann
  # Wait for a total of at least 150s before doing a blocking acquire
  while sum(result) < 150.0:
    timeout = (result[-1] * 1.05) ** 1.25
    # Cap timeout at 10 seconds. This gives other jobs a chance to run
    # even if we're still trying to get our locks, before finally moving
    # to a blocking acquire.
    if timeout > 10.0:
      timeout = 10.0
    elif timeout < 0.1:
      # Lower boundary for safety
      timeout = 0.1
  return result
class _LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.
77 407339d0 Michael Hanselmann
  __slots__ = [
80 407339d0 Michael Hanselmann
85 407339d0 Michael Hanselmann
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
  def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.
91 e3200b18 Michael Hanselmann
92 e3200b18 Michael Hanselmann
93 e3200b18 Michael Hanselmann
94 407339d0 Michael Hanselmann
96 407339d0 Michael Hanselmann
98 407339d0 Michael Hanselmann
    if attempt < 0:
      raise ValueError("Attempt must be zero or positive")
    self._attempt = attempt
    self._time_fn = _time_fn
    self._random_fn = _random_fn
      timeout = self._TIMEOUT_PER_ATTEMPT[attempt]
    except IndexError:
      # No more timeouts, do blocking acquire
      timeout = None
    self._running_timeout = locking.RunningTimeout(timeout, False,
114 407339d0 Michael Hanselmann
  def NextAttempt(self):
    """Returns the strategy for the next attempt.
118 407339d0 Michael Hanselmann
    return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
121 e3200b18 Michael Hanselmann
123 407339d0 Michael Hanselmann
124 407339d0 Michael Hanselmann
125 407339d0 Michael Hanselmann

127 a6db1af2 Michael Hanselmann
128 407339d0 Michael Hanselmann
    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))
136 a6db1af2 Michael Hanselmann
    return timeout
class OpExecCbBase: # pylint: disable-msg=W0232
140 031a3e57 Michael Hanselmann
141 031a3e57 Michael Hanselmann

143 031a3e57 Michael Hanselmann
144 031a3e57 Michael Hanselmann
145 031a3e57 Michael Hanselmann

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.
149 031a3e57 Michael Hanselmann
151 031a3e57 Michael Hanselmann
152 031a3e57 Michael Hanselmann
153 031a3e57 Michael Hanselmann

155 031a3e57 Michael Hanselmann
  def ReportLocks(self, msg):
    """Report lock operations.
159 ef2df7d3 Michael Hanselmann
161 031a3e57 Michael Hanselmann
class Processor(object):
  """Object which runs OpCodes"""
165 a8083063 Iustin Pop
    # Cluster
    opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
    opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
    opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
    opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
    opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
    opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
    opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpPrepareExport: cmdlib.LUPrepareExport,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    opcodes.OpTestJobqueue: cmdlib.LUTestJobqueue,
  def __init__(self, context, ec_id):
    """Constructor for Processor
229 adfa97e3 Guido Trotter
230 adfa97e3 Guido Trotter
231 adfa97e3 Guido Trotter
232 adfa97e3 Guido Trotter
233 adfa97e3 Guido Trotter

    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.rpc = rpc.RpcRunner(context.cfg)
    self.hmclass = HooksMaster
  def _ReportLocks(self, level, names, shared, timeout, acquired, result):
    """Reports lock operations.
244 ef2df7d3 Michael Hanselmann
245 ef2df7d3 Michael Hanselmann
246 ef2df7d3 Michael Hanselmann
247 ef2df7d3 Michael Hanselmann
248 ef2df7d3 Michael Hanselmann
249 211b6132 Michael Hanselmann
250 211b6132 Michael Hanselmann
251 211b6132 Michael Hanselmann
252 ef2df7d3 Michael Hanselmann
253 211b6132 Michael Hanselmann
254 211b6132 Michael Hanselmann
255 211b6132 Michael Hanselmann
256 ef2df7d3 Michael Hanselmann

    parts = []
260 ef2df7d3 Michael Hanselmann
261 ef2df7d3 Michael Hanselmann
262 211b6132 Michael Hanselmann
263 211b6132 Michael Hanselmann
265 211b6132 Michael Hanselmann
267 ef2df7d3 Michael Hanselmann
      if timeout is None:
270 211b6132 Michael Hanselmann
        parts.append("timeout=%0.6fs" % timeout)
    if names == locking.ALL_SET:
277 ef2df7d3 Michael Hanselmann
278 ef2df7d3 Michael Hanselmann
280 4776e022 Michael Hanselmann
282 ef2df7d3 Michael Hanselmann
283 ef2df7d3 Michael Hanselmann
285 ef2df7d3 Michael Hanselmann
287 ef2df7d3 Michael Hanselmann
288 ef2df7d3 Michael Hanselmann
    logging.debug("LU locks %s", msg)
291 ef2df7d3 Michael Hanselmann
292 ef2df7d3 Michael Hanselmann
294 211b6132 Michael Hanselmann
295 211b6132 Michael Hanselmann
296 211b6132 Michael Hanselmann

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
306 211b6132 Michael Hanselmann
    self._ReportLocks(level, names, shared, timeout, False, None)
309 211b6132 Michael Hanselmann
310 211b6132 Michael Hanselmann
312 211b6132 Michael Hanselmann
313 211b6132 Michael Hanselmann
    return acquired
  def _ExecLU(self, lu):
    """Logical Unit execution sequence.
319 36c381d7 Guido Trotter
    write_count = self.context.cfg.write_count
322 4b5e8271 Iustin Pop
323 36c381d7 Guido Trotter
324 36c381d7 Guido Trotter
325 7b4c1cb9 Michael Hanselmann
326 20777413 Iustin Pop
    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result
336 7b4c1cb9 Michael Hanselmann
337 36c381d7 Guido Trotter
338 36c381d7 Guido Trotter
339 7b4c1cb9 Michael Hanselmann
340 36c381d7 Guido Trotter
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
344 36c381d7 Guido Trotter
    return result
  def _LockAndExecLU(self, lu, level, calc_timeout):
    """Execute a Logical Unit, with the needed locks.
350 68adfdb2 Guido Trotter
351 68adfdb2 Guido Trotter
352 68adfdb2 Guido Trotter
353 68adfdb2 Guido Trotter

355 ca2a79e1 Guido Trotter
356 ca2a79e1 Guido Trotter
357 8a2941c4 Guido Trotter
358 031a3e57 Michael Hanselmann
359 031a3e57 Michael Hanselmann
361 8a2941c4 Guido Trotter
362 407339d0 Michael Hanselmann
    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")
369 ca2a79e1 Guido Trotter
370 fb8dcb62 Guido Trotter
      share = lu.share_locks[level]
373 68adfdb2 Guido Trotter
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"
377 407339d0 Michael Hanselmann
378 407339d0 Michael Hanselmann
379 407339d0 Michael Hanselmann
380 407339d0 Michael Hanselmann
          acquired = self._AcquireLocks(level, needed_locks, share,
383 407339d0 Michael Hanselmann
          if acquired is None:
            raise _LockAcquireTimeout()
387 407339d0 Michael Hanselmann
          # Adding locks
          add_locks = lu.add_locks[level]
          lu.remove_locks[level] = add_locks
392 407339d0 Michael Hanselmann
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), probably because of a race condition"
              " with another job, who added them first" % add_locks,
399 407339d0 Michael Hanselmann
          acquired = add_locks
402 ca2a79e1 Guido Trotter
          lu.acquired_locks[level] = acquired
405 407339d0 Michael Hanselmann
406 ca2a79e1 Guido Trotter
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
410 80ee04a4 Guido Trotter
411 68adfdb2 Guido Trotter
413 68adfdb2 Guido Trotter
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)
416 68adfdb2 Guido Trotter
  def ExecOpCode(self, op, cbs):
    """Execute an opcode.
421 e92376d7 Iustin Pop
422 e92376d7 Iustin Pop
423 031a3e57 Michael Hanselmann
424 031a3e57 Michael Hanselmann
425 a8083063 Iustin Pop

427 a8083063 Iustin Pop
428 3ecf6786 Iustin Pop
429 3ecf6786 Iustin Pop
430 a8083063 Iustin Pop
    self._cbs = cbs
433 031a3e57 Michael Hanselmann
434 031a3e57 Michael Hanselmann
435 031a3e57 Michael Hanselmann
436 031a3e57 Michael Hanselmann
      timeout_strategy = _LockAttemptTimeoutStrategy()
439 407339d0 Michael Hanselmann
440 407339d0 Michael Hanselmann
          acquire_timeout = timeout_strategy.CalcRemainingTimeout()
443 211b6132 Michael Hanselmann
444 211b6132 Michael Hanselmann
445 211b6132 Michael Hanselmann
446 211b6132 Michael Hanselmann
447 e3200b18 Michael Hanselmann
448 407339d0 Michael Hanselmann
449 407339d0 Michael Hanselmann
451 407339d0 Michael Hanselmann
452 407339d0 Michael Hanselmann
            assert lu.needed_locks is not None, "needed_locks not set by LU"
455 73064714 Guido Trotter
              return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
458 73064714 Guido Trotter
              if self._ec_id:
461 73064714 Guido Trotter
463 407339d0 Michael Hanselmann
465 407339d0 Michael Hanselmann
466 407339d0 Michael Hanselmann
467 407339d0 Michael Hanselmann
469 e3200b18 Michael Hanselmann
470 407339d0 Michael Hanselmann
      self._cbs = None
474 7b4c1cb9 Michael Hanselmann
475 031a3e57 Michael Hanselmann
476 031a3e57 Michael Hanselmann

478 031a3e57 Michael Hanselmann
479 031a3e57 Michael Hanselmann
481 0fbbf897 Iustin Pop
482 0fbbf897 Iustin Pop
483 0fbbf897 Iustin Pop

485 a5eb7789 Iustin Pop
486 7b4c1cb9 Michael Hanselmann
487 0fbbf897 Iustin Pop
  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.
491 c0088fb9 Iustin Pop
492 c0088fb9 Iustin Pop
493 c0088fb9 Iustin Pop
494 c0088fb9 Iustin Pop
495 0fbbf897 Iustin Pop

497 c0088fb9 Iustin Pop
498 c0088fb9 Iustin Pop
499 c0088fb9 Iustin Pop
500 c0088fb9 Iustin Pop
501 c0088fb9 Iustin Pop
502 c0088fb9 Iustin Pop
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])
507 c0088fb9 Iustin Pop
508 0fbbf897 Iustin Pop
509 0fbbf897 Iustin Pop

511 c0088fb9 Iustin Pop
512 c0088fb9 Iustin Pop
513 a5eb7789 Iustin Pop
    self.Log(" - INFO: %s" % message)
516 adfa97e3 Guido Trotter
517 adfa97e3 Guido Trotter
518 adfa97e3 Guido Trotter
519 adfa97e3 Guido Trotter
520 adfa97e3 Guido Trotter
class HooksMaster(object):
  """Hooks master.
525 a8083063 Iustin Pop
526 a8083063 Iustin Pop
527 a8083063 Iustin Pop

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.
533 a8083063 Iustin Pop
  def __init__(self, callfn, lu):
    self.callfn = callfn
537 a8083063 Iustin Pop
538 a8083063 Iustin Pop
539 a8083063 Iustin Pop
540 a8083063 Iustin Pop
541 a8083063 Iustin Pop
  def _BuildEnv(self):
    """Compute the environment and the target nodes.
545 a8083063 Iustin Pop
546 a8083063 Iustin Pop
547 a8083063 Iustin Pop

549 a8083063 Iustin Pop
550 a8083063 Iustin Pop
551 a8083063 Iustin Pop
      "GANETI_OP_CODE": self.op.OP_ID,
554 6a4aa7c1 Iustin Pop
555 a8083063 Iustin Pop
557 9a395a76 Iustin Pop
558 9a395a76 Iustin Pop
559 9a395a76 Iustin Pop
560 9a395a76 Iustin Pop
561 9a395a76 Iustin Pop
562 9a395a76 Iustin Pop
      lu_nodes_pre = lu_nodes_post = []
565 4167825b Iustin Pop
566 4167825b Iustin Pop
  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.
570 4167825b Iustin Pop
571 4167825b Iustin Pop

573 4167825b Iustin Pop
574 4167825b Iustin Pop
575 4167825b Iustin Pop
576 437138c9 Michael Hanselmann
577 437138c9 Michael Hanselmann
578 437138c9 Michael Hanselmann
579 a8083063 Iustin Pop
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
582 4167825b Iustin Pop
583 a8083063 Iustin Pop
  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.
587 a8083063 Iustin Pop
588 a8083063 Iustin Pop

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks
596 a8083063 Iustin Pop
    if not self.node_list[phase] and not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
602 4167825b Iustin Pop
603 17e82923 Luca Bigliardi
604 17e82923 Luca Bigliardi
605 17e82923 Luca Bigliardi
      results = self._RunWrapper(self.node_list[phase], hpath, phase)
    errs = []
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
613 8c4b9364 Luca Bigliardi
        return results
    for node_name in results:
      res = results[node_name]
      if res.offline:
619 3cebe102 Michael Hanselmann
620 8c4b9364 Luca Bigliardi
621 8c4b9364 Luca Bigliardi"Communication failure to node %s: %s",
                           node_name, msg)
624 8c4b9364 Luca Bigliardi
625 8c4b9364 Luca Bigliardi
626 8c4b9364 Luca Bigliardi
627 a8083063 Iustin Pop
628 8c4b9364 Luca Bigliardi
            if not output:
              output = "(no output)"
  "On %s script %s failed, output: %s" %
                               (node_name, script, output))
    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)
    return results
637 6a4aa7c1 Iustin Pop
638 6a4aa7c1 Iustin Pop
639 6a4aa7c1 Iustin Pop

    This is a special hook that runs only on the master after each
    top-level LI if the configuration has been updated.
643 6a4aa7c1 Iustin Pop
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = []
    self._RunWrapper(nodes, hpath, phase)