Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 686d24f0

History | View | Annotate | Download (24 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import sys
32
import logging
33
import random
34
import time
35
import itertools
36
import traceback
37

    
38
from ganeti import opcodes
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import cmdlib
42
from ganeti import locking
43
from ganeti import utils
44
from ganeti import compat
45
from ganeti import pathutils
46

    
47

    
48
# Prefix that opcode class names are expected to start with
_OP_PREFIX = "Op"
49
# Prefix put in front of the remainder of an opcode name to form the LU name
_LU_PREFIX = "LU"
50

    
51

    
52
class LockAcquireTimeout(Exception):
  """Raised when locks could not be acquired within the allotted time.

  """
56

    
57

    
58
def _CalculateLockAttemptTimeouts():
  """Builds the list of timeouts for successive lock acquire attempts.

  The first entry is C{constants.LOCK_ATTEMPTS_MINWAIT}; every following
  entry grows from its predecessor and is clamped to the range given by
  C{constants.LOCK_ATTEMPTS_MINWAIT} and C{constants.LOCK_ATTEMPTS_MAXWAIT}.
  Entries are added until their sum reaches
  C{constants.LOCK_ATTEMPTS_TIMEOUT}.

  @rtype: list
  @return: Timeouts, one per attempt

  """
  lower = constants.LOCK_ATTEMPTS_MINWAIT
  upper = constants.LOCK_ATTEMPTS_MAXWAIT

  timeouts = [lower]
  total = lower

  # Keep adding attempts until at least LOCK_ATTEMPTS_TIMEOUT is spent
  # waiting; only after that a blocking acquire is done
  while total < constants.LOCK_ATTEMPTS_TIMEOUT:
    grown = (timeouts[-1] * 1.05) ** 1.25

    # The upper cap gives other jobs a chance to run while we're still
    # trying to get our locks; the lower cap is there for safety
    current = max(min(grown, upper), lower)

    timeouts.append(current)
    total += current

  return timeouts
81

    
82

    
83
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Each call to L{NextAttempt} hands out the next precomputed timeout from
  L{_TIMEOUT_PER_ATTEMPT}, slightly randomized. Once all of them have been
  used C{None} is returned, meaning a blocking acquire should be done.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  # Computed once at class creation time and shared by all instances
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    # Every instance walks through the shared list with its own iterator
    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @rtype: float or None
    @return: Timeout for the next attempt, or C{None} if there are no more
        timeouts and a blocking acquire should be done

    """
    # The builtin next() works on both Python 2.6+ and 3.x, unlike the
    # iterator's "next" method; the default covers the exhausted case
    timeout = next(self._timeouts, None)

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
126

    
127

    
128
class OpExecCbBase: # pylint: disable=W0232
  """Callback interface used while an OpCode is being executed.

  All methods except L{SubmitManyJobs} have do-nothing default
  implementations.

  """
  def NotifyStart(self):
    """Hook invoked right before the LU is executed.

    At this point all locks have been acquired and the LU's Exec() method
    is about to be started.

    """

  def Feedback(self, *args):
    """Passes feedback from the LU code on to the end-user.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Tells the current priority.

    @return: Current priority or C{None}

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Hands jobs over for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    @raise NotImplementedError: Always, in this base class

    """
    raise NotImplementedError()
158

    
159

    
160
def _LUNameForOpName(opname):
  """Translates an OpCode name into the name of its logical unit.

  The leading L{_OP_PREFIX} of the name is replaced with L{_LU_PREFIX}.

  @param opname: OpCode name, must start with L{_OP_PREFIX}
  @return: Name of the logical unit

  """
  assert opname.startswith(_OP_PREFIX), \
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  suffix = opname[len(_OP_PREFIX):]

  return _LU_PREFIX + suffix
168

    
169

    
170
def _ComputeDispatchTable():
  """Builds the table mapping opcode classes to their logical units.

  Only opcodes from C{opcodes.OP_MAPPING} with C{WITH_LU} set are included;
  the logical unit is looked up by name in L{cmdlib}.

  @rtype: dict
  @return: Dictionary with opcode classes as keys and LU classes as values

  """
  table = {}

  for op_class in opcodes.OP_MAPPING.values():
    if op_class.WITH_LU:
      table[op_class] = getattr(cmdlib, _LUNameForOpName(op_class.__name__))

  return table
177

    
178

    
179
def _SetBaseOpParams(src, defcomment, dst):
  """Transfers basic parameters from one opcode to another.

  The debug level is always taken over if the source has one. Priority is
  only taken over if the destination has none set, and the comment is only
  set if the destination has none.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  dst_has_priority = getattr(dst, "priority", None) is not None
  if not dst_has_priority and hasattr(src, "priority"):
    dst.priority = src.priority

  existing_comment = getattr(dst, opcodes.COMMENT_ATTR, None)
  if not existing_comment:
    dst.comment = defcomment
199

    
200

    
201
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done: when the LU
  returned a L{cmdlib.ResultWithJobs}, the contained jobs are submitted and
  the submission result is stored in the returned dictionary under
  C{constants.JOB_IDS_KEY}. Any other result is returned unchanged.

  @param submit_fn: Function used to submit the jobs; called with the list
      of jobs
  @param op: Opcode whose result is being processed
  @param result: Result as returned by the LU
  @return: Processed result

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority). This is an explicit loop as
    # map() is lazy on Python 3 and the copying would silently not happen.
    defcomment = "Submitted by %s" % op.OP_ID
    for job_op in itertools.chain(*result.jobs):
      _SetBaseOpParams(op, defcomment, job_op)

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
225

    
226

    
227
def _FailingSubmitManyJobs(_):
  """Stand-in for L{OpExecCbBase.SubmitManyJobs} which always fails.

  @raise errors.ProgrammerError: Always

  """
  message = ("Opcodes processed without callbacks (e.g."
             " queries) can not submit jobs")
  raise errors.ProgrammerError(message)
