Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ d5d76ab2

History | View | Annotate | Download (23.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import sys
32
import logging
33
import random
34
import time
35
import itertools
36
import traceback
37

    
38
from ganeti import opcodes
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import cmdlib
42
from ganeti import locking
43
from ganeti import utils
44
from ganeti import compat
45
from ganeti import pathutils
46

    
47

    
48
_OP_PREFIX = "Op"
49
_LU_PREFIX = "LU"
50

    
51

    
52
class LockAcquireTimeout(Exception):
53
  """Exception to report timeouts on acquiring locks.
54

55
  """
56

    
57

    
58
def _CalculateLockAttemptTimeouts():
59
  """Calculate timeouts for lock attempts.
60

61
  """
62
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
63
  running_sum = result[0]
64

    
65
  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
66
  # blocking acquire
67
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
68
    timeout = (result[-1] * 1.05) ** 1.25
69

    
70
    # Cap max timeout. This gives other jobs a chance to run even if
71
    # we're still trying to get our locks, before finally moving to a
72
    # blocking acquire.
73
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
74
    # And also cap the lower boundary for safety
75
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
76

    
77
    result.append(timeout)
78
    running_sum += timeout
79

    
80
  return result
81

    
82

    
83
class LockAttemptTimeoutStrategy(object):
84
  """Class with lock acquire timeout strategy.
85

86
  """
87
  __slots__ = [
88
    "_timeouts",
89
    "_random_fn",
90
    "_time_fn",
91
    ]
92

    
93
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
94

    
95
  def __init__(self, _time_fn=time.time, _random_fn=random.random):
96
    """Initializes this class.
97

98
    @param _time_fn: Time function for unittests
99
    @param _random_fn: Random number generator for unittests
100

101
    """
102
    object.__init__(self)
103

    
104
    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
105
    self._time_fn = _time_fn
106
    self._random_fn = _random_fn
107

    
108
  def NextAttempt(self):
109
    """Returns the timeout for the next attempt.
110

111
    """
112
    try:
113
      timeout = self._timeouts.next()
114
    except StopIteration:
115
      # No more timeouts, do blocking acquire
116
      timeout = None
117

    
118
    if timeout is not None:
119
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
120
      # where two or more jobs are fighting for the same lock(s).
121
      variation_range = timeout * 0.1
122
      timeout += ((self._random_fn() * variation_range) -
123
                  (variation_range * 0.5))
124

    
125
    return timeout
126

    
127

    
128
class OpExecCbBase: # pylint: disable=W0232
129
  """Base class for OpCode execution callbacks.
130

131
  """
132
  def NotifyStart(self):
133
    """Called when we are about to execute the LU.
134

135
    This function is called when we're about to start the lu's Exec() method,
136
    that is, after we have acquired all locks.
137

138
    """
139

    
140
  def Feedback(self, *args):
141
    """Sends feedback from the LU code to the end-user.
142

143
    """
144

    
145
  def CheckCancel(self):
146
    """Check whether job has been cancelled.
147

148
    """
149

    
150
  def SubmitManyJobs(self, jobs):
151
    """Submits jobs for processing.
152

153
    See L{jqueue.JobQueue.SubmitManyJobs}.
154

155
    """
156
    raise NotImplementedError
157

    
158

    
159
def _LUNameForOpName(opname):
160
  """Computes the LU name for a given OpCode name.
161

162
  """
163
  assert opname.startswith(_OP_PREFIX), \
164
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
165

    
166
  return _LU_PREFIX + opname[len(_OP_PREFIX):]
167

    
168

    
169
def _ComputeDispatchTable():
170
  """Computes the opcode-to-lu dispatch table.
171

172
  """
173
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
174
              for op in opcodes.OP_MAPPING.values()
175
              if op.WITH_LU)
176

    
177

    
178
def _SetBaseOpParams(src, defcomment, dst):
179
  """Copies basic opcode parameters.
180

181
  @type src: L{opcodes.OpCode}
182
  @param src: Source opcode
183
  @type defcomment: string
184
  @param defcomment: Comment to specify if not already given
185
  @type dst: L{opcodes.OpCode}
186
  @param dst: Destination opcode
187

188
  """
