Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 61e062dd

History | View | Annotate | Download (25.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import sys
32
import logging
33
import random
34
import time
35
import itertools
36
import traceback
37

    
38
from ganeti import opcodes
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import cmdlib
42
from ganeti import locking
43
from ganeti import utils
44
from ganeti import compat
45
from ganeti import pathutils
46

    
47

    
48
_OP_PREFIX = "Op"
49
_LU_PREFIX = "LU"
50

    
51
#: LU classes which don't need to acquire the node allocation lock
52
#: (L{locking.NAL}) when they acquire all node or node resource locks
53
_NODE_ALLOC_WHITELIST = frozenset()
54

    
55
#: LU classes which don't need to acquire the node allocation lock
56
#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
57
#: or node resource locks
58
_NODE_ALLOC_MODE_WHITELIST = frozenset([
59
  cmdlib.LUBackupExport,
60
  cmdlib.LUBackupRemove,
61
  cmdlib.LUOobCommand,
62
  ])
63

    
64

    
65
class LockAcquireTimeout(Exception):
66
  """Exception to report timeouts on acquiring locks.
67

68
  """
69

    
70

    
71
def _CalculateLockAttemptTimeouts():
72
  """Calculate timeouts for lock attempts.
73

74
  """
75
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
76
  running_sum = result[0]
77

    
78
  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
79
  # blocking acquire
80
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
81
    timeout = (result[-1] * 1.05) ** 1.25
82

    
83
    # Cap max timeout. This gives other jobs a chance to run even if
84
    # we're still trying to get our locks, before finally moving to a
85
    # blocking acquire.
86
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
87
    # And also cap the lower boundary for safety
88
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
89

    
90
    result.append(timeout)
91
    running_sum += timeout
92

    
93
  return result
94

    
95

    
96
class LockAttemptTimeoutStrategy(object):
97
  """Class with lock acquire timeout strategy.
98

99
  """
100
  __slots__ = [
101
    "_timeouts",
102
    "_random_fn",
103
    "_time_fn",
104
    ]
105

    
106
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
107

    
108
  def __init__(self, _time_fn=time.time, _random_fn=random.random):
109
    """Initializes this class.
110

111
    @param _time_fn: Time function for unittests
112
    @param _random_fn: Random number generator for unittests
113

114
    """
115
    object.__init__(self)
116

    
117
    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
118
    self._time_fn = _time_fn
119
    self._random_fn = _random_fn
120

    
121
  def NextAttempt(self):
122
    """Returns the timeout for the next attempt.
123

124
    """
125
    try:
126
      timeout = self._timeouts.next()
127
    except StopIteration:
128
      # No more timeouts, do blocking acquire
129
      timeout = None
130

    
131
    if timeout is not None:
132
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
133
      # where two or more jobs are fighting for the same lock(s).
134
      variation_range = timeout * 0.1
135
      timeout += ((self._random_fn() * variation_range) -
136
                  (variation_range * 0.5))
137

    
138
    return timeout
139

    
140

    
141
class OpExecCbBase: # pylint: disable=W0232
142
  """Base class for OpCode execution callbacks.
143

144
  """
145
  def NotifyStart(self):
146
    """Called when we are about to execute the LU.
147

148
    This function is called when we're about to start the lu's Exec() method,
149
    that is, after we have acquired all locks.
150

151
    """
152

    
153
  def Feedback(self, *args):
154
    """Sends feedback from the LU code to the end-user.
155

156
    """
157

    
158
  def CurrentPriority(self): # pylint: disable=R0201
159
    """Returns current priority or C{None}.
160

161
    """
162
    return None
163

    
164
  def SubmitManyJobs(self, jobs):
165
    """Submits jobs for processing.
166

167
    See L{jqueue.JobQueue.SubmitManyJobs}.
168

169
    """
170
    raise NotImplementedError
171

    
172

    
173
def _LUNameForOpName(opname):
174
  """Computes the LU name for a given OpCode name.
175

176
  """
177
  assert opname.startswith(_OP_PREFIX), \
178
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
179

    
180
  return _LU_PREFIX + opname[len(_OP_PREFIX):]
181

    
182

    
183
def _ComputeDispatchTable():
184
  """Computes the opcode-to-lu dispatch table.
185

186
  """
187
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
188
              for op in opcodes.OP_MAPPING.values()
189
              if op.WITH_LU)
