#
#

# Copyright (C) 2006, 2007, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""

import logging
import random
import time
import itertools

from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import cmdlib
from ganeti import locking
from ganeti import utils
from ganeti import compat


_OP_PREFIX = "Op"
_LU_PREFIX = "LU"


class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  """


def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  """
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
  running_sum = result[0]

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap max timeout. This gives other jobs a chance to run even if
    # we're still trying to get our locks, before finally moving to a
    # blocking acquire.
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
    # And also cap the lower boundary for safety
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)

    result.append(timeout)
    running_sum += timeout

  return result
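
# Illustrative sketch (editor's example, assuming the usual Ganeti constants):
# the list computed above starts at LOCK_ATTEMPTS_MINWAIT, grows roughly
# exponentially with each attempt, is clamped to LOCK_ATTEMPTS_MAXWAIT, and
# its sum reaches at least LOCK_ATTEMPTS_TIMEOUT:
#
#   timeouts = _CalculateLockAttemptTimeouts()
#   assert timeouts[0] == constants.LOCK_ATTEMPTS_MINWAIT
#   assert max(timeouts) <= constants.LOCK_ATTEMPTS_MAXWAIT
#   assert sum(timeouts) >= constants.LOCK_ATTEMPTS_TIMEOUT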


class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    """
    try:
      timeout = self._timeouts.next()
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
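
  # Illustrative sketch: callers (e.g. the job queue) typically drive this in
  # a retry loop, treating a None timeout as the final, blocking attempt.
  # Roughly, with a hypothetical try_acquire helper:
  #
  #   strat = LockAttemptTimeoutStrategy()
  #   acquired = False
  #   while not acquired:
  #     timeout = strat.NextAttempt()   # None means "block until acquired"
  #     acquired = try_acquire(timeout=timeout)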


class OpExecCbBase: # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError
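
  # Illustrative sketch (hypothetical class, not part of the module): a
  # concrete callback implementation only needs to override what it cares
  # about, e.g.:
  #
  #   class _LoggingCbs(OpExecCbBase):
  #     def Feedback(self, *args):
  #       logging.debug("LU feedback: %r", (args, ))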


def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  """
  assert opname.startswith(_OP_PREFIX), \
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  return _LU_PREFIX + opname[len(_OP_PREFIX):]
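
# Illustrative example: the conversion is purely textual, e.g.
#   _LUNameForOpName("OpTestDelay") == "LUTestDelay"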


def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  """
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
              for op in opcodes.OP_MAPPING.values()
              if op.WITH_LU)
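
# Illustrative sketch (assuming the usual opcode/LU pairs exist in this tree):
# the resulting table maps opcode classes to LU classes, e.g. opcodes.OpTestDelay
# to cmdlib.LUTestDelay; opcodes defined with WITH_LU set to False are left out.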


def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  if not getattr(dst, opcodes.COMMENT_ATTR, None):
    dst.comment = defcomment


def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority)
    map(compat.partial(_SetBaseOpParams, op,
                       "Submitted by %s" % op.OP_ID),
        itertools.chain(*result.jobs))

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
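
# Illustrative sketch (hypothetical values): for an LU returning something
# like cmdlib.ResultWithJobs([[opcodes.OpTestDelay(duration=1)]], meta=42),
# the caller would receive roughly
#   {"meta": 42, constants.JOB_IDS_KEY: submit_fn(result.jobs)}
# with the submitted opcodes inheriting priority, debug level and a
# "Submitted by ..." comment from the parent opcode.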


def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")


def _RpcResultsToHooksResults(rpc_results):
  """Function to convert RPC results to the format expected by HooksMaster.

  @type rpc_results: dict(node: L{rpc.RpcResult})
  @param rpc_results: RPC results
  @rtype: dict(node: (fail_msg, offline, hooks_results))
  @return: RPC results unpacked according to the format expected by
    L{mcpu.HooksMaster}

  """
  return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
              for (node, rpc_res) in rpc_results.items())
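
# Illustrative sketch: this adapts a mapping like
#   {"node1.example.com": <rpc.RpcResult>}
# into roughly
#   {"node1.example.com": (fail_msg, offline, [(script, status, output), ...])}
# which is the shape HooksMaster.RunPhase iterates over below.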


class Processor(object):
  """Object which runs OpCodes"""
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.rpc = context.rpc
    self.hmclass = HooksMaster
    self._enable_locks = enable_locks

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, timeout, priority):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    self._CheckLocksEnabled()

    if self._cbs:
      self._cbs.CheckCancel()

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority)

    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up until there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      if self._cbs:
        self._cbs.NotifyStart()

      result = self._ExecLU(lu)

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          self._AcquireLocks(level, needed_locks, share,
                             calc_timeout(), priority)
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          lu.remove_locks[level] = add_locks

          try:
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), probably because of a race condition"
              " with another job, which added them first" % add_locks,
              errors.ECODE_FAULT)

        try:
          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)

    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)

    return result

  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU).
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                            not lu_class.REQ_BGL, calc_timeout(),
                            priority)
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
                                       priority)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        # Release BGL if owned
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
          assert self._enable_locks
          self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      raise errors.OpResultError("Opcode result does not match %s: %s" %
                                 (resultcheck_fn, utils.Truncate(result, 80)))

    return result
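
  # Illustrative usage sketch (hypothetical caller names, assuming a
  # configured context): the job queue drives this method in a retry loop so
  # that lock timeouts do not fail the job, roughly:
  #
  #   strat = LockAttemptTimeoutStrategy()
  #   while True:
  #     try:
  #       return proc.ExecOpCode(op, cbs, timeout=strat.NextAttempt(),
  #                              priority=op.priority)
  #     except LockAcquireTimeout:
  #       continue   # retry with the next, longer timeout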

  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id

    
527

    
528
class HooksMaster(object):
529
  def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
530
    hooks_results_adapt_fn, build_env_fn, log_fn, htype=None, cluster_name=None,
531
    master_name=None):
532
    """Base class for hooks masters.
533

534
    This class invokes the execution of hooks according to the behaviour
535
    specified by its parameters.
536

537
    @type opcode: string
538
    @param opcode: opcode of the operation to which the hooks are tied
539
    @type hooks_path: string
540
    @param hooks_path: prefix of the hooks directories
541
    @type nodes: 2-tuple of lists
542
    @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
543
      run and nodes on which post-hooks must be run
544
    @type hooks_execution_fn: function that accepts the following parameters:
545
      (node_list, hooks_path, phase, environment)
546
    @param hooks_execution_fn: function that will execute the hooks; can be
547
      None, indicating that no conversion is necessary.
548
    @type hooks_results_adapt_fn: function
549
    @param hooks_results_adapt_fn: function that will adapt the return value of
550
      hooks_execution_fn to the format expected by RunPhase
551
    @type build_env_fn: function that returns a dictionary having strings as
552
      keys
553
    @param build_env_fn: function that builds the environment for the hooks
554
    @type log_fn: function that accepts a string
555
    @param log_fn: logging function
556
    @type htype: string or None
557
    @param htype: None or one of L{constants.HTYPE_CLUSTER},
558
     L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
559
    @type cluster_name: string
560
    @param cluster_name: name of the cluster
561
    @type master_name: string
562
    @param master_name: name of the master
563

564
    """
565
    self.opcode = opcode
566
    self.hooks_path = hooks_path
567
    self.hooks_execution_fn = hooks_execution_fn
568
    self.hooks_results_adapt_fn = hooks_results_adapt_fn
569
    self.build_env_fn = build_env_fn
570
    self.log_fn = log_fn
571
    self.htype = htype
572
    self.cluster_name = cluster_name
573
    self.master_name = master_name
574

    
575
    self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
576
    (self.pre_nodes, self.post_nodes) = nodes

  def _BuildEnv(self, phase):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      prefix = "GANETI_"
    elif phase == constants.HOOKS_PHASE_POST:
      prefix = "GANETI_POST_"
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    env = {}

    if self.hooks_path is not None:
      phase_env = self.build_env_fn()
      if phase_env:
        assert not compat.any(key.upper().startswith(prefix)
                              for key in phase_env)
        env.update(("%s%s" % (prefix, key), value)
                   for (key, value) in phase_env.items())

    if phase == constants.HOOKS_PHASE_PRE:
      assert compat.all((key.startswith("GANETI_") and
                         not key.startswith("GANETI_POST_"))
                        for key in env)

    elif phase == constants.HOOKS_PHASE_POST:
      assert compat.all(key.startswith("GANETI_POST_") for key in env)
      assert isinstance(self.pre_env, dict)

      # Merge with pre-phase environment
      assert not compat.any(key.startswith("GANETI_POST_")
                            for key in self.pre_env)
      env.update(self.pre_env)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    return env
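
  # Illustrative example: if build_env_fn() returns {"INSTANCE_NAME": "inst1"},
  # the pre-phase environment gains "GANETI_INSTANCE_NAME", while the
  # post-phase environment gains "GANETI_POST_INSTANCE_NAME" and additionally
  # merges in every key of the pre-phase environment unchanged.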

  def _RunWrapper(self, node_list, hpath, phase, phase_env):
    """Simple wrapper over self.hooks_execution_fn.

    This method fixes the environment before executing the hooks.

    """
    env = {
      "PATH": constants.HOOKS_PATH,
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.opcode,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      "GANETI_HOOKS_PHASE": phase,
      "GANETI_HOOKS_PATH": hpath,
      }

    if self.htype:
      env["GANETI_OBJECT_TYPE"] = self.htype

    if self.cluster_name is not None:
      env["GANETI_CLUSTER"] = self.cluster_name

    if self.master_name is not None:
      env["GANETI_MASTER"] = self.master_name

    if phase_env:
      env = utils.algo.JoinDisjointDicts(env, phase_env)

    # Convert everything to strings
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])

    assert compat.all(key == "PATH" or key.startswith("GANETI_")
                      for key in env)

    return self.hooks_execution_fn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HooksMaster. It executes
    self.hooks_execution_fn and, after running self.hooks_results_adapt_fn on
    its results, expects them to be in the form
    {node_name: (fail_msg, offline, [(script, result, output), ...])}.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if phase == constants.HOOKS_PHASE_PRE:
      if nodes is None:
        nodes = self.pre_nodes
      env = self.pre_env
    elif phase == constants.HOOKS_PHASE_POST:
      if nodes is None:
        nodes = self.post_nodes
      env = self._BuildEnv(phase)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    if not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return

    results = self._RunWrapper(nodes, self.hooks_path, phase, env)
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        self.log_fn(msg)
        return results

    converted_res = results
    if self.hooks_results_adapt_fn:
      converted_res = self.hooks_results_adapt_fn(results)

    errs = []
    for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
      if offline:
        continue

      if fail_msg:
        self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)
        continue

      for script, hkr, output in hooks_results:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.log_fn("On %s script %s failed, output: %s" %
                        (node_name, script, output))

    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)

    return results

    
725
  def RunConfigUpdate(self):
726
    """Run the special configuration update hook
727

728
    This is a special hook that runs only on the master after each
729
    top-level LI if the configuration has been updated.
730

731
    """
732
    phase = constants.HOOKS_PHASE_POST
733
    hpath = constants.HOOKS_NAME_CFGUPDATE
734
    nodes = [self.master_name]
735
    self._RunWrapper(nodes, hpath, phase, self.pre_env)
736

    
737
  @staticmethod
738
  def BuildFromLu(hooks_execution_fn, lu):
739
    if lu.HPATH is None:
740
      nodes = (None, None)
741
    else:
742
      nodes = map(frozenset, lu.BuildHooksNodes())
743

    
744
    master_name = cluster_name = None
745
    if lu.cfg:
746
      master_name = lu.cfg.GetMasterNode()
747
      cluster_name = lu.cfg.GetClusterName()
748

    
749
    return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
750
                       _RpcResultsToHooksResults, lu.BuildHooksEnv,
751
                       lu.LogWarning, lu.HTYPE, cluster_name, master_name)
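
# Illustrative wiring sketch (mirrors Processor._ExecLU above): for a logical
# unit "lu", the processor roughly does
#
#   hm = HooksMaster.BuildFromLu(lu.rpc.call_hooks_runner, lu)
#   hm.RunPhase(constants.HOOKS_PHASE_PRE)
#   ...                                  # lu.Exec(...) runs here
#   hm.RunPhase(constants.HOOKS_PHASE_POST)
#   hm.RunConfigUpdate()                 # only if the configuration changed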