189
  if hasattr(src, "debug_level"):
190
    dst.debug_level = src.debug_level
191

    
192
  if (getattr(dst, "priority", None) is None and
193
      hasattr(src, "priority")):
194
    dst.priority = src.priority
195

    
196
  if not getattr(dst, opcodes.COMMENT_ATTR, None):
197
    dst.comment = defcomment
198

    
199

    
200
def _ProcessResult(submit_fn, op, result):
201
  """Examines opcode result.
202

203
  If necessary, additional processing on the result is done.
204

205
  """
206
  if isinstance(result, cmdlib.ResultWithJobs):
207
    # Copy basic parameters (e.g. priority)
208
    map(compat.partial(_SetBaseOpParams, op,
209
                       "Submitted by %s" % op.OP_ID),
210
        itertools.chain(*result.jobs))
211

    
212
    # Submit jobs
213
    job_submission = submit_fn(result.jobs)
214

    
215
    # Build dictionary
216
    result = result.other
217

    
218
    assert constants.JOB_IDS_KEY not in result, \
219
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
220

    
221
    result[constants.JOB_IDS_KEY] = job_submission
222

    
223
  return result
224

    
225

    
226
def _FailingSubmitManyJobs(_):
227
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
228

229
  """
230
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
231
                               " queries) can not submit jobs")
232

    
233

    
234
def _RpcResultsToHooksResults(rpc_results):
235
  """Function to convert RPC results to the format expected by HooksMaster.
236

237
  @type rpc_results: dict(node: L{rpc.RpcResult})
238
  @param rpc_results: RPC results
239
  @rtype: dict(node: (fail_msg, offline, hooks_results))
240
  @return: RPC results unpacked according to the format expected by
