root / lib / mcpu.py @ 8e8092cc
#
#

# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""

import logging
import random
import time
import itertools

from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import cmdlib
from ganeti import locking
from ganeti import utils
from ganeti import compat


_OP_PREFIX = "Op"
_LU_PREFIX = "LU"
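# As an illustration, an opcode class named "OpInstanceCreate" is handled by
# a logical unit named "LUInstanceCreate"; see _LUNameForOpName() and
# _ComputeDispatchTable() below (the names here are just examples).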


class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  """


def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  """
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
  running_sum = result[0]

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
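  # Each subsequent timeout grows roughly geometrically: the previous value
  # is inflated by 5% and raised to the power 1.25 (see below), then clamped
  # between LOCK_ATTEMPTS_MINWAIT and LOCK_ATTEMPTS_MAXWAIT. As a sketch,
  # assuming MINWAIT is 1.0 and MAXWAIT is 15.0 seconds, the series starts
  # around 1.0, 1.06, 1.15, 1.26, ... and eventually saturates at 15.0.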
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap max timeout. This gives other jobs a chance to run even if
    # we're still trying to get our locks, before finally moving to a
    # blocking acquire.
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
    # And also cap the lower boundary for safety
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)

    result.append(timeout)
    running_sum += timeout

  return result


class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    """
    try:
      timeout = self._timeouts.next()
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))
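      # For example, with timeout=10.0 the variation_range is 1.0, so the
      # adjusted timeout ends up uniformly distributed in [9.5, 10.5).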

    return timeout


class OpExecCbBase: # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError


def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  """
  assert opname.startswith(_OP_PREFIX), \
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  return _LU_PREFIX + opname[len(_OP_PREFIX):]


def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  """
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
              for op in opcodes.OP_MAPPING.values()
              if op.WITH_LU)


def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  if not getattr(dst, opcodes.COMMENT_ATTR, None):
    dst.comment = defcomment


def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority)
    map(compat.partial(_SetBaseOpParams, op,
                       "Submitted by %s" % op.OP_ID),
        itertools.chain(*result.jobs))

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission
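    # "result" is now the LU's "other" dictionary with the job submission
    # status added under constants.JOB_IDS_KEY (whatever submit_fn returned).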

  return result


def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) cannot submit jobs")


def _RpcResultsToHooksResults(rpc_results):
  """Function to convert RPC results to the format expected by HooksMaster.

  @type rpc_results: dict(node: L{rpc.RpcResult})
  @param rpc_results: RPC results
  @rtype: dict(node: (fail_msg, offline, hooks_results))
  @return: RPC results unpacked according to the format expected by
    L{mcpu.HooksMaster}

  """
  return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
              for (node, rpc_res) in rpc_results.items())


class Processor(object):
  """Object which runs OpCodes"""
  DISPATCH_TABLE = _ComputeDispatchTable()
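  # A Processor instance is normally created by the job queue, which then
  # calls ExecOpCode() for each opcode of a job (a usage sketch, not enforced
  # by this class).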

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.rpc = context.rpc
    self.hmclass = HooksMaster
    self._enable_locks = enable_locks

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, timeout, priority):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    self._CheckLocksEnabled()

    if self._cbs:
      self._cbs.CheckCancel()

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority)

    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
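    # The cluster-level lock (BGL) is handled by ExecOpCode(), which calls
    # this method first with locking.LEVEL_INSTANCE; each recursion step
    # continues with level + 1 until the level is no longer in
    # locking.LEVELS, at which point the LU itself is executed.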
    if level not in locking.LEVELS:
      if self._cbs:
        self._cbs.NotifyStart()

      result = self._ExecLU(lu)

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          self._AcquireLocks(level, needed_locks, share,
                             calc_timeout(), priority)
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          lu.remove_locks[level] = add_locks

          try:
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            logging.exception("Detected lock error in level %s for locks"
                              " %s, shared=%s", level, add_locks, share)
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), most likely because of another"
              " job that added them first" % add_locks,
              errors.ECODE_NOTUNIQUE)

        try:
          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)

    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)

    return result

  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpCode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU).
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, calc_timeout(),
                           priority)
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
                                       priority)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        # Release BGL if owned
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
          assert self._enable_locks
          self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      raise errors.OpResultError("Opcode result does not match %s: %s" %
                                 (resultcheck_fn, utils.Truncate(result, 80)))

    return result

  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id


class HooksMaster(object):
  def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
               hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
               cluster_name=None, master_name=None):
    """Base class for hooks masters.

    This class invokes the execution of hooks according to the behaviour
    specified by its parameters.

    @type opcode: string
    @param opcode: opcode of the operation to which the hooks are tied
    @type hooks_path: string
    @param hooks_path: prefix of the hooks directories
    @type nodes: 2-tuple of lists
    @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
      run and nodes on which post-hooks must be run
    @type hooks_execution_fn: function that accepts the following parameters:
      (node_list, hooks_path, phase, environment)
    @param hooks_execution_fn: function that will execute the hooks
    @type hooks_results_adapt_fn: function
    @param hooks_results_adapt_fn: function that will adapt the return value of
      hooks_execution_fn to the format expected by RunPhase; can be None,
      indicating that no conversion is necessary
    @type build_env_fn: function that returns a dictionary having strings as
      keys
    @param build_env_fn: function that builds the environment for the hooks
    @type log_fn: function that accepts a string
    @param log_fn: logging function
    @type htype: string or None
    @param htype: None or one of L{constants.HTYPE_CLUSTER},
      L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
    @type cluster_name: string
    @param cluster_name: name of the cluster
    @type master_name: string
    @param master_name: name of the master

    """
    self.opcode = opcode
    self.hooks_path = hooks_path
    self.hooks_execution_fn = hooks_execution_fn
    self.hooks_results_adapt_fn = hooks_results_adapt_fn
    self.build_env_fn = build_env_fn
    self.log_fn = log_fn
    self.htype = htype
    self.cluster_name = cluster_name
    self.master_name = master_name

    self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
    (self.pre_nodes, self.post_nodes) = nodes

  def _BuildEnv(self, phase):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      prefix = "GANETI_"
    elif phase == constants.HOOKS_PHASE_POST:
      prefix = "GANETI_POST_"
    else:
      raise AssertionError("Unknown phase '%s'" % phase)
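    # For instance, a key "INSTANCE_NAME" returned by build_env_fn ends up as
    # "GANETI_INSTANCE_NAME" in the pre phase and as
    # "GANETI_POST_INSTANCE_NAME" in the post phase.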

    env = {}

    if self.hooks_path is not None:
      phase_env = self.build_env_fn()
      if phase_env:
        assert not compat.any(key.upper().startswith(prefix)
                              for key in phase_env)
        env.update(("%s%s" % (prefix, key), value)
                   for (key, value) in phase_env.items())

    if phase == constants.HOOKS_PHASE_PRE:
      assert compat.all((key.startswith("GANETI_") and
                         not key.startswith("GANETI_POST_"))
                        for key in env)

    elif phase == constants.HOOKS_PHASE_POST:
      assert compat.all(key.startswith("GANETI_POST_") for key in env)
      assert isinstance(self.pre_env, dict)

      # Merge with pre-phase environment
      assert not compat.any(key.startswith("GANETI_POST_")
                            for key in self.pre_env)
      env.update(self.pre_env)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    return env

  def _RunWrapper(self, node_list, hpath, phase, phase_env):
    """Simple wrapper over self.hooks_execution_fn.

    This method fixes the environment before executing the hooks.

    """
    env = {
      "PATH": constants.HOOKS_PATH,
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.opcode,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      "GANETI_HOOKS_PHASE": phase,
      "GANETI_HOOKS_PATH": hpath,
      }

    if self.htype:
      env["GANETI_OBJECT_TYPE"] = self.htype

    if self.cluster_name is not None:
      env["GANETI_CLUSTER"] = self.cluster_name

    if self.master_name is not None:
      env["GANETI_MASTER"] = self.master_name

    if phase_env:
      env = utils.algo.JoinDisjointDicts(env, phase_env)

    # Convert everything to strings
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])

    assert compat.all(key == "PATH" or key.startswith("GANETI_")
                      for key in env)

    return self.hooks_execution_fn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HooksMaster.
    It executes self.hooks_execution_fn, and after running
    self.hooks_results_adapt_fn on its results it expects them to be in the
    form {node_name: (fail_msg, offline, [(script, result, output), ...])}.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if phase == constants.HOOKS_PHASE_PRE:
      if nodes is None:
        nodes = self.pre_nodes
      env = self.pre_env
    elif phase == constants.HOOKS_PHASE_POST:
      if nodes is None:
        nodes = self.post_nodes
      env = self._BuildEnv(phase)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    if not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return

    results = self._RunWrapper(nodes, self.hooks_path, phase, env)
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        self.log_fn(msg)
        return results

    converted_res = results
    if self.hooks_results_adapt_fn:
      converted_res = self.hooks_results_adapt_fn(results)

    errs = []
    for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
      if offline:
        continue

      if fail_msg:
        self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)
        continue

      for script, hkr, output in hooks_results:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.log_fn("On %s script %s failed, output: %s" %
                        (node_name, script, output))

    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)

    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook.

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.master_name]
    self._RunWrapper(nodes, hpath, phase, self.pre_env)

  @staticmethod
  def BuildFromLu(hooks_execution_fn, lu):
    if lu.HPATH is None:
      nodes = (None, None)
    else:
      nodes = map(frozenset, lu.BuildHooksNodes())

    master_name = cluster_name = None
    if lu.cfg:
      master_name = lu.cfg.GetMasterNode()
      cluster_name = lu.cfg.GetClusterName()

    return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
                       _RpcResultsToHooksResults, lu.BuildHooksEnv,
                       lu.LogWarning, lu.HTYPE, cluster_name, master_name)