Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ db2203e0

History | View | Annotate | Download (23.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import sys
32
import logging
33
import random
34
import time
35
import itertools
36
import traceback
37

    
38
from ganeti import opcodes
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import cmdlib
42
from ganeti import locking
43
from ganeti import utils
44
from ganeti import compat
45
from ganeti import pathutils
46

    
47

    
48
# Prefix every opcode class name must start with; _LUNameForOpName() strips it
_OP_PREFIX = "Op"
49
# Prefix _LUNameForOpName() puts in front of the rest of the opcode name to
# form the logical unit class name
_LU_PREFIX = "LU"
50

    
51

    
52
class LockAcquireTimeout(Exception):
  """Raised when locks could not be acquired within the allowed time.

  """
56

    
57

    
58
def _CalculateLockAttemptTimeouts():
  """Build the list of timeouts used for successive lock attempts.

  The first entry is C{constants.LOCK_ATTEMPTS_MINWAIT}. Every following
  entry is derived from the previous one (slightly enlarged and raised to a
  power) and clamped to the range between C{LOCK_ATTEMPTS_MINWAIT} and
  C{LOCK_ATTEMPTS_MAXWAIT}. Entries are added until their sum reaches
  C{constants.LOCK_ATTEMPTS_TIMEOUT}.

  @rtype: list
  @return: Timeouts, one per non-blocking attempt, in the order they should
    be used

  """
  min_wait = constants.LOCK_ATTEMPTS_MINWAIT
  max_wait = constants.LOCK_ATTEMPTS_MAXWAIT
  total_wanted = constants.LOCK_ATTEMPTS_TIMEOUT

  timeouts = [min_wait]
  accumulated = min_wait

  # Keep adding attempts until, taken together, they wait for at least
  # LOCK_ATTEMPTS_TIMEOUT; after that callers switch to a blocking acquire
  while accumulated < total_wanted:
    grown = (timeouts[-1] * 1.05) ** 1.25

    # The upper cap gives other jobs a chance to run while we are still
    # trying to get our locks; the lower cap is there for safety only
    clamped = max(min(grown, max_wait), min_wait)

    timeouts.append(clamped)
    accumulated += clamped

  return timeouts
81

    
82

    
83
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Every instance walks once through the class-level list of timeouts. Once
  the list is used up, L{NextAttempt} returns C{None}, meaning that a
  blocking acquire should be done.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  # Computed once when the class is created and shared by all instances
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests (stored on the instance; the
      methods of this class do not call it)
    @param _random_fn: Random number generator for unittests; called without
      arguments by L{NextAttempt}

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @return: The next timeout from the list with a random variation applied,
      or C{None} once all timeouts have been handed out

    """
    # Take exactly one item from the iterator. A single-step "for" loop is
    # used instead of the iterator's ".next()" method, which only exists on
    # Python 2; the loop works the same on every Python version.
    for timeout in self._timeouts:
      break
    else:
      # No more timeouts, do blocking acquire
      return None

    if timeout is None:
      return None

    # Add a small variation (-/+ 5%) to timeout. This helps in situations
    # where two or more jobs are fighting for the same lock(s).
    variation_range = timeout * 0.1
    return timeout + ((self._random_fn() * variation_range) -
                      (variation_range * 0.5))
126

    
127

    
128
class OpExecCbBase: # pylint: disable=W0232
  """Callback interface used while an OpCode is being executed.

  All methods except L{SubmitManyJobs} have a do-nothing default
  implementation.

  """
  def NotifyStart(self):
    """Hook invoked right before the LU's Exec() method is started.

    At this point all locks have already been acquired.

    """

  def Feedback(self, *args):
    """Passes feedback produced by the LU code on to the end-user.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns the priority currently in effect, or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Hands jobs over for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    @raise NotImplementedError: Always, in this base class

    """
    raise NotImplementedError
158

    
159

    
160
def _LUNameForOpName(opname):
  """Derives the logical unit class name from an OpCode class name.

  The leading L{_OP_PREFIX} is replaced by L{_LU_PREFIX}.

  @param opname: OpCode class name; must start with L{_OP_PREFIX}
  @return: Name of the corresponding LU class

  """
  assert opname.startswith(_OP_PREFIX), \
      ("Invalid OpCode name, doesn't start with %s: %s" %
       (_OP_PREFIX, opname))

  suffix = opname[len(_OP_PREFIX):]
  return _LU_PREFIX + suffix
168

    
169

    
170
def _ComputeDispatchTable():
  """Builds the mapping from opcode classes to logical unit classes.

  Only opcodes whose C{WITH_LU} attribute is true are included; the LU class
  is looked up in L{cmdlib} under the name given by L{_LUNameForOpName}.

  @rtype: dict
  @return: Dictionary with opcode classes as keys and LU classes as values

  """
  table = {}

  for op_class in opcodes.OP_MAPPING.values():
    if op_class.WITH_LU:
      lu_name = _LUNameForOpName(op_class.__name__)
      table[op_class] = getattr(cmdlib, lu_name)

  return table
177

    
178

    
179
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  The debug level is always copied when the source has one. The priority is
  copied only if the destination does not have one yet, and the default
  comment is applied only if the destination has no (non-empty) comment.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode (modified in place)

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  # Read and write the comment through the same attribute name, so the check
  # and the assignment can never refer to different attributes
  if not getattr(dst, opcodes.COMMENT_ATTR, None):
    setattr(dst, opcodes.COMMENT_ATTR, defcomment)
199

    
200

    
201
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done: when the result
  is a L{cmdlib.ResultWithJobs}, the basic parameters of C{op} are copied to
  all opcodes of the contained jobs, the jobs are submitted and the value
  returned by the submission is stored in the returned dictionary under
  C{constants.JOB_IDS_KEY}. Any other result is returned unchanged.

  @type submit_fn: callable
  @param submit_fn: Function called with the list of jobs to submit
  @param op: Opcode which produced the result
  @param result: Value returned by the LU
  @return: The (possibly post-processed) result

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority). This is an explicit loop rather
    # than map(): the calls are made for their side effects only, and a lazy
    # map() would never make them.
    default_comment = "Submitted by %s" % op.OP_ID
    for job_op in itertools.chain(*result.jobs):
      _SetBaseOpParams(op, default_comment, job_op)

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
225

    
226

    
227
def _FailingSubmitManyJobs(_):
  """Stand-in for L{OpExecCbBase.SubmitManyJobs} which always fails.

  @raise errors.ProgrammerError: Always

  """
  message = ("Opcodes processed without callbacks (e.g."
             " queries) can not submit jobs")
  raise errors.ProgrammerError(message)
233

    
234

    
235
def _RpcResultsToHooksResults(rpc_results):
236
  """Function to convert RPC results to the format expected by HooksMaster.
237

238
  @type rpc_results: dict(node: L{rpc.RpcResult})
239
  @param rpc_results: RPC results
240
  @rtype: dict(node: (fail_msg, offline, hooks_results))
241
  @return: RPC results unpacked according to the format expected by
242
    L({mcpu.HooksMaster}
243

244
  """
245
  return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
246
              for (node, rpc_res) in rpc_results.items())
247

    
248

    
249
class Processor(object):
  """Object which runs OpCodes.

  For every opcode the matching logical unit (LU) class is looked up in
  L{DISPATCH_TABLE}; the locks declared by the LU are then acquired level by
  level before its prerequisite check, hooks and C{Exec} method are run.

  """
  # Mapping of opcode class to LU class, computed once at class creation
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: whether this processor is allowed to use locks; if
        not, any attempt to acquire or add locks raises
        L{errors.ProgrammerError}

    """
    self.context = context
    self._ec_id = ec_id
    # Callbacks are only set while ExecOpCode is running
    self._cbs = None
    self.rpc = context.rpc
    self.hmclass = HooksMaster
    self._enable_locks = enable_locks

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, timeout):
    """Acquires locks via the Ganeti lock manager.

    The priority passed to the lock manager is taken from the callbacks, if
    any are set.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @return: Whatever the lock manager's C{acquire} returned
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time
    @raise errors.ProgrammerError: In case locking is not enabled

    """
    self._CheckLocksEnabled()

    if self._cbs:
      priority = self._cbs.CurrentPriority()
    else:
      priority = None

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority)

    # The lock manager signals a timeout by returning None
    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs the prerequisite check, the pre-phase hooks and their callback,
    then C{lu.Exec}, the result post-processing, the post-phase hooks and
    their callback. If the configuration's write count changed while
    executing, the configuration update hook is run afterwards, even when
    execution failed.

    @param lu: Logical unit to execute
    @return: The LU's C{dry_run_result} in dry-run mode, otherwise the result
        as returned by the post-phase hooks callback

    """
    # Remembered to detect configuration writes done during execution
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    """Creates the hooks manager for a logical unit.

    The manager is built through the C{BuildFromLu} method of
    C{self.hmclass}, using the LU's C{call_hooks_runner} RPC call for
    running the hooks.

    @param lu: Logical unit for which hooks are to be run
    @return: The hooks manager instance

    """
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: Logical unit to execute
    @param level: Lock level to handle in this call
    @param calc_timeout: Function called without arguments which returns the
        timeout to use for the next lock acquisition
    @return: Result of the LU execution
    @raise errors.OpExecError: If the LU execution failed with an
        C{AssertionError}
    @raise errors.OpPrereqError: If locks to be added could not be added

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # All lock levels have been handled, the LU can be executed
      if self._cbs:
        self._cbs.NotifyStart()

      try:
        result = self._ExecLU(lu)
      except AssertionError:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        #
        # The exception object is taken from sys.exc_info() instead of being
        # bound in the "except" clause; this is valid syntax on every Python
        # version, unlike the Python 2-only "except Type, name" form.
        (_, err, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          self._AcquireLocks(level, needed_locks, share,
                             calc_timeout())
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          # Locks added here are removed again once the LU has finished
          lu.remove_locks[level] = add_locks

          try:
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            logging.exception("Detected lock error in level %s for locks"
                              " %s, shared=%s", level, add_locks, share)
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), most likely because of another"
              " job who added them first" % add_locks,
              errors.ECODE_NOTUNIQUE)

        try:
          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)

    else:
      # Nothing to lock at this level, continue with the next one
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)

    return result

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @return: The opcode's result
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time
    @raise errors.ProgrammerError: If C{op} is not an opcode instance, or if
        the opcode requires the BGL while locks are disabled
    @raise errors.OpCodeUnknown: If no LU is registered for the opcode
    @raise errors.OpResultError: If the result does not pass the opcode's
        C{OP_RESULT} check

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU).
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                            not lu_class.REQ_BGL, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        # Release BGL if owned
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
          assert self._enable_locks
          self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

    # An opcode without OP_RESULT check accepts any result
    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      raise errors.OpResultError("Opcode result does not match %s: %s" %
                                 (resultcheck_fn, utils.Truncate(result, 80)))

    return result

  def Log(self, *args):
    """Forward call to feedback callback function.

    Nothing is done if no callbacks are set.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @param current: Number of the current step
    @param total: Total number of steps
    @param message: Description of the step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    @param message: Warning message; used as a format string if positional
        arguments are given

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @param message: Message; used as a format string if positional arguments
        are given

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: If no execution context ID is set

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id
542

    
543

    
544
class HooksMaster(object):
  """Runs the hooks of an operation.

  The way hooks are executed, how their results are interpreted and how
  the environment is built is determined by the functions passed to the
  constructor.

  """
  def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
               hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
               cluster_name=None, master_name=None):
    """Base class for hooks masters.

    This class invokes the execution of hooks according to the behaviour
    specified by its parameters.

    @type opcode: string
    @param opcode: opcode of the operation to which the hooks are tied
    @type hooks_path: string
    @param hooks_path: prefix of the hooks directories
    @type nodes: 2-tuple of lists
    @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
      run and nodes on which post-hooks must be run
    @type hooks_execution_fn: function that accepts the following parameters:
      (node_list, hooks_path, phase, environment)
    @param hooks_execution_fn: function that will execute the hooks
    @type hooks_results_adapt_fn: function
    @param hooks_results_adapt_fn: function that will adapt the return value of
      hooks_execution_fn to the format expected by RunPhase; can be
      None, indicating that no conversion is necessary.
    @type build_env_fn: function that returns a dictionary having strings as
      keys
    @param build_env_fn: function that builds the environment for the hooks
    @type log_fn: function that accepts a string, optionally followed by
      arguments to be formatted into it
    @param log_fn: logging function
    @type htype: string or None
    @param htype: None or one of L{constants.HTYPE_CLUSTER},
     L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
    @type cluster_name: string
    @param cluster_name: name of the cluster
    @type master_name: string
    @param master_name: name of the master

    """
    self.opcode = opcode
    self.hooks_path = hooks_path
    self.hooks_execution_fn = hooks_execution_fn
    self.hooks_results_adapt_fn = hooks_results_adapt_fn
    self.build_env_fn = build_env_fn
    self.log_fn = log_fn
    self.htype = htype
    self.cluster_name = cluster_name
    self.master_name = master_name

    # The pre-phase environment is computed once, at construction time
    self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
    (self.pre_nodes, self.post_nodes) = nodes

  def _BuildEnv(self, phase):
    """Compute the environment for a hooks phase.

    The variables returned by C{build_env_fn} are prefixed with C{GANETI_}
    (pre phase) or C{GANETI_POST_} (post phase). For the post phase the
    pre-phase environment is merged into the result. If C{hooks_path} is
    C{None}, C{build_env_fn} is not called.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}
    @rtype: dict
    @return: environment for the given phase
    @raise AssertionError: if the phase is unknown

    """
    if phase == constants.HOOKS_PHASE_PRE:
      prefix = "GANETI_"
    elif phase == constants.HOOKS_PHASE_POST:
      prefix = "GANETI_POST_"
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    env = {}

    if self.hooks_path is not None:
      phase_env = self.build_env_fn()
      if phase_env:
        assert not compat.any(key.upper().startswith(prefix)
                              for key in phase_env)
        env.update(("%s%s" % (prefix, key), value)
                   for (key, value) in phase_env.items())

    if phase == constants.HOOKS_PHASE_PRE:
      assert compat.all((key.startswith("GANETI_") and
                         not key.startswith("GANETI_POST_"))
                        for key in env)

    else:
      # Unknown phases were rejected above, so this is the post phase
      assert compat.all(key.startswith("GANETI_POST_") for key in env)
      assert isinstance(self.pre_env, dict)

      # Merge with pre-phase environment
      assert not compat.any(key.startswith("GANETI_POST_")
                            for key in self.pre_env)
      env.update(self.pre_env)

    return env

  def _RunWrapper(self, node_list, hpath, phase, phase_env):
    """Simple wrapper over self.hooks_execution_fn.

    This method fixes the environment before executing the hooks: it adds
    the variables common to all hooks, merges in the phase environment and
    converts all keys and values to strings.

    @param node_list: nodes on which the hooks are to be run
    @param hpath: hooks path passed on to the execution function
    @param phase: hooks phase
    @param phase_env: environment specific to the phase, can be empty
    @return: whatever C{hooks_execution_fn} returned

    """
    env = {
      "PATH": constants.HOOKS_PATH,
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.opcode,
      "GANETI_DATA_DIR": pathutils.DATA_DIR,
      "GANETI_HOOKS_PHASE": phase,
      "GANETI_HOOKS_PATH": hpath,
      }

    if self.htype:
      env["GANETI_OBJECT_TYPE"] = self.htype

    if self.cluster_name is not None:
      env["GANETI_CLUSTER"] = self.cluster_name

    if self.master_name is not None:
      env["GANETI_MASTER"] = self.master_name

    if phase_env:
      env = utils.algo.JoinDisjointDicts(env, phase_env)

    # Convert everything to strings
    env = dict((str(key), str(val)) for (key, val) in env.items())

    assert compat.all(key == "PATH" or key.startswith("GANETI_")
                      for key in env)

    return self.hooks_execution_fn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.
    It executes self.hooks_execution_fn, and after running
    self.hooks_results_adapt_fn on its results it expects them to be in the form
    {node_name: (fail_msg, offline, [(script, result, output), ...])}.

    Results of offline nodes are skipped and communication failures to
    single nodes are only logged. Failed scripts abort the operation in the
    pre phase, whereas in the post phase they are only logged.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the results as returned by self.hooks_execution_fn (not the
        adapted form), or C{None} if there are no nodes to run the hooks on
    @raise errors.HooksFailure: on communication failure to the nodes (pre
        phase only; in the post phase the failure is logged)
    @raise errors.HooksAbort: on failure of one of the hooks (pre phase only)
    @raise AssertionError: if the phase is unknown

    """
    if phase == constants.HOOKS_PHASE_PRE:
      if nodes is None:
        nodes = self.pre_nodes
      env = self.pre_env
    elif phase == constants.HOOKS_PHASE_POST:
      if nodes is None:
        nodes = self.post_nodes
      env = self._BuildEnv(phase)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    if not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return

    results = self._RunWrapper(nodes, self.hooks_path, phase, env)
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        self.log_fn(msg)
        return results

    converted_res = results
    if self.hooks_results_adapt_fn:
      converted_res = self.hooks_results_adapt_fn(results)

    errs = []
    for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
      if offline:
        continue

      if fail_msg:
        self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)
        continue

      for script, hkr, output in hooks_results:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.log_fn("On %s script %s failed, output: %s" %
                        (node_name, script, output))

    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)

    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.master_name]
    # The pre-phase environment is passed, even though the phase is "post"
    self._RunWrapper(nodes, hpath, phase, self.pre_env)

  @staticmethod
  def BuildFromLu(hooks_execution_fn, lu):
    """Creates a hooks master from a logical unit.

    The opcode ID, hooks path, hooks type, environment builder and logging
    function are taken from the LU. If the LU's C{HPATH} is C{None}, no nodes
    are set for either phase. The cluster and master names are read from the
    LU's configuration, if it has one.

    @param hooks_execution_fn: function that will execute the hooks, see the
      constructor
    @param lu: logical unit for which the hooks are to be run
    @rtype: L{HooksMaster}
    @return: the hooks master for the LU

    """
    if lu.HPATH is None:
      nodes = (None, None)
    else:
      # Built eagerly so that "nodes" is a real sequence on every Python
      # version
      nodes = [frozenset(node_names) for node_names in lu.BuildHooksNodes()]

    master_name = cluster_name = None
    if lu.cfg:
      master_name = lu.cfg.GetMasterNode()
      cluster_name = lu.cfg.GetClusterName()

    return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
                       _RpcResultsToHooksResults, lu.BuildHooksEnv,
                       lu.LogWarning, lu.HTYPE, cluster_name, master_name)