190

    
191

    
192
def _SetBaseOpParams(src, defcomment, dst):
193
  """Copies basic opcode parameters.
194

195
  @type src: L{opcodes.OpCode}
196
  @param src: Source opcode
197
  @type defcomment: string
198
  @param defcomment: Comment to specify if not already given
199
  @type dst: L{opcodes.OpCode}
200
  @param dst: Destination opcode
201

202
  """
203
  if hasattr(src, "debug_level"):
204
    dst.debug_level = src.debug_level
205

    
206
  if (getattr(dst, "priority", None) is None and
207
      hasattr(src, "priority")):
208
    dst.priority = src.priority
209

    
210
  if not getattr(dst, opcodes.COMMENT_ATTR, None):
211
    dst.comment = defcomment
212

    
213

    
214
def _ProcessResult(submit_fn, op, result):
215
  """Examines opcode result.
216

217
  If necessary, additional processing on the result is done.
218

219
  """
220
  if isinstance(result, cmdlib.ResultWithJobs):
221
    # Copy basic parameters (e.g. priority)
222
    map(compat.partial(_SetBaseOpParams, op,
223
                       "Submitted by %s" % op.OP_ID),
224
        itertools.chain(*result.jobs))
225

    
226
    # Submit jobs
227
    job_submission = submit_fn(result.jobs)
228

    
229
    # Build dictionary
230
    result = result.other
231

    
232
    assert constants.JOB_IDS_KEY not in result, \
233
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
234

    
235
    result[constants.JOB_IDS_KEY] = job_submission
236

    
237
  return result
238

    
239

    
240
def _FailingSubmitManyJobs(_):
241
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
242

243
  """
244
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
245
                               " queries) can not submit jobs")
246

    
247

    
248
def _RpcResultsToHooksResults(rpc_results):
249
  """Function to convert RPC results to the format expected by HooksMaster.
250

251
  @type rpc_results: dict(node: L{rpc.RpcResult})
252
  @param rpc_results: RPC results
253
  @rtype: dict(node: (fail_msg, offline, hooks_results))
254
  @return: RPC results unpacked according to the format expected by