233

    
234

    
235
def _RpcResultsToHooksResults(rpc_results):
236
  """Function to convert RPC results to the format expected by HooksMaster.
237

238
  @type rpc_results: dict(node: L{rpc.RpcResult})
239
  @param rpc_results: RPC results
240
  @rtype: dict(node: (fail_msg, offline, hooks_results))
241
  @return: RPC results unpacked according to the format expected by
242
    L({mcpu.HooksMaster}
243

244
  """
245
  return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
246
              for (node, rpc_res) in rpc_results.items())
247

    
248

    
249
class Processor(object):
  """Object which runs OpCodes"""
  # Maps opcode classes to the logical unit classes implementing them
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: Whether this processor is allowed to use locks; if
        not, trying to acquire or add locks raises
        L{errors.ProgrammerError}

    """
    self.context = context
    self._ec_id = ec_id
    # Callbacks are only set for the duration of ExecOpCode
    self._cbs = None
    self.rpc = context.rpc
    self.hmclass = HooksMaster
    self._enable_locks = enable_locks

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @return: Whatever the lock manager's C{acquire} returned
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time
    @raise errors.ProgrammerError: In case locking is not enabled

    """
    self._CheckLocksEnabled()

    if self._cbs:
      priority = self._cbs.CurrentPriority()
    else:
      priority = None

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority,
                                        opportunistic=opportunistic)

    # The lock manager signals a timeout by returning None
    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Checks the prerequisites, runs the pre-hooks, executes the LU and runs
    the post-hooks. If the configuration's write count changed while doing
    so, the configuration update hook is run as well.

    @param lu: Logical unit to execute
    @return: Result of the LU as processed by the post-hooks callback, or
        the LU's C{dry_run_result} if the opcode requested a dry run

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # Runs even if the LU failed, as long as the config was written
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    """Builds the hooks manager for a logical unit.

    @param lu: Logical unit to build the hooks manager for
    @return: Instance built by the C{BuildFromLu} method of C{self.hmclass}

    """
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: Logical unit to execute
    @type level: int
    @param level: Lock level to handle in this invocation
    @param calc_timeout: Function without arguments returning the timeout
        for acquiring locks
    @return: Result of the LU
    @raise errors.OpExecError: In case the LU raised an C{AssertionError}
    @raise errors.OpPrereqError: In case locks couldn't be added

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # Went past the last lock level, time to actually run the LU
      if self._cbs:
        self._cbs.NotifyStart()

      try:
        result = self._ExecLU(lu)
      except AssertionError as err:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, _, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      opportunistic = lu.opportunistic_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          self._AcquireLocks(level, needed_locks, share, opportunistic,
                             calc_timeout())
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          lu.remove_locks[level] = add_locks

          try:
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            logging.exception("Detected lock error in level %s for locks"
                              " %s, shared=%s", level, add_locks, share)
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), most likely because of another"
              " job who added them first" % add_locks,
              errors.ECODE_NOTUNIQUE)

        try:
          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)

    else:
      # Nothing to lock at this level, continue with the next one
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)

    return result

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @return: Result of the opcode
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time
    @raise errors.ProgrammerError: In case C{op} is not an opcode, or the
        opcode requires the BGL while locks are disabled
    @raise errors.OpCodeUnknown: In case there is no logical unit for the
        opcode
    @raise errors.OpResultError: In case the result doesn't pass the
        opcode's result check

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU).
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                            not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                       calc_timeout)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        # Release BGL if owned
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
          assert self._enable_locks
          self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      raise errors.OpResultError("Opcode result does not match %s: %s" %
                                 (resultcheck_fn, utils.Truncate(result, 80)))

    return result

  def Log(self, *args):
    """Forward call to feedback callback function.

    Does nothing if no callbacks are set.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @param current: Number of the current step
    @param total: Total number of steps
    @param message: Description of the step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    @param message: Warning message; used as a format string if positional
        arguments are given

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @param message: Message; used as a format string if positional arguments
        are given

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: In case no execution context ID is set

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id
547

    
548

    
549
class HooksMaster(object):
  """Base class for hooks masters.

  This class invokes the execution of hooks according to the behaviour
  specified by the parameters given to the constructor.

  """
  def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
               hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
               cluster_name=None, master_name=None):
    """Initializes this class.

    The environment for the pre-hooks is computed right away.

    @type opcode: string
    @param opcode: opcode of the operation to which the hooks are tied
    @type hooks_path: string
    @param hooks_path: prefix of the hooks directories
    @type nodes: 2-tuple of lists
    @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
      run and nodes on which post-hooks must be run
    @type hooks_execution_fn: function that accepts the following parameters:
      (node_list, hooks_path, phase, environment)
    @param hooks_execution_fn: function that will execute the hooks
    @type hooks_results_adapt_fn: function
    @param hooks_results_adapt_fn: function that will adapt the return value of
      hooks_execution_fn to the format expected by RunPhase; can be
      None, indicating that no conversion is necessary.
    @type build_env_fn: function that returns a dictionary having strings as
      keys
    @param build_env_fn: function that builds the environment for the hooks
    @type log_fn: function that accepts a string
    @param log_fn: logging function
    @type htype: string or None
    @param htype: None or one of L{constants.HTYPE_CLUSTER},
     L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
    @type cluster_name: string
    @param cluster_name: name of the cluster
    @type master_name: string
    @param master_name: name of the master

    """
    self.opcode = opcode
    self.hooks_path = hooks_path
    self.hooks_execution_fn = hooks_execution_fn
    self.hooks_results_adapt_fn = hooks_results_adapt_fn
    self.build_env_fn = build_env_fn
    self.log_fn = log_fn
    self.htype = htype
    self.cluster_name = cluster_name
    self.master_name = master_name

    self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
    (self.pre_nodes, self.post_nodes) = nodes

  def _BuildEnv(self, phase):
    """Compute the environment for the given hooks phase.

    The variables returned by C{build_env_fn} are prefixed with "GANETI_"
    for the pre phase and with "GANETI_POST_" for the post phase. For the
    post phase the result also contains the pre-phase environment.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}
    @rtype: dict
    @return: environment for the hooks

    """
    if phase == constants.HOOKS_PHASE_PRE:
      prefix = "GANETI_"
    elif phase == constants.HOOKS_PHASE_POST:
      prefix = "GANETI_POST_"
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    env = {}

    # Without a hooks path the environment stays empty
    if self.hooks_path is not None:
      phase_env = self.build_env_fn()
      if phase_env:
        assert not compat.any(key.upper().startswith(prefix)
                              for key in phase_env)
        env.update(("%s%s" % (prefix, key), value)
                   for (key, value) in phase_env.items())

    if phase == constants.HOOKS_PHASE_PRE:
      assert compat.all((key.startswith("GANETI_") and
                         not key.startswith("GANETI_POST_"))
                        for key in env)

    elif phase == constants.HOOKS_PHASE_POST:
      assert compat.all(key.startswith("GANETI_POST_") for key in env)
      assert isinstance(self.pre_env, dict)

      # Merge with pre-phase environment
      assert not compat.any(key.startswith("GANETI_POST_")
                            for key in self.pre_env)
      env.update(self.pre_env)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    return env

  def _RunWrapper(self, node_list, hpath, phase, phase_env):
    """Simple wrapper over self.hooks_execution_fn.

    This method fixes the environment before executing the hooks.

    @param node_list: nodes on which the hooks are run
    @param hpath: hooks path
    @param phase: hooks phase
    @param phase_env: phase-specific environment, may be empty
    @return: whatever C{hooks_execution_fn} returned

    """
    env = {
      "PATH": constants.HOOKS_PATH,
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.opcode,
      "GANETI_DATA_DIR": pathutils.DATA_DIR,
      "GANETI_HOOKS_PHASE": phase,
      "GANETI_HOOKS_PATH": hpath,
      }

    if self.htype:
      env["GANETI_OBJECT_TYPE"] = self.htype

    if self.cluster_name is not None:
      env["GANETI_CLUSTER"] = self.cluster_name

    if self.master_name is not None:
      env["GANETI_MASTER"] = self.master_name

    if phase_env:
      env = utils.algo.JoinDisjointDicts(env, phase_env)

    # Convert everything to strings; items() is used instead of iteritems()
    # as the latter only exists on Python 2
    env = dict((str(key), str(val)) for (key, val) in env.items())

    assert compat.all(key == "PATH" or key.startswith("GANETI_")
                      for key in env)

    return self.hooks_execution_fn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.
    It executes self.hooks_execution_fn, and after running
    self.hooks_results_adapt_fn on its results it expects them to be in the form
    {node_name: (fail_msg, offline, [(script, result, output), ...])}.

    Failed scripts abort the operation in the pre phase and are only logged
    in the post phase.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the results as returned by C{hooks_execution_fn} (i.e. before
        adaptation), or C{None} if there were no nodes to run on
    @raise errors.HooksFailure: on communication failure to the nodes
        (pre phase only)
    @raise errors.HooksAbort: on failure of one of the hooks (pre phase only)

    """
    if phase == constants.HOOKS_PHASE_PRE:
      if nodes is None:
        nodes = self.pre_nodes
      env = self.pre_env
    elif phase == constants.HOOKS_PHASE_POST:
      if nodes is None:
        nodes = self.post_nodes
      env = self._BuildEnv(phase)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    if not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return

    results = self._RunWrapper(nodes, self.hooks_path, phase, env)
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        self.log_fn(msg)
        return results

    converted_res = results
    if self.hooks_results_adapt_fn:
      converted_res = self.hooks_results_adapt_fn(results)

    errs = []
    for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
      if offline:
        continue

      if fail_msg:
        # Format the message here; log_fn is only required to accept a
        # single string
        self.log_fn("Communication failure to node %s: %s" %
                    (node_name, fail_msg))
        continue

      for script, hkr, output in hooks_results:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.log_fn("On %s script %s failed, output: %s" %
                        (node_name, script, output))

    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)

    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.master_name]
    self._RunWrapper(nodes, hpath, phase, self.pre_env)

  @staticmethod
  def BuildFromLu(hooks_execution_fn, lu):
    """Builds a hooks master from a logical unit.

    @param hooks_execution_fn: function that will execute the hooks
    @param lu: logical unit providing the opcode, hooks path, hooks type,
        hooks nodes, environment builder and logging function
    @rtype: L{HooksMaster}
    @return: hooks master for the logical unit

    """
    if lu.HPATH is None:
      # This LU doesn't do hooks, hence there are no nodes to run on
      nodes = (None, None)
    else:
      nodes = map(frozenset, lu.BuildHooksNodes())

    master_name = cluster_name = None
    if lu.cfg:
      master_name = lu.cfg.GetMasterNode()
      cluster_name = lu.cfg.GetClusterName()

    return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
                       _RpcResultsToHooksResults, lu.BuildHooksEnv,
                       lu.LogWarning, lu.HTYPE, cluster_name, master_name)