root / lib / mcpu.py @ 80b207df

#
#

# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""

import logging
import random
import time
import itertools

from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import cmdlib
from ganeti import locking
from ganeti import utils
from ganeti import compat


_OP_PREFIX = "Op"
_LU_PREFIX = "LU"


class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  """


def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  """
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
  running_sum = result[0]

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap max timeout. This gives other jobs a chance to run even if
    # we're still trying to get our locks, before finally moving to a
    # blocking acquire.
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
    # And also cap the lower boundary for safety
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)

    result.append(timeout)
    running_sum += timeout

  return result
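
# For illustration: the exact numbers depend on the LOCK_ATTEMPTS_* constants,
# which are defined in constants.py and not in this module. Assuming, for
# example, MINWAIT=1.0, MAXWAIT=15.0 and TIMEOUT=600, the series starts at 1.0
# and each step grows as (previous * 1.05) ** 1.25 (roughly 1.0, 1.06, 1.15,
# 1.26, ...), clamped to the [MINWAIT, MAXWAIT] range; entries are appended
# until their sum reaches TIMEOUT, after which callers fall back to a blocking
# acquire.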


class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    """
    try:
      timeout = self._timeouts.next()
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
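
# Illustrative usage sketch (`lock_was_acquired` is a placeholder, not part of
# this module): callers typically create one strategy per acquisition and ask
# it for successive timeouts until it returns None, then block:
#
#   strategy = LockAttemptTimeoutStrategy()
#   while True:
#     timeout = strategy.NextAttempt()
#     # ... try to acquire the lock with `timeout`; None means block ...
#     if lock_was_acquired or timeout is None:
#       break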


class OpExecCbBase: # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError
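
# Illustrative sketch of a minimal callbacks object (the class name is a
# placeholder; the real implementation lives outside this module, in the job
# queue code). Only the callbacks a caller cares about need to be overridden;
# SubmitManyJobs has no default behaviour:
#
#   class _SimpleCbs(OpExecCbBase):
#     def Feedback(self, *args):
#       logging.info("LU feedback: %s", args)
#
#     def SubmitManyJobs(self, jobs):
#       raise NotImplementedError("job submission not supported here")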


def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  """
  assert opname.startswith(_OP_PREFIX), \
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  return _LU_PREFIX + opname[len(_OP_PREFIX):]
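
# For example, _LUNameForOpName("OpClusterVerify") returns "LUClusterVerify";
# the "Op" prefix is simply swapped for "LU".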


def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  """
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
              for op in opcodes.OP_MAPPING.values()
              if op.WITH_LU)
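
# The resulting dictionary maps opcode classes to LU classes, e.g. (assuming
# such a pair exists in opcodes.py and cmdlib.py):
#   {opcodes.OpClusterVerify: cmdlib.LUClusterVerify, ...}
# Opcodes declared with WITH_LU set to False are left out.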


def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  if not getattr(dst, opcodes.COMMENT_ATTR, None):
    dst.comment = defcomment


def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority)
    map(compat.partial(_SetBaseOpParams, op,
                       "Submitted by %s" % op.OP_ID),
        itertools.chain(*result.jobs))

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
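
# In other words: a plain result is passed through unchanged, while a
# cmdlib.ResultWithJobs is unpacked. Its job lists are submitted via submit_fn
# and the caller gets back ResultWithJobs.other augmented with the submission
# result, roughly (shown symbolically):
#   {constants.JOB_IDS_KEY: <submit_fn return value>, <other keys>...}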


def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")


def _RpcResultsToHooksResults(rpc_results):
  """Function to convert RPC results to the format expected by HooksMaster.

  @type rpc_results: dict(node: L{rpc.RpcResult})
  @param rpc_results: RPC results
  @rtype: dict(node: (fail_msg, offline, hooks_results))
  @return: RPC results unpacked according to the format expected by
    L{mcpu.HooksMaster}

  """
  return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
              for (node, rpc_res) in rpc_results.items())
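
# Shape of the conversion, schematically (hostnames are placeholders):
#   {"node1.example.com": <rpc.RpcResult>, ...}
# becomes
#   {"node1.example.com": (fail_msg, offline, payload), ...}
# where payload carries the per-script hook results reported by the node.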


class Processor(object):
  """Object which runs OpCodes"""
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.rpc = context.rpc
    self.hmclass = HooksMaster
    self._enable_locks = enable_locks

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, timeout, priority):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    self._CheckLocksEnabled()

    if self._cbs:
      self._cbs.CheckCancel()

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority)

    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      if self._cbs:
        self._cbs.NotifyStart()

      result = self._ExecLU(lu)

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          self._AcquireLocks(level, needed_locks, share,
                             calc_timeout(), priority)
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          lu.remove_locks[level] = add_locks

          try:
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            logging.exception("Detected lock error in level %s for locks"
                              " %s, shared=%s", level, add_locks, share)
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), most likely because of another"
              " job who added them first" % add_locks,
              errors.ECODE_NOTUNIQUE)

        try:
          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)

    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)

    return result
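
  # Schematically, for an LU that only acquires locks, the recursion walks the
  # lock levels in order (the exact set and order of levels is defined in
  # locking.LEVELS, not here): acquire the locks declared for the current
  # level, recurse into level + 1, and so on; once `level` is past the known
  # levels, _ExecLU() runs the LU, and the finally-blocks release each level
  # on the way back out of the recursion.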

  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU).
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, calc_timeout(),
                           priority)
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
                                       priority)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        # Release BGL if owned
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
          assert self._enable_locks
          self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn, utils.Truncate(result, 80)))

    return result
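
  # Illustrative usage sketch (the context, callbacks and strategy objects
  # here are placeholders supplied by the calling daemon, typically the job
  # queue):
  #
  #   proc = Processor(context, ec_id)
  #   result = proc.ExecOpCode(opcodes.OpClusterVerify(), cbs,
  #                            timeout=strategy.NextAttempt(),
  #                            priority=constants.OP_PRIO_DEFAULT)
  #
  # A LockAcquireTimeout from ExecOpCode means the locks could not be obtained
  # within `timeout`; callers usually retry with the next timeout from a
  # LockAttemptTimeoutStrategy before falling back to a blocking attempt.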

  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id


class HooksMaster(object):
  def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
    hooks_results_adapt_fn, build_env_fn, log_fn, htype=None, cluster_name=None,
    master_name=None):
    """Base class for hooks masters.

    This class invokes the execution of hooks according to the behaviour
    specified by its parameters.

    @type opcode: string
    @param opcode: opcode of the operation to which the hooks are tied
    @type hooks_path: string
    @param hooks_path: prefix of the hooks directories
    @type nodes: 2-tuple of lists
    @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
      run and nodes on which post-hooks must be run
    @type hooks_execution_fn: function that accepts the following parameters:
      (node_list, hooks_path, phase, environment)
    @param hooks_execution_fn: function that will execute the hooks
    @type hooks_results_adapt_fn: function
    @param hooks_results_adapt_fn: function that will adapt the return value of
      hooks_execution_fn to the format expected by RunPhase; can be None,
      indicating that no conversion is necessary
    @type build_env_fn: function that returns a dictionary having strings as
      keys
    @param build_env_fn: function that builds the environment for the hooks
    @type log_fn: function that accepts a string
    @param log_fn: logging function
    @type htype: string or None
    @param htype: None or one of L{constants.HTYPE_CLUSTER},
     L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
    @type cluster_name: string
    @param cluster_name: name of the cluster
    @type master_name: string
    @param master_name: name of the master

    """
    self.opcode = opcode
    self.hooks_path = hooks_path
    self.hooks_execution_fn = hooks_execution_fn
    self.hooks_results_adapt_fn = hooks_results_adapt_fn
    self.build_env_fn = build_env_fn
    self.log_fn = log_fn
    self.htype = htype
    self.cluster_name = cluster_name
    self.master_name = master_name

    self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
    (self.pre_nodes, self.post_nodes) = nodes

  def _BuildEnv(self, phase):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      prefix = "GANETI_"
    elif phase == constants.HOOKS_PHASE_POST:
      prefix = "GANETI_POST_"
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    env = {}

    if self.hooks_path is not None:
      phase_env = self.build_env_fn()
      if phase_env:
        assert not compat.any(key.upper().startswith(prefix)
                              for key in phase_env)
        env.update(("%s%s" % (prefix, key), value)
                   for (key, value) in phase_env.items())

    if phase == constants.HOOKS_PHASE_PRE:
      assert compat.all((key.startswith("GANETI_") and
                         not key.startswith("GANETI_POST_"))
                        for key in env)

    elif phase == constants.HOOKS_PHASE_POST:
      assert compat.all(key.startswith("GANETI_POST_") for key in env)
      assert isinstance(self.pre_env, dict)

      # Merge with pre-phase environment
      assert not compat.any(key.startswith("GANETI_POST_")
                            for key in self.pre_env)
      env.update(self.pre_env)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    return env
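
  # For example, if build_env_fn returned {"INSTANCE_NAME": "inst1"} (an
  # illustrative key, not one defined in this module), the pre-phase
  # environment would contain GANETI_INSTANCE_NAME=inst1, while the
  # post-phase environment would contain GANETI_POST_INSTANCE_NAME=inst1
  # merged with all of the GANETI_* keys from the pre-phase environment.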

  def _RunWrapper(self, node_list, hpath, phase, phase_env):
    """Simple wrapper over self.hooks_execution_fn.

    This method fixes the environment before executing the hooks.

    """
    env = {
      "PATH": constants.HOOKS_PATH,
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.opcode,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      "GANETI_HOOKS_PHASE": phase,
      "GANETI_HOOKS_PATH": hpath,
      }

    if self.htype:
      env["GANETI_OBJECT_TYPE"] = self.htype

    if self.cluster_name is not None:
      env["GANETI_CLUSTER"] = self.cluster_name

    if self.master_name is not None:
      env["GANETI_MASTER"] = self.master_name

    if phase_env:
      env = utils.algo.JoinDisjointDicts(env, phase_env)

    # Convert everything to strings
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])

    assert compat.all(key == "PATH" or key.startswith("GANETI_")
                      for key in env)

    return self.hooks_execution_fn(node_list, hpath, phase, env)
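
  # A hook script therefore sees, besides the phase-specific GANETI_* keys,
  # at least: PATH, GANETI_HOOKS_VERSION, GANETI_OP_CODE, GANETI_DATA_DIR,
  # GANETI_HOOKS_PHASE and GANETI_HOOKS_PATH, plus GANETI_OBJECT_TYPE,
  # GANETI_CLUSTER and GANETI_MASTER when the corresponding values are known.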

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HooksMaster.
    It executes self.hooks_execution_fn, and after running
    self.hooks_results_adapt_fn on its results it expects them to be in the
    form {node_name: (fail_msg, offline, [(script, result, output), ...])}.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if phase == constants.HOOKS_PHASE_PRE:
      if nodes is None:
        nodes = self.pre_nodes
      env = self.pre_env
    elif phase == constants.HOOKS_PHASE_POST:
      if nodes is None:
        nodes = self.post_nodes
      env = self._BuildEnv(phase)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    if not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return

    results = self._RunWrapper(nodes, self.hooks_path, phase, env)
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        self.log_fn(msg)
        return results

    converted_res = results
    if self.hooks_results_adapt_fn:
      converted_res = self.hooks_results_adapt_fn(results)

    errs = []
    for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
      if offline:
        continue

      if fail_msg:
        self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)
        continue

      for script, hkr, output in hooks_results:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.log_fn("On %s script %s failed, output: %s" %
                        (node_name, script, output))

    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)

    return results
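
  # Note the asymmetry in failure handling: pre-phase hook failures are
  # collected and raised as errors.HooksAbort, stopping the operation before
  # the LU runs, while post-phase failures are only logged and offline nodes
  # are skipped entirely.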

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.master_name]
    self._RunWrapper(nodes, hpath, phase, self.pre_env)

  @staticmethod
  def BuildFromLu(hooks_execution_fn, lu):
    if lu.HPATH is None:
      nodes = (None, None)
    else:
      nodes = map(frozenset, lu.BuildHooksNodes())

    master_name = cluster_name = None
    if lu.cfg:
      master_name = lu.cfg.GetMasterNode()
      cluster_name = lu.cfg.GetClusterName()

    return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
                       _RpcResultsToHooksResults, lu.BuildHooksEnv,
                       lu.LogWarning, lu.HTYPE, cluster_name, master_name)