255
    L({mcpu.HooksMaster}
256

257
  """
258
  return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
259
              for (node, rpc_res) in rpc_results.items())
260

    
261

    
262
def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
263
                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
264
  """Performs consistency checks on locks acquired by a logical unit.
265

266
  @type lu: L{cmdlib.LogicalUnit}
267
  @param lu: Logical unit instance
268
  @type glm: L{locking.GanetiLockManager}
269
  @param glm: Lock manager
270

271
  """
272
  if not __debug__:
273
    return
274

    
275
  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)
276

    
277
  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
278
    # TODO: Verify using actual lock mode, not using LU variables
279
    if level in lu.needed_locks:
280
      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
281
      share_level = lu.share_locks[level]
282

    
283
      if lu.__class__ in _mode_whitelist:
284
        assert share_node_alloc != share_level, \
285
          "LU is whitelisted to use different modes for node allocation lock"
286
      else:
287
        assert bool(share_node_alloc) == bool(share_level), \
288
          ("Node allocation lock must be acquired using the same mode as nodes"
289
           " and node resources")
290

    
291
      if lu.__class__ in _nal_whitelist:
292
        assert not have_nal, \
293
          "LU is whitelisted for not acquiring the node allocation lock"
294
      elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
295
        assert have_nal, \
296
          ("Node allocation lock must be used if an LU acquires all nodes"
297
           " or node resources")
298

    
299

    
300
class Processor(object):
301
  """Object which runs OpCodes"""
302
  DISPATCH_TABLE = _ComputeDispatchTable()
303

    
304
  def __init__(self, context, ec_id, enable_locks=True):
305
    """Constructor for Processor
306

307
    @type context: GanetiContext
308
    @param context: global Ganeti context
309
    @type ec_id: string
310
    @param ec_id: execution context identifier
311

312
    """
313
    self.context = context
314
    self._ec_id = ec_id
315
    self._cbs = None
316
    self.rpc = context.rpc
317
    self.hmclass = HooksMaster
318
    self._enable_locks = enable_locks
319

    
320
  def _CheckLocksEnabled(self):
321
    """Checks if locking is enabled.
322

323
    @raise errors.ProgrammerError: In case locking is not enabled
324

325
    """
326
    if not self._enable_locks:
327
      raise errors.ProgrammerError("Attempted to use disabled locks")
328

    
329
  def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
330
    """Acquires locks via the Ganeti lock manager.
331

332
    @type level: int
333
    @param level: Lock level
334
    @type names: list or string
335
    @param names: Lock names
336
    @type shared: bool
337
    @param shared: Whether the locks should be acquired in shared mode
338
    @type opportunistic: bool
339
    @param opportunistic: Whether to acquire opportunistically
340
    @type timeout: None or float
341
    @param timeout: Timeout for acquiring the locks
342
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
343
        amount of time
344

345
    """
346
    self._CheckLocksEnabled()
347

    
348
    if self._cbs:
349
      priority = self._cbs.CurrentPriority()
350
    else:
351
      priority = None
352

    
353
    acquired = self.context.glm.acquire(level, names, shared=shared,
354
                                        timeout=timeout, priority=priority,
355
                                        opportunistic=opportunistic)
356

    
357
    if acquired is None:
358
      raise LockAcquireTimeout()
359

    
360
    return acquired
361

    
362
  def _ExecLU(self, lu):
363
    """Logical Unit execution sequence.
364

365
    """
366
    write_count = self.context.cfg.write_count
367
    lu.CheckPrereq()
368

    
369
    hm = self.BuildHooksManager(lu)
370
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
371
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
372
                     self.Log, None)
373

    
374
    if getattr(lu.op, "dry_run", False):
375
      # in this mode, no post-hooks are run, and the config is not
376
      # written (as it might have been modified by another LU, and we
377
      # shouldn't do writeout on behalf of other threads
378
      self.LogInfo("dry-run mode requested, not actually executing"
379
                   " the operation")
380
      return lu.dry_run_result
381

    
382
    if self._cbs:
383
      submit_mj_fn = self._cbs.SubmitManyJobs
384
    else:
385
      submit_mj_fn = _FailingSubmitManyJobs
386

    
387
    try:
388
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
389
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
390
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
391
                                self.Log, result)
392
    finally:
393
      # FIXME: This needs locks if not lu_class.REQ_BGL
394
      if write_count != self.context.cfg.write_count:
395
        hm.RunConfigUpdate()
396

    
397
    return result
398

    
399
  def BuildHooksManager(self, lu):
400
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
401

    
402
  def _LockAndExecLU(self, lu, level, calc_timeout):
403
    """Execute a Logical Unit, with the needed locks.
404

405
    This is a recursive function that starts locking the given level, and
406
    proceeds up, till there are no more locks to acquire. Then it executes the
407
    given LU and its opcodes.
408

409
    """
410
    glm = self.context.glm
411
    adding_locks = level in lu.add_locks
412
    acquiring_locks = level in lu.needed_locks
413

    
414
    if level not in locking.LEVELS:
415
      _VerifyLocks(lu, glm)
416

    
417
      if self._cbs:
418
        self._cbs.NotifyStart()
419

    
420
      try:
421
        result = self._ExecLU(lu)
422
      except AssertionError, err:
423
        # this is a bit ugly, as we don't know from which phase
424
        # (prereq, exec) this comes; but it's better than an exception
425
        # with no information
426
        (_, _, tb) = sys.exc_info()
427
        err_info = traceback.format_tb(tb)
428
        del tb
429
        logging.exception("Detected AssertionError")
430
        raise errors.OpExecError("Internal assertion error: please report"
431
                                 " this as a bug.\nError message: '%s';"
432
                                 " location:\n%s" % (str(err), err_info[-1]))
433

    
434
    elif adding_locks and acquiring_locks:
435
      # We could both acquire and add locks at the same level, but for now we
436
      # don't need this, so we'll avoid the complicated code needed.
437
      raise NotImplementedError("Can't declare locks to acquire when adding"
438
                                " others")
439

    
440
    elif adding_locks or acquiring_locks:
441
      self._CheckLocksEnabled()
442

    
443
      lu.DeclareLocks(level)
444
      share = lu.share_locks[level]
445
      opportunistic = lu.opportunistic_locks[level]
446

    
447
      try:
448
        assert adding_locks ^ acquiring_locks, \
449
          "Locks must be either added or acquired"
450

    
451
        if acquiring_locks:
452
          # Acquiring locks
453
          needed_locks = lu.needed_locks[level]
454

    
455
          self._AcquireLocks(level, needed_locks, share, opportunistic,
456
                             calc_timeout())
457
        else:
458
          # Adding locks
459
          add_locks = lu.add_locks[level]
460
          lu.remove_locks[level] = add_locks
461

    
462
          try:
463
            glm.add(level, add_locks, acquired=1, shared=share)
464
          except errors.LockError:
465
            logging.exception("Detected lock error in level %s for locks"
466
                              " %s, shared=%s", level, add_locks, share)
467
            raise errors.OpPrereqError(
468
              "Couldn't add locks (%s), most likely because of another"
469
              " job who added them first" % add_locks,
470
              errors.ECODE_NOTUNIQUE)
471

    
472
        try:
473
          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
474
        finally:
475
          if level in lu.remove_locks:
476
            glm.remove(level, lu.remove_locks[level])
477
      finally:
478
        if glm.is_owned(level):
479
          glm.release(level)
480

    
481
    else:
482
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)
483

    
484
    return result
485

    
486
  def ExecOpCode(self, op, cbs, timeout=None):
487
    """Execute an opcode.
488

489
    @type op: an OpCode instance
490
    @param op: the opcode to be executed
491
    @type cbs: L{OpExecCbBase}
492
    @param cbs: Runtime callbacks
493
    @type timeout: float or None
494
    @param timeout: Maximum time to acquire all locks, None for no timeout
495
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
496
        amount of time
497

498
    """
499
    if not isinstance(op, opcodes.OpCode):
500
      raise errors.ProgrammerError("Non-opcode instance passed"
501
                                   " to ExecOpcode (%s)" % type(op))
502

    
503
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
504
    if lu_class is None:
505
      raise errors.OpCodeUnknown("Unknown opcode")
506

    
507
    if timeout is None:
508
      calc_timeout = lambda: None
509
    else:
510
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining
511

    
512
    self._cbs = cbs
513
    try:
514
      if self._enable_locks:
515
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
516
        # and in a shared fashion otherwise (to prevent concurrent run with
517
        # an exclusive LU.
518
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
519
                            not lu_class.REQ_BGL, False, calc_timeout())
520
      elif lu_class.REQ_BGL:
521
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
522
                                     " disabled" % op.OP_ID)
523

    
524
      try:
525
        lu = lu_class(self, op, self.context, self.rpc)
526
        lu.ExpandNames()
527
        assert lu.needed_locks is not None, "needed_locks not set by LU"
528

    
529
        try:
530
          result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
531
                                       calc_timeout)
532
        finally:
533
          if self._ec_id:
534
            self.context.cfg.DropECReservations(self._ec_id)
535
      finally:
536
        # Release BGL if owned
537
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
538
          assert self._enable_locks
539
          self.context.glm.release(locking.LEVEL_CLUSTER)
540
    finally:
541
      self._cbs = None
542

    
543
    resultcheck_fn = op.OP_RESULT
544
    if not (resultcheck_fn is None or resultcheck_fn(result)):
545
      logging.error("Expected opcode result matching %s, got %s",
546
                    resultcheck_fn, result)
547
      raise errors.OpResultError("Opcode result does not match %s: %s" %
548
                                 (resultcheck_fn, utils.Truncate(result, 80)))
549

    
550
    return result
551

    
552
  def Log(self, *args):
553
    """Forward call to feedback callback function.
554

555
    """
556
    if self._cbs:
557
      self._cbs.Feedback(*args)
558

    
559
  def LogStep(self, current, total, message):
560
    """Log a change in LU execution progress.
561

562
    """
563
    logging.debug("Step %d/%d %s", current, total, message)
564
    self.Log("STEP %d/%d %s" % (current, total, message))
565

    
566
  def LogWarning(self, message, *args, **kwargs):
567
    """Log a warning to the logs and the user.
568

569
    The optional keyword argument is 'hint' and can be used to show a
570
    hint to the user (presumably related to the warning). If the
571
    message is empty, it will not be printed at all, allowing one to
572
    show only a hint.
573

574
    """
575
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
576
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
577
    if args:
578
      message = message % tuple(args)
579
    if message:
580
      logging.warning(message)
581
      self.Log(" - WARNING: %s" % message)
582
    if "hint" in kwargs:
583
      self.Log("      Hint: %s" % kwargs["hint"])
584

    
585
  def LogInfo(self, message, *args):
586
    """Log an informational message to the logs and the user.
587

588
    """
589
    if args:
590
      message = message % tuple(args)
591
    logging.info(message)
592
    self.Log(" - INFO: %s" % message)
593

    
594
  def GetECId(self):
595
    """Returns the current execution context ID.
596

597
    """
598
    if not self._ec_id:
599
      raise errors.ProgrammerError("Tried to use execution context id when"
600
                                   " not set")
601
    return self._ec_id
602

    
603

    
604
class HooksMaster(object):
605
  def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
606
               hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
607
               cluster_name=None, master_name=None):
608
    """Base class for hooks masters.
609

610
    This class invokes the execution of hooks according to the behaviour
611
    specified by its parameters.
612

613
    @type opcode: string
614
    @param opcode: opcode of the operation to which the hooks are tied
615
    @type hooks_path: string
616
    @param hooks_path: prefix of the hooks directories
617
    @type nodes: 2-tuple of lists
618
    @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
619
      run and nodes on which post-hooks must be run
620
    @type hooks_execution_fn: function that accepts the following parameters:
621
      (node_list, hooks_path, phase, environment)
622
    @param hooks_execution_fn: function that will execute the hooks; can be
623
      None, indicating that no conversion is necessary.
624
    @type hooks_results_adapt_fn: function
625
    @param hooks_results_adapt_fn: function that will adapt the return value of
626
      hooks_execution_fn to the format expected by RunPhase
627
    @type build_env_fn: function that returns a dictionary having strings as
628
      keys
629
    @param build_env_fn: function that builds the environment for the hooks
630
    @type log_fn: function that accepts a string
631
    @param log_fn: logging function
632
    @type htype: string or None
633
    @param htype: None or one of L{constants.HTYPE_CLUSTER},
634
     L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
635
    @type cluster_name: string
636
    @param cluster_name: name of the cluster
637
    @type master_name: string
638
    @param master_name: name of the master
639

640
    """
641
    self.opcode = opcode
642
    self.hooks_path = hooks_path
643
    self.hooks_execution_fn = hooks_execution_fn
644
    self.hooks_results_adapt_fn = hooks_results_adapt_fn
645
    self.build_env_fn = build_env_fn
646
    self.log_fn = log_fn
647
    self.htype = htype
648
    self.cluster_name = cluster_name
649
    self.master_name = master_name
650

    
651
    self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
652
    (self.pre_nodes, self.post_nodes) = nodes
653

    
654
  def _BuildEnv(self, phase):
655
    """Compute the environment and the target nodes.
656

657
    Based on the opcode and the current node list, this builds the
658
    environment for the hooks and the target node list for the run.
659

660
    """
661
    if phase == constants.HOOKS_PHASE_PRE:
662
      prefix = "GANETI_"
663
    elif phase == constants.HOOKS_PHASE_POST:
664
      prefix = "GANETI_POST_"
665
    else:
666
      raise AssertionError("Unknown phase '%s'" % phase)
667

    
668
    env = {}
669

    
670
    if self.hooks_path is not None:
671
      phase_env = self.build_env_fn()
672
      if phase_env:
673
        assert not compat.any(key.upper().startswith(prefix)
674
                              for key in phase_env)
675
        env.update(("%s%s" % (prefix, key), value)
676
                   for (key, value) in phase_env.items())
677

    
678
    if phase == constants.HOOKS_PHASE_PRE:
679
      assert compat.all((key.startswith("GANETI_") and
680
                         not key.startswith("GANETI_POST_"))
681
                        for key in env)
682

    
683
    elif phase == constants.HOOKS_PHASE_POST:
684
      assert compat.all(key.startswith("GANETI_POST_") for key in env)
685
      assert isinstance(self.pre_env, dict)
686

    
687
      # Merge with pre-phase environment
688
      assert not compat.any(key.startswith("GANETI_POST_")
689
                            for key in self.pre_env)
690
      env.update(self.pre_env)
691
    else:
692
      raise AssertionError("Unknown phase '%s'" % phase)
693

    
694
    return env
695

    
696
  def _RunWrapper(self, node_list, hpath, phase, phase_env):
697
    """Simple wrapper over self.callfn.
698

699
    This method fixes the environment before executing the hooks.
700

701
    """
702
    env = {
703
      "PATH": constants.HOOKS_PATH,
704
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
705
      "GANETI_OP_CODE": self.opcode,
706
      "GANETI_DATA_DIR": pathutils.DATA_DIR,
707
      "GANETI_HOOKS_PHASE": phase,
708
      "GANETI_HOOKS_PATH": hpath,
709
      }
710

    
711
    if self.htype:
712
      env["GANETI_OBJECT_TYPE"] = self.htype
713

    
714
    if self.cluster_name is not None:
715
      env["GANETI_CLUSTER"] = self.cluster_name
716

    
717
    if self.master_name is not None:
718
      env["GANETI_MASTER"] = self.master_name
719

    
720
    if phase_env:
721
      env = utils.algo.JoinDisjointDicts(env, phase_env)
722

    
723
    # Convert everything to strings
724
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
725

    
726
    assert compat.all(key == "PATH" or key.startswith("GANETI_")
727
                      for key in env)
728

    
729
    return self.hooks_execution_fn(node_list, hpath, phase, env)
730

    
731
  def RunPhase(self, phase, nodes=None):
732
    """Run all the scripts for a phase.
733

734
    This is the main function of the HookMaster.
735
    It executes self.hooks_execution_fn, and after running
736
    self.hooks_results_adapt_fn on its results it expects them to be in the form
737
    {node_name: (fail_msg, [(script, result, output), ...]}).
738

739
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
740
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
741
    @param nodes: overrides the predefined list of nodes for the given phase
742
    @return: the processed results of the hooks multi-node rpc call
743
    @raise errors.HooksFailure: on communication failure to the nodes
744
    @raise errors.HooksAbort: on failure of one of the hooks
745

746
    """
747
    if phase == constants.HOOKS_PHASE_PRE:
748
      if nodes is None:
749
        nodes = self.pre_nodes
750
      env = self.pre_env
751
    elif phase == constants.HOOKS_PHASE_POST:
752
      if nodes is None:
753
        nodes = self.post_nodes
754
      env = self._BuildEnv(phase)
755
    else:
756
      raise AssertionError("Unknown phase '%s'" % phase)
757

    
758
    if not nodes:
759
      # empty node list, we should not attempt to run this as either
760
      # we're in the cluster init phase and the rpc client part can't
761
      # even attempt to run, or this LU doesn't do hooks at all
762
      return
763

    
764
    results = self._RunWrapper(nodes, self.hooks_path, phase, env)
765
    if not results:
766
      msg = "Communication Failure"
767
      if phase == constants.HOOKS_PHASE_PRE:
768
        raise errors.HooksFailure(msg)
769
      else:
770
        self.log_fn(msg)
771
        return results
772

    
773
    converted_res = results
774
    if self.hooks_results_adapt_fn:
775
      converted_res = self.hooks_results_adapt_fn(results)
776

    
777
    errs = []
778
    for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
779
      if offline:
780
        continue
781

    
782
      if fail_msg:
783
        self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)
784
        continue
785

    
786
      for script, hkr, output in hooks_results:
787
        if hkr == constants.HKR_FAIL:
788
          if phase == constants.HOOKS_PHASE_PRE:
789
            errs.append((node_name, script, output))
790
          else:
791
            if not output:
792
              output = "(no output)"
793
            self.log_fn("On %s script %s failed, output: %s" %
794
                        (node_name, script, output))
795

    
796
    if errs and phase == constants.HOOKS_PHASE_PRE:
797
      raise errors.HooksAbort(errs)
798

    
799
    return results
800

    
801
  def RunConfigUpdate(self):
802
    """Run the special configuration update hook
803

804
    This is a special hook that runs only on the master after each
805
    top-level LI if the configuration has been updated.
806

807
    """
808
    phase = constants.HOOKS_PHASE_POST
809
    hpath = constants.HOOKS_NAME_CFGUPDATE
810
    nodes = [self.master_name]
811
    self._RunWrapper(nodes, hpath, phase, self.pre_env)
812

    
813
  @staticmethod
814
  def BuildFromLu(hooks_execution_fn, lu):
815
    if lu.HPATH is None:
816
      nodes = (None, None)
817
    else:
818
      nodes = map(frozenset, lu.BuildHooksNodes())
819

    
820
    master_name = cluster_name = None
821
    if lu.cfg:
822
      master_name = lu.cfg.GetMasterNode()
823
      cluster_name = lu.cfg.GetClusterName()
824

    
825
    return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
826
                       _RpcResultsToHooksResults, lu.BuildHooksEnv,
827
                       lu.LogWarning, lu.HTYPE, cluster_name, master_name)