241
    L({mcpu.HooksMaster}
242

243
  """
244
  return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
245
              for (node, rpc_res) in rpc_results.items())
246

    
247

    
248
class Processor(object):
249
  """Object which runs OpCodes"""
250
  DISPATCH_TABLE = _ComputeDispatchTable()
251

    
252
  def __init__(self, context, ec_id, enable_locks=True):
253
    """Constructor for Processor
254

255
    @type context: GanetiContext
256
    @param context: global Ganeti context
257
    @type ec_id: string
258
    @param ec_id: execution context identifier
259

260
    """
261
    self.context = context
262
    self._ec_id = ec_id
263
    self._cbs = None
264
    self.rpc = context.rpc
265
    self.hmclass = HooksMaster
266
    self._enable_locks = enable_locks
267

    
268
  def _CheckLocksEnabled(self):
269
    """Checks if locking is enabled.
270

271
    @raise errors.ProgrammerError: In case locking is not enabled
272

273
    """
274
    if not self._enable_locks:
275
      raise errors.ProgrammerError("Attempted to use disabled locks")
276

    
277
  def _AcquireLocks(self, level, names, shared, timeout, priority):
278
    """Acquires locks via the Ganeti lock manager.
279

280
    @type level: int
281
    @param level: Lock level
282
    @type names: list or string
283
    @param names: Lock names
284
    @type shared: bool
285
    @param shared: Whether the locks should be acquired in shared mode
286
    @type timeout: None or float
287
    @param timeout: Timeout for acquiring the locks
288
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
289
        amount of time
290

291
    """
292
    self._CheckLocksEnabled()
293

    
294
    if self._cbs:
295
      self._cbs.CheckCancel()
296

    
297
    acquired = self.context.glm.acquire(level, names, shared=shared,
298
                                        timeout=timeout, priority=priority)
299

    
300
    if acquired is None:
301
      raise LockAcquireTimeout()
302

    
303
    return acquired
304

    
305
  def _ExecLU(self, lu):
306
    """Logical Unit execution sequence.
307

308
    """
309
    write_count = self.context.cfg.write_count
310
    lu.CheckPrereq()
311

    
312
    hm = self.BuildHooksManager(lu)
313
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
314
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
315
                     self.Log, None)
316

    
317
    if getattr(lu.op, "dry_run", False):
318
      # in this mode, no post-hooks are run, and the config is not
319
      # written (as it might have been modified by another LU, and we
320
      # shouldn't do writeout on behalf of other threads
321
      self.LogInfo("dry-run mode requested, not actually executing"
322
                   " the operation")
323
      return lu.dry_run_result
324

    
325
    if self._cbs:
326
      submit_mj_fn = self._cbs.SubmitManyJobs
327
    else:
328
      submit_mj_fn = _FailingSubmitManyJobs
329

    
330
    try:
331
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
332
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
333
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
334
                                self.Log, result)
335
    finally:
336
      # FIXME: This needs locks if not lu_class.REQ_BGL
337
      if write_count != self.context.cfg.write_count:
338
        hm.RunConfigUpdate()
339

    
340
    return result
341

    
342
  def BuildHooksManager(self, lu):
343
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
344

    
345
  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
346
    """Execute a Logical Unit, with the needed locks.
347

348
    This is a recursive function that starts locking the given level, and
349
    proceeds up, till there are no more locks to acquire. Then it executes the
350
    given LU and its opcodes.
351

352
    """
353
    adding_locks = level in lu.add_locks
354
    acquiring_locks = level in lu.needed_locks
355
    if level not in locking.LEVELS:
356
      if self._cbs:
357
        self._cbs.NotifyStart()
358

    
359
      try:
360
        result = self._ExecLU(lu)
361
      except AssertionError, err:
362
        # this is a bit ugly, as we don't know from which phase
363
        # (prereq, exec) this comes; but it's better than an exception
364
        # with no information
365
        (_, _, tb) = sys.exc_info()
366
        err_info = traceback.format_tb(tb)
367
        del tb
368
        logging.exception("Detected AssertionError")
369
        raise errors.OpExecError("Internal assertion error: please report"
370
                                 " this as a bug.\nError message: '%s';"
371
                                 " location:\n%s" % (str(err), err_info[-1]))
372

    
373
    elif adding_locks and acquiring_locks:
374
      # We could both acquire and add locks at the same level, but for now we
375
      # don't need this, so we'll avoid the complicated code needed.
376
      raise NotImplementedError("Can't declare locks to acquire when adding"
377
                                " others")
378

    
379
    elif adding_locks or acquiring_locks:
380
      self._CheckLocksEnabled()
381

    
382
      lu.DeclareLocks(level)
383
      share = lu.share_locks[level]
384

    
385
      try:
386
        assert adding_locks ^ acquiring_locks, \
387
          "Locks must be either added or acquired"
388

    
389
        if acquiring_locks:
390
          # Acquiring locks
391
          needed_locks = lu.needed_locks[level]
392

    
393
          self._AcquireLocks(level, needed_locks, share,
394
                             calc_timeout(), priority)
395
        else:
396
          # Adding locks
397
          add_locks = lu.add_locks[level]
398
          lu.remove_locks[level] = add_locks
399

    
400
          try:
401
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
402
          except errors.LockError:
403
            raise errors.OpPrereqError(
404
              "Couldn't add locks (%s), probably because of a race condition"
405
              " with another job, who added them first" % add_locks,
406
              errors.ECODE_FAULT)
407

    
408
        try:
409
          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
410
        finally:
411
          if level in lu.remove_locks:
412
            self.context.glm.remove(level, lu.remove_locks[level])
413
      finally:
414
        if self.context.glm.is_owned(level):
415
          self.context.glm.release(level)
416

    
417
    else:
418
      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
419

    
420
    return result
421

    
422
  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
423
    """Execute an opcode.
424

425
    @type op: an OpCode instance
426
    @param op: the opcode to be executed
427
    @type cbs: L{OpExecCbBase}
428
    @param cbs: Runtime callbacks
429
    @type timeout: float or None
430
    @param timeout: Maximum time to acquire all locks, None for no timeout
431
    @type priority: number or None
432
    @param priority: Priority for acquiring lock(s)
433
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
434
        amount of time
435

436
    """
437
    if not isinstance(op, opcodes.OpCode):
438
      raise errors.ProgrammerError("Non-opcode instance passed"
439
                                   " to ExecOpcode (%s)" % type(op))
440

    
441
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
442
    if lu_class is None:
443
      raise errors.OpCodeUnknown("Unknown opcode")
444

    
445
    if timeout is None:
446
      calc_timeout = lambda: None
447
    else:
448
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining
449

    
450
    self._cbs = cbs
451
    try:
452
      if self._enable_locks:
453
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
454
        # and in a shared fashion otherwise (to prevent concurrent run with
455
        # an exclusive LU.
456
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
457
                            not lu_class.REQ_BGL, calc_timeout(),
458
                            priority)
459
      elif lu_class.REQ_BGL:
460
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
461
                                     " disabled" % op.OP_ID)
462

    
463
      try:
464
        lu = lu_class(self, op, self.context, self.rpc)
465
        lu.ExpandNames()
466
        assert lu.needed_locks is not None, "needed_locks not set by LU"
467

    
468
        try:
469
          result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
470
                                       priority)
471
        finally:
472
          if self._ec_id:
473
            self.context.cfg.DropECReservations(self._ec_id)
474
      finally:
475
        # Release BGL if owned
476
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
477
          assert self._enable_locks
478
          self.context.glm.release(locking.LEVEL_CLUSTER)
479
    finally:
480
      self._cbs = None
481

    
482
    resultcheck_fn = op.OP_RESULT
483
    if not (resultcheck_fn is None or resultcheck_fn(result)):
484
      logging.error("Expected opcode result matching %s, got %s",
485
                    resultcheck_fn, result)
486
      raise errors.OpResultError("Opcode result does not match %s: %s" %
487
                                 (resultcheck_fn, utils.Truncate(result, 80)))
488

    
489
    return result
490

    
491
  def Log(self, *args):
492
    """Forward call to feedback callback function.
493

494
    """
495
    if self._cbs:
496
      self._cbs.Feedback(*args)
497

    
498
  def LogStep(self, current, total, message):
499
    """Log a change in LU execution progress.
500

501
    """
502
    logging.debug("Step %d/%d %s", current, total, message)
503
    self.Log("STEP %d/%d %s" % (current, total, message))
504

    
505
  def LogWarning(self, message, *args, **kwargs):
506
    """Log a warning to the logs and the user.
507

508
    The optional keyword argument is 'hint' and can be used to show a
509
    hint to the user (presumably related to the warning). If the
510
    message is empty, it will not be printed at all, allowing one to
511
    show only a hint.
512

513
    """
514
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
515
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
516
    if args:
517
      message = message % tuple(args)
518
    if message:
519
      logging.warning(message)
520
      self.Log(" - WARNING: %s" % message)
521
    if "hint" in kwargs:
522
      self.Log("      Hint: %s" % kwargs["hint"])
523

    
524
  def LogInfo(self, message, *args):
525
    """Log an informational message to the logs and the user.
526

527
    """
528
    if args:
529
      message = message % tuple(args)
530
    logging.info(message)
531
    self.Log(" - INFO: %s" % message)
532

    
533
  def GetECId(self):
534
    """Returns the current execution context ID.
535

536
    """
537
    if not self._ec_id:
538
      raise errors.ProgrammerError("Tried to use execution context id when"
539
                                   " not set")
540
    return self._ec_id
541

    
542

    
543
class HooksMaster(object):
544
  def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
545
               hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
546
               cluster_name=None, master_name=None):
547
    """Base class for hooks masters.
548

549
    This class invokes the execution of hooks according to the behaviour
550
    specified by its parameters.
551

552
    @type opcode: string
553
    @param opcode: opcode of the operation to which the hooks are tied
554
    @type hooks_path: string
555
    @param hooks_path: prefix of the hooks directories
556
    @type nodes: 2-tuple of lists
557
    @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
558
      run and nodes on which post-hooks must be run
559
    @type hooks_execution_fn: function that accepts the following parameters:
560
      (node_list, hooks_path, phase, environment)
561
    @param hooks_execution_fn: function that will execute the hooks; can be
562
      None, indicating that no conversion is necessary.
563
    @type hooks_results_adapt_fn: function
564
    @param hooks_results_adapt_fn: function that will adapt the return value of
565
      hooks_execution_fn to the format expected by RunPhase
566
    @type build_env_fn: function that returns a dictionary having strings as
567
      keys
568
    @param build_env_fn: function that builds the environment for the hooks
569
    @type log_fn: function that accepts a string
570
    @param log_fn: logging function
571
    @type htype: string or None
572
    @param htype: None or one of L{constants.HTYPE_CLUSTER},
573
     L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
574
    @type cluster_name: string
575
    @param cluster_name: name of the cluster
576
    @type master_name: string
577
    @param master_name: name of the master
578

579
    """
580
    self.opcode = opcode
581
    self.hooks_path = hooks_path
582
    self.hooks_execution_fn = hooks_execution_fn
583
    self.hooks_results_adapt_fn = hooks_results_adapt_fn
584
    self.build_env_fn = build_env_fn
585
    self.log_fn = log_fn
586
    self.htype = htype
587
    self.cluster_name = cluster_name
588
    self.master_name = master_name
589

    
590
    self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
591
    (self.pre_nodes, self.post_nodes) = nodes
592

    
593
  def _BuildEnv(self, phase):
594
    """Compute the environment and the target nodes.
595

596
    Based on the opcode and the current node list, this builds the
597
    environment for the hooks and the target node list for the run.
598

599
    """
600
    if phase == constants.HOOKS_PHASE_PRE:
601
      prefix = "GANETI_"
602
    elif phase == constants.HOOKS_PHASE_POST:
603
      prefix = "GANETI_POST_"
604
    else:
605
      raise AssertionError("Unknown phase '%s'" % phase)
606

    
607
    env = {}
608

    
609
    if self.hooks_path is not None:
610
      phase_env = self.build_env_fn()
611
      if phase_env:
612
        assert not compat.any(key.upper().startswith(prefix)
613
                              for key in phase_env)
614
        env.update(("%s%s" % (prefix, key), value)
615
                   for (key, value) in phase_env.items())
616

    
617
    if phase == constants.HOOKS_PHASE_PRE:
618
      assert compat.all((key.startswith("GANETI_") and
619
                         not key.startswith("GANETI_POST_"))
620
                        for key in env)
621

    
622
    elif phase == constants.HOOKS_PHASE_POST:
623
      assert compat.all(key.startswith("GANETI_POST_") for key in env)
624
      assert isinstance(self.pre_env, dict)
625

    
626
      # Merge with pre-phase environment
627
      assert not compat.any(key.startswith("GANETI_POST_")
628
                            for key in self.pre_env)
629
      env.update(self.pre_env)
630
    else:
631
      raise AssertionError("Unknown phase '%s'" % phase)
632

    
633
    return env
634

    
635
  def _RunWrapper(self, node_list, hpath, phase, phase_env):
636
    """Simple wrapper over self.callfn.
637

638
    This method fixes the environment before executing the hooks.
639

640
    """
641
    env = {
642
      "PATH": constants.HOOKS_PATH,
643
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
644
      "GANETI_OP_CODE": self.opcode,
645
      "GANETI_DATA_DIR": pathutils.DATA_DIR,
646
      "GANETI_HOOKS_PHASE": phase,
647
      "GANETI_HOOKS_PATH": hpath,
648
      }
649

    
650
    if self.htype:
651
      env["GANETI_OBJECT_TYPE"] = self.htype
652

    
653
    if self.cluster_name is not None:
654
      env["GANETI_CLUSTER"] = self.cluster_name
655

    
656
    if self.master_name is not None:
657
      env["GANETI_MASTER"] = self.master_name
658

    
659
    if phase_env:
660
      env = utils.algo.JoinDisjointDicts(env, phase_env)
661

    
662
    # Convert everything to strings
663
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
664

    
665
    assert compat.all(key == "PATH" or key.startswith("GANETI_")
666
                      for key in env)
667

    
668
    return self.hooks_execution_fn(node_list, hpath, phase, env)
669

    
670
  def RunPhase(self, phase, nodes=None):
671
    """Run all the scripts for a phase.
672

673
    This is the main function of the HookMaster.
674
    It executes self.hooks_execution_fn, and after running
675
    self.hooks_results_adapt_fn on its results it expects them to be in the form
676
    {node_name: (fail_msg, [(script, result, output), ...]}).
677

678
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
679
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
680
    @param nodes: overrides the predefined list of nodes for the given phase
681
    @return: the processed results of the hooks multi-node rpc call
682
    @raise errors.HooksFailure: on communication failure to the nodes
683
    @raise errors.HooksAbort: on failure of one of the hooks
684

685
    """
686
    if phase == constants.HOOKS_PHASE_PRE:
687
      if nodes is None:
688
        nodes = self.pre_nodes
689
      env = self.pre_env
690
    elif phase == constants.HOOKS_PHASE_POST:
691
      if nodes is None:
692
        nodes = self.post_nodes
693
      env = self._BuildEnv(phase)
694
    else:
695
      raise AssertionError("Unknown phase '%s'" % phase)
696

    
697
    if not nodes:
698
      # empty node list, we should not attempt to run this as either
699
      # we're in the cluster init phase and the rpc client part can't
700
      # even attempt to run, or this LU doesn't do hooks at all
701
      return
702

    
703
    results = self._RunWrapper(nodes, self.hooks_path, phase, env)
704
    if not results:
705
      msg = "Communication Failure"
706
      if phase == constants.HOOKS_PHASE_PRE:
707
        raise errors.HooksFailure(msg)
708
      else:
709
        self.log_fn(msg)
710
        return results
711

    
712
    converted_res = results
713
    if self.hooks_results_adapt_fn:
714
      converted_res = self.hooks_results_adapt_fn(results)
715

    
716
    errs = []
717
    for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
718
      if offline:
719
        continue
720

    
721
      if fail_msg:
722
        self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)
723
        continue
724

    
725
      for script, hkr, output in hooks_results:
726
        if hkr == constants.HKR_FAIL:
727
          if phase == constants.HOOKS_PHASE_PRE:
728
            errs.append((node_name, script, output))
729
          else:
730
            if not output:
731
              output = "(no output)"
732
            self.log_fn("On %s script %s failed, output: %s" %
733
                        (node_name, script, output))
734

    
735
    if errs and phase == constants.HOOKS_PHASE_PRE:
736
      raise errors.HooksAbort(errs)
737

    
738
    return results
739

    
740
  def RunConfigUpdate(self):
741
    """Run the special configuration update hook
742

743
    This is a special hook that runs only on the master after each
744
    top-level LI if the configuration has been updated.
745

746
    """
747
    phase = constants.HOOKS_PHASE_POST
748
    hpath = constants.HOOKS_NAME_CFGUPDATE
749
    nodes = [self.master_name]
750
    self._RunWrapper(nodes, hpath, phase, self.pre_env)
751

    
752
  @staticmethod
753
  def BuildFromLu(hooks_execution_fn, lu):
754
    if lu.HPATH is None:
755
      nodes = (None, None)
756
    else:
757
      nodes = map(frozenset, lu.BuildHooksNodes())
758

    
759
    master_name = cluster_name = None
760
    if lu.cfg:
761
      master_name = lu.cfg.GetMasterNode()
762
      cluster_name = lu.cfg.GetClusterName()
763

    
764
    return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
765
                       _RpcResultsToHooksResults, lu.BuildHooksEnv,
766
                       lu.LogWarning, lu.HTYPE, cluster_name, master_name)