Fix bug in OpNetworkQuery result check
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import sys
32 import logging
33 import random
34 import time
35 import itertools
36 import traceback
37
38 from ganeti import opcodes
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import cmdlib
42 from ganeti import locking
43 from ganeti import utils
44 from ganeti import compat
45 from ganeti import pathutils
46
47
48 _OP_PREFIX = "Op"
49 _LU_PREFIX = "LU"
50
51
52 class LockAcquireTimeout(Exception):
53   """Exception to report timeouts on acquiring locks.
54
55   """
56
57
58 def _CalculateLockAttemptTimeouts():
59   """Calculate timeouts for lock attempts.
60
61   """
62   result = [constants.LOCK_ATTEMPTS_MINWAIT]
63   running_sum = result[0]
64
65   # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
66   # blocking acquire
67   while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
68     timeout = (result[-1] * 1.05) ** 1.25
69
70     # Cap max timeout. This gives other jobs a chance to run even if
71     # we're still trying to get our locks, before finally moving to a
72     # blocking acquire.
73     timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
74     # And also cap the lower boundary for safety
75     timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
76
77     result.append(timeout)
78     running_sum += timeout
79
80   return result
81
82
83 class LockAttemptTimeoutStrategy(object):
84   """Class with lock acquire timeout strategy.
85
86   """
87   __slots__ = [
88     "_timeouts",
89     "_random_fn",
90     "_time_fn",
91     ]
92
93   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
94
95   def __init__(self, _time_fn=time.time, _random_fn=random.random):
96     """Initializes this class.
97
98     @param _time_fn: Time function for unittests
99     @param _random_fn: Random number generator for unittests
100
101     """
102     object.__init__(self)
103
104     self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
105     self._time_fn = _time_fn
106     self._random_fn = _random_fn
107
108   def NextAttempt(self):
109     """Returns the timeout for the next attempt.
110
111     """
112     try:
113       timeout = self._timeouts.next()
114     except StopIteration:
115       # No more timeouts, do blocking acquire
116       timeout = None
117
118     if timeout is not None:
119       # Add a small variation (-/+ 5%) to timeout. This helps in situations
120       # where two or more jobs are fighting for the same lock(s).
121       variation_range = timeout * 0.1
122       timeout += ((self._random_fn() * variation_range) -
123                   (variation_range * 0.5))
124
125     return timeout
126
127
128 class OpExecCbBase: # pylint: disable=W0232
129   """Base class for OpCode execution callbacks.
130
131   """
132   def NotifyStart(self):
133     """Called when we are about to execute the LU.
134
135     This function is called when we're about to start the lu's Exec() method,
136     that is, after we have acquired all locks.
137
138     """
139
140   def Feedback(self, *args):
141     """Sends feedback from the LU code to the end-user.
142
143     """
144
145   def CurrentPriority(self): # pylint: disable=R0201
146     """Returns current priority or C{None}.
147
148     """
149     return None
150
151   def SubmitManyJobs(self, jobs):
152     """Submits jobs for processing.
153
154     See L{jqueue.JobQueue.SubmitManyJobs}.
155
156     """
157     raise NotImplementedError
158
159
160 def _LUNameForOpName(opname):
161   """Computes the LU name for a given OpCode name.
162
163   """
164   assert opname.startswith(_OP_PREFIX), \
165       "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
166
167   return _LU_PREFIX + opname[len(_OP_PREFIX):]
168
169
170 def _ComputeDispatchTable():
171   """Computes the opcode-to-lu dispatch table.
172
173   """
174   return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
175               for op in opcodes.OP_MAPPING.values()
176               if op.WITH_LU)
177
178
179 def _SetBaseOpParams(src, defcomment, dst):
180   """Copies basic opcode parameters.
181
182   @type src: L{opcodes.OpCode}
183   @param src: Source opcode
184   @type defcomment: string
185   @param defcomment: Comment to specify if not already given
186   @type dst: L{opcodes.OpCode}
187   @param dst: Destination opcode
188
189   """
190   if hasattr(src, "debug_level"):
191     dst.debug_level = src.debug_level
192
193   if (getattr(dst, "priority", None) is None and
194       hasattr(src, "priority")):
195     dst.priority = src.priority
196
197   if not getattr(dst, opcodes.COMMENT_ATTR, None):
198     dst.comment = defcomment
199
200
201 def _ProcessResult(submit_fn, op, result):
202   """Examines opcode result.
203
204   If necessary, additional processing on the result is done.
205
206   """
207   if isinstance(result, cmdlib.ResultWithJobs):
208     # Copy basic parameters (e.g. priority)
209     map(compat.partial(_SetBaseOpParams, op,
210                        "Submitted by %s" % op.OP_ID),
211         itertools.chain(*result.jobs))
212
213     # Submit jobs
214     job_submission = submit_fn(result.jobs)
215
216     # Build dictionary
217     result = result.other
218
219     assert constants.JOB_IDS_KEY not in result, \
220       "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
221
222     result[constants.JOB_IDS_KEY] = job_submission
223
224   return result
225
226
227 def _FailingSubmitManyJobs(_):
228   """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
229
230   """
231   raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
232                                " queries) can not submit jobs")
233
234
235 def _RpcResultsToHooksResults(rpc_results):
236   """Function to convert RPC results to the format expected by HooksMaster.
237
238   @type rpc_results: dict(node: L{rpc.RpcResult})
239   @param rpc_results: RPC results
240   @rtype: dict(node: (fail_msg, offline, hooks_results))
241   @return: RPC results unpacked according to the format expected by
242     L({mcpu.HooksMaster}
243
244   """
245   return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
246               for (node, rpc_res) in rpc_results.items())
247
248
249 class Processor(object):
250   """Object which runs OpCodes"""
251   DISPATCH_TABLE = _ComputeDispatchTable()
252
253   def __init__(self, context, ec_id, enable_locks=True):
254     """Constructor for Processor
255
256     @type context: GanetiContext
257     @param context: global Ganeti context
258     @type ec_id: string
259     @param ec_id: execution context identifier
260
261     """
262     self.context = context
263     self._ec_id = ec_id
264     self._cbs = None
265     self.rpc = context.rpc
266     self.hmclass = HooksMaster
267     self._enable_locks = enable_locks
268
269   def _CheckLocksEnabled(self):
270     """Checks if locking is enabled.
271
272     @raise errors.ProgrammerError: In case locking is not enabled
273
274     """
275     if not self._enable_locks:
276       raise errors.ProgrammerError("Attempted to use disabled locks")
277
278   def _AcquireLocks(self, level, names, shared, timeout):
279     """Acquires locks via the Ganeti lock manager.
280
281     @type level: int
282     @param level: Lock level
283     @type names: list or string
284     @param names: Lock names
285     @type shared: bool
286     @param shared: Whether the locks should be acquired in shared mode
287     @type timeout: None or float
288     @param timeout: Timeout for acquiring the locks
289     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
290         amount of time
291
292     """
293     self._CheckLocksEnabled()
294
295     if self._cbs:
296       priority = self._cbs.CurrentPriority()
297     else:
298       priority = None
299
300     acquired = self.context.glm.acquire(level, names, shared=shared,
301                                         timeout=timeout, priority=priority)
302
303     if acquired is None:
304       raise LockAcquireTimeout()
305
306     return acquired
307
308   def _ExecLU(self, lu):
309     """Logical Unit execution sequence.
310
311     """
312     write_count = self.context.cfg.write_count
313     lu.CheckPrereq()
314
315     hm = self.BuildHooksManager(lu)
316     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
317     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
318                      self.Log, None)
319
320     if getattr(lu.op, "dry_run", False):
321       # in this mode, no post-hooks are run, and the config is not
322       # written (as it might have been modified by another LU, and we
323       # shouldn't do writeout on behalf of other threads
324       self.LogInfo("dry-run mode requested, not actually executing"
325                    " the operation")
326       return lu.dry_run_result
327
328     if self._cbs:
329       submit_mj_fn = self._cbs.SubmitManyJobs
330     else:
331       submit_mj_fn = _FailingSubmitManyJobs
332
333     try:
334       result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
335       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
336       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
337                                 self.Log, result)
338     finally:
339       # FIXME: This needs locks if not lu_class.REQ_BGL
340       if write_count != self.context.cfg.write_count:
341         hm.RunConfigUpdate()
342
343     return result
344
345   def BuildHooksManager(self, lu):
346     return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
347
348   def _LockAndExecLU(self, lu, level, calc_timeout):
349     """Execute a Logical Unit, with the needed locks.
350
351     This is a recursive function that starts locking the given level, and
352     proceeds up, till there are no more locks to acquire. Then it executes the
353     given LU and its opcodes.
354
355     """
356     adding_locks = level in lu.add_locks
357     acquiring_locks = level in lu.needed_locks
358     if level not in locking.LEVELS:
359       if self._cbs:
360         self._cbs.NotifyStart()
361
362       try:
363         result = self._ExecLU(lu)
364       except AssertionError, err:
365         # this is a bit ugly, as we don't know from which phase
366         # (prereq, exec) this comes; but it's better than an exception
367         # with no information
368         (_, _, tb) = sys.exc_info()
369         err_info = traceback.format_tb(tb)
370         del tb
371         logging.exception("Detected AssertionError")
372         raise errors.OpExecError("Internal assertion error: please report"
373                                  " this as a bug.\nError message: '%s';"
374                                  " location:\n%s" % (str(err), err_info[-1]))
375
376     elif adding_locks and acquiring_locks:
377       # We could both acquire and add locks at the same level, but for now we
378       # don't need this, so we'll avoid the complicated code needed.
379       raise NotImplementedError("Can't declare locks to acquire when adding"
380                                 " others")
381
382     elif adding_locks or acquiring_locks:
383       self._CheckLocksEnabled()
384
385       lu.DeclareLocks(level)
386       share = lu.share_locks[level]
387
388       try:
389         assert adding_locks ^ acquiring_locks, \
390           "Locks must be either added or acquired"
391
392         if acquiring_locks:
393           # Acquiring locks
394           needed_locks = lu.needed_locks[level]
395
396           self._AcquireLocks(level, needed_locks, share,
397                              calc_timeout())
398         else:
399           # Adding locks
400           add_locks = lu.add_locks[level]
401           lu.remove_locks[level] = add_locks
402
403           try:
404             self.context.glm.add(level, add_locks, acquired=1, shared=share)
405           except errors.LockError:
406             logging.exception("Detected lock error in level %s for locks"
407                               " %s, shared=%s", level, add_locks, share)
408             raise errors.OpPrereqError(
409               "Couldn't add locks (%s), most likely because of another"
410               " job who added them first" % add_locks,
411               errors.ECODE_NOTUNIQUE)
412
413         try:
414           result = self._LockAndExecLU(lu, level + 1, calc_timeout)
415         finally:
416           if level in lu.remove_locks:
417             self.context.glm.remove(level, lu.remove_locks[level])
418       finally:
419         if self.context.glm.is_owned(level):
420           self.context.glm.release(level)
421
422     else:
423       result = self._LockAndExecLU(lu, level + 1, calc_timeout)
424
425     return result
426
427   def ExecOpCode(self, op, cbs, timeout=None):
428     """Execute an opcode.
429
430     @type op: an OpCode instance
431     @param op: the opcode to be executed
432     @type cbs: L{OpExecCbBase}
433     @param cbs: Runtime callbacks
434     @type timeout: float or None
435     @param timeout: Maximum time to acquire all locks, None for no timeout
436     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
437         amount of time
438
439     """
440     if not isinstance(op, opcodes.OpCode):
441       raise errors.ProgrammerError("Non-opcode instance passed"
442                                    " to ExecOpcode (%s)" % type(op))
443
444     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
445     if lu_class is None:
446       raise errors.OpCodeUnknown("Unknown opcode")
447
448     if timeout is None:
449       calc_timeout = lambda: None
450     else:
451       calc_timeout = utils.RunningTimeout(timeout, False).Remaining
452
453     self._cbs = cbs
454     try:
455       if self._enable_locks:
456         # Acquire the Big Ganeti Lock exclusively if this LU requires it,
457         # and in a shared fashion otherwise (to prevent concurrent run with
458         # an exclusive LU.
459         self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
460                             not lu_class.REQ_BGL, calc_timeout())
461       elif lu_class.REQ_BGL:
462         raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
463                                      " disabled" % op.OP_ID)
464
465       try:
466         lu = lu_class(self, op, self.context, self.rpc)
467         lu.ExpandNames()
468         assert lu.needed_locks is not None, "needed_locks not set by LU"
469
470         try:
471           result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
472         finally:
473           if self._ec_id:
474             self.context.cfg.DropECReservations(self._ec_id)
475       finally:
476         # Release BGL if owned
477         if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
478           assert self._enable_locks
479           self.context.glm.release(locking.LEVEL_CLUSTER)
480     finally:
481       self._cbs = None
482
483     resultcheck_fn = op.OP_RESULT
484     if not (resultcheck_fn is None or resultcheck_fn(result)):
485       logging.error("Expected opcode result matching %s, got %s",
486                     resultcheck_fn, result)
487       raise errors.OpResultError("Opcode result does not match %s: %s" %
488                                  (resultcheck_fn, utils.Truncate(result, 80)))
489
490     return result
491
492   def Log(self, *args):
493     """Forward call to feedback callback function.
494
495     """
496     if self._cbs:
497       self._cbs.Feedback(*args)
498
499   def LogStep(self, current, total, message):
500     """Log a change in LU execution progress.
501
502     """
503     logging.debug("Step %d/%d %s", current, total, message)
504     self.Log("STEP %d/%d %s" % (current, total, message))
505
506   def LogWarning(self, message, *args, **kwargs):
507     """Log a warning to the logs and the user.
508
509     The optional keyword argument is 'hint' and can be used to show a
510     hint to the user (presumably related to the warning). If the
511     message is empty, it will not be printed at all, allowing one to
512     show only a hint.
513
514     """
515     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
516            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
517     if args:
518       message = message % tuple(args)
519     if message:
520       logging.warning(message)
521       self.Log(" - WARNING: %s" % message)
522     if "hint" in kwargs:
523       self.Log("      Hint: %s" % kwargs["hint"])
524
525   def LogInfo(self, message, *args):
526     """Log an informational message to the logs and the user.
527
528     """
529     if args:
530       message = message % tuple(args)
531     logging.info(message)
532     self.Log(" - INFO: %s" % message)
533
534   def GetECId(self):
535     """Returns the current execution context ID.
536
537     """
538     if not self._ec_id:
539       raise errors.ProgrammerError("Tried to use execution context id when"
540                                    " not set")
541     return self._ec_id
542
543
544 class HooksMaster(object):
545   def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
546                hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
547                cluster_name=None, master_name=None):
548     """Base class for hooks masters.
549
550     This class invokes the execution of hooks according to the behaviour
551     specified by its parameters.
552
553     @type opcode: string
554     @param opcode: opcode of the operation to which the hooks are tied
555     @type hooks_path: string
556     @param hooks_path: prefix of the hooks directories
557     @type nodes: 2-tuple of lists
558     @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
559       run and nodes on which post-hooks must be run
560     @type hooks_execution_fn: function that accepts the following parameters:
561       (node_list, hooks_path, phase, environment)
562     @param hooks_execution_fn: function that will execute the hooks; can be
563       None, indicating that no conversion is necessary.
564     @type hooks_results_adapt_fn: function
565     @param hooks_results_adapt_fn: function that will adapt the return value of
566       hooks_execution_fn to the format expected by RunPhase
567     @type build_env_fn: function that returns a dictionary having strings as
568       keys
569     @param build_env_fn: function that builds the environment for the hooks
570     @type log_fn: function that accepts a string
571     @param log_fn: logging function
572     @type htype: string or None
573     @param htype: None or one of L{constants.HTYPE_CLUSTER},
574      L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
575     @type cluster_name: string
576     @param cluster_name: name of the cluster
577     @type master_name: string
578     @param master_name: name of the master
579
580     """
581     self.opcode = opcode
582     self.hooks_path = hooks_path
583     self.hooks_execution_fn = hooks_execution_fn
584     self.hooks_results_adapt_fn = hooks_results_adapt_fn
585     self.build_env_fn = build_env_fn
586     self.log_fn = log_fn
587     self.htype = htype
588     self.cluster_name = cluster_name
589     self.master_name = master_name
590
591     self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
592     (self.pre_nodes, self.post_nodes) = nodes
593
594   def _BuildEnv(self, phase):
595     """Compute the environment and the target nodes.
596
597     Based on the opcode and the current node list, this builds the
598     environment for the hooks and the target node list for the run.
599
600     """
601     if phase == constants.HOOKS_PHASE_PRE:
602       prefix = "GANETI_"
603     elif phase == constants.HOOKS_PHASE_POST:
604       prefix = "GANETI_POST_"
605     else:
606       raise AssertionError("Unknown phase '%s'" % phase)
607
608     env = {}
609
610     if self.hooks_path is not None:
611       phase_env = self.build_env_fn()
612       if phase_env:
613         assert not compat.any(key.upper().startswith(prefix)
614                               for key in phase_env)
615         env.update(("%s%s" % (prefix, key), value)
616                    for (key, value) in phase_env.items())
617
618     if phase == constants.HOOKS_PHASE_PRE:
619       assert compat.all((key.startswith("GANETI_") and
620                          not key.startswith("GANETI_POST_"))
621                         for key in env)
622
623     elif phase == constants.HOOKS_PHASE_POST:
624       assert compat.all(key.startswith("GANETI_POST_") for key in env)
625       assert isinstance(self.pre_env, dict)
626
627       # Merge with pre-phase environment
628       assert not compat.any(key.startswith("GANETI_POST_")
629                             for key in self.pre_env)
630       env.update(self.pre_env)
631     else:
632       raise AssertionError("Unknown phase '%s'" % phase)
633
634     return env
635
636   def _RunWrapper(self, node_list, hpath, phase, phase_env):
637     """Simple wrapper over self.callfn.
638
639     This method fixes the environment before executing the hooks.
640
641     """
642     env = {
643       "PATH": constants.HOOKS_PATH,
644       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
645       "GANETI_OP_CODE": self.opcode,
646       "GANETI_DATA_DIR": pathutils.DATA_DIR,
647       "GANETI_HOOKS_PHASE": phase,
648       "GANETI_HOOKS_PATH": hpath,
649       }
650
651     if self.htype:
652       env["GANETI_OBJECT_TYPE"] = self.htype
653
654     if self.cluster_name is not None:
655       env["GANETI_CLUSTER"] = self.cluster_name
656
657     if self.master_name is not None:
658       env["GANETI_MASTER"] = self.master_name
659
660     if phase_env:
661       env = utils.algo.JoinDisjointDicts(env, phase_env)
662
663     # Convert everything to strings
664     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
665
666     assert compat.all(key == "PATH" or key.startswith("GANETI_")
667                       for key in env)
668
669     return self.hooks_execution_fn(node_list, hpath, phase, env)
670
671   def RunPhase(self, phase, nodes=None):
672     """Run all the scripts for a phase.
673
674     This is the main function of the HookMaster.
675     It executes self.hooks_execution_fn, and after running
676     self.hooks_results_adapt_fn on its results it expects them to be in the form
677     {node_name: (fail_msg, [(script, result, output), ...]}).
678
679     @param phase: one of L{constants.HOOKS_PHASE_POST} or
680         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
681     @param nodes: overrides the predefined list of nodes for the given phase
682     @return: the processed results of the hooks multi-node rpc call
683     @raise errors.HooksFailure: on communication failure to the nodes
684     @raise errors.HooksAbort: on failure of one of the hooks
685
686     """
687     if phase == constants.HOOKS_PHASE_PRE:
688       if nodes is None:
689         nodes = self.pre_nodes
690       env = self.pre_env
691     elif phase == constants.HOOKS_PHASE_POST:
692       if nodes is None:
693         nodes = self.post_nodes
694       env = self._BuildEnv(phase)
695     else:
696       raise AssertionError("Unknown phase '%s'" % phase)
697
698     if not nodes:
699       # empty node list, we should not attempt to run this as either
700       # we're in the cluster init phase and the rpc client part can't
701       # even attempt to run, or this LU doesn't do hooks at all
702       return
703
704     results = self._RunWrapper(nodes, self.hooks_path, phase, env)
705     if not results:
706       msg = "Communication Failure"
707       if phase == constants.HOOKS_PHASE_PRE:
708         raise errors.HooksFailure(msg)
709       else:
710         self.log_fn(msg)
711         return results
712
713     converted_res = results
714     if self.hooks_results_adapt_fn:
715       converted_res = self.hooks_results_adapt_fn(results)
716
717     errs = []
718     for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
719       if offline:
720         continue
721
722       if fail_msg:
723         self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)
724         continue
725
726       for script, hkr, output in hooks_results:
727         if hkr == constants.HKR_FAIL:
728           if phase == constants.HOOKS_PHASE_PRE:
729             errs.append((node_name, script, output))
730           else:
731             if not output:
732               output = "(no output)"
733             self.log_fn("On %s script %s failed, output: %s" %
734                         (node_name, script, output))
735
736     if errs and phase == constants.HOOKS_PHASE_PRE:
737       raise errors.HooksAbort(errs)
738
739     return results
740
741   def RunConfigUpdate(self):
742     """Run the special configuration update hook
743
744     This is a special hook that runs only on the master after each
745     top-level LI if the configuration has been updated.
746
747     """
748     phase = constants.HOOKS_PHASE_POST
749     hpath = constants.HOOKS_NAME_CFGUPDATE
750     nodes = [self.master_name]
751     self._RunWrapper(nodes, hpath, phase, self.pre_env)
752
753   @staticmethod
754   def BuildFromLu(hooks_execution_fn, lu):
755     if lu.HPATH is None:
756       nodes = (None, None)
757     else:
758       nodes = map(frozenset, lu.BuildHooksNodes())
759
760     master_name = cluster_name = None
761     if lu.cfg:
762       master_name = lu.cfg.GetMasterNode()
763       cluster_name = lu.cfg.GetClusterName()
764
765     return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
766                        _RpcResultsToHooksResults, lu.BuildHooksEnv,
767                        lu.LogWarning, lu.HTYPE, cluster_name, master_name)