errors: Document arguments to QueryFilterParseError
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34 import itertools
35
36 from ganeti import opcodes
37 from ganeti import constants
38 from ganeti import errors
39 from ganeti import cmdlib
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import compat
43 from ganeti import pathutils
44
45
46 _OP_PREFIX = "Op"
47 _LU_PREFIX = "LU"
48
49
50 class LockAcquireTimeout(Exception):
51   """Exception to report timeouts on acquiring locks.
52
53   """
54
55
56 def _CalculateLockAttemptTimeouts():
57   """Calculate timeouts for lock attempts.
58
59   """
60   result = [constants.LOCK_ATTEMPTS_MINWAIT]
61   running_sum = result[0]
62
63   # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
64   # blocking acquire
65   while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
66     timeout = (result[-1] * 1.05) ** 1.25
67
68     # Cap max timeout. This gives other jobs a chance to run even if
69     # we're still trying to get our locks, before finally moving to a
70     # blocking acquire.
71     timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
72     # And also cap the lower boundary for safety
73     timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
74
75     result.append(timeout)
76     running_sum += timeout
77
78   return result
79
80
81 class LockAttemptTimeoutStrategy(object):
82   """Class with lock acquire timeout strategy.
83
84   """
85   __slots__ = [
86     "_timeouts",
87     "_random_fn",
88     "_time_fn",
89     ]
90
91   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
92
93   def __init__(self, _time_fn=time.time, _random_fn=random.random):
94     """Initializes this class.
95
96     @param _time_fn: Time function for unittests
97     @param _random_fn: Random number generator for unittests
98
99     """
100     object.__init__(self)
101
102     self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
103     self._time_fn = _time_fn
104     self._random_fn = _random_fn
105
106   def NextAttempt(self):
107     """Returns the timeout for the next attempt.
108
109     """
110     try:
111       timeout = self._timeouts.next()
112     except StopIteration:
113       # No more timeouts, do blocking acquire
114       timeout = None
115
116     if timeout is not None:
117       # Add a small variation (-/+ 5%) to timeout. This helps in situations
118       # where two or more jobs are fighting for the same lock(s).
119       variation_range = timeout * 0.1
120       timeout += ((self._random_fn() * variation_range) -
121                   (variation_range * 0.5))
122
123     return timeout
124
125
126 class OpExecCbBase: # pylint: disable=W0232
127   """Base class for OpCode execution callbacks.
128
129   """
130   def NotifyStart(self):
131     """Called when we are about to execute the LU.
132
133     This function is called when we're about to start the lu's Exec() method,
134     that is, after we have acquired all locks.
135
136     """
137
138   def Feedback(self, *args):
139     """Sends feedback from the LU code to the end-user.
140
141     """
142
143   def CheckCancel(self):
144     """Check whether job has been cancelled.
145
146     """
147
148   def SubmitManyJobs(self, jobs):
149     """Submits jobs for processing.
150
151     See L{jqueue.JobQueue.SubmitManyJobs}.
152
153     """
154     raise NotImplementedError
155
156
157 def _LUNameForOpName(opname):
158   """Computes the LU name for a given OpCode name.
159
160   """
161   assert opname.startswith(_OP_PREFIX), \
162       "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
163
164   return _LU_PREFIX + opname[len(_OP_PREFIX):]
165
166
167 def _ComputeDispatchTable():
168   """Computes the opcode-to-lu dispatch table.
169
170   """
171   return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
172               for op in opcodes.OP_MAPPING.values()
173               if op.WITH_LU)
174
175
176 def _SetBaseOpParams(src, defcomment, dst):
177   """Copies basic opcode parameters.
178
179   @type src: L{opcodes.OpCode}
180   @param src: Source opcode
181   @type defcomment: string
182   @param defcomment: Comment to specify if not already given
183   @type dst: L{opcodes.OpCode}
184   @param dst: Destination opcode
185
186   """
187   if hasattr(src, "debug_level"):
188     dst.debug_level = src.debug_level
189
190   if (getattr(dst, "priority", None) is None and
191       hasattr(src, "priority")):
192     dst.priority = src.priority
193
194   if not getattr(dst, opcodes.COMMENT_ATTR, None):
195     dst.comment = defcomment
196
197
198 def _ProcessResult(submit_fn, op, result):
199   """Examines opcode result.
200
201   If necessary, additional processing on the result is done.
202
203   """
204   if isinstance(result, cmdlib.ResultWithJobs):
205     # Copy basic parameters (e.g. priority)
206     map(compat.partial(_SetBaseOpParams, op,
207                        "Submitted by %s" % op.OP_ID),
208         itertools.chain(*result.jobs))
209
210     # Submit jobs
211     job_submission = submit_fn(result.jobs)
212
213     # Build dictionary
214     result = result.other
215
216     assert constants.JOB_IDS_KEY not in result, \
217       "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
218
219     result[constants.JOB_IDS_KEY] = job_submission
220
221   return result
222
223
224 def _FailingSubmitManyJobs(_):
225   """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
226
227   """
228   raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
229                                " queries) can not submit jobs")
230
231
232 def _RpcResultsToHooksResults(rpc_results):
233   """Function to convert RPC results to the format expected by HooksMaster.
234
235   @type rpc_results: dict(node: L{rpc.RpcResult})
236   @param rpc_results: RPC results
237   @rtype: dict(node: (fail_msg, offline, hooks_results))
238   @return: RPC results unpacked according to the format expected by
239     L({mcpu.HooksMaster}
240
241   """
242   return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
243               for (node, rpc_res) in rpc_results.items())
244
245
246 class Processor(object):
247   """Object which runs OpCodes"""
248   DISPATCH_TABLE = _ComputeDispatchTable()
249
250   def __init__(self, context, ec_id, enable_locks=True):
251     """Constructor for Processor
252
253     @type context: GanetiContext
254     @param context: global Ganeti context
255     @type ec_id: string
256     @param ec_id: execution context identifier
257
258     """
259     self.context = context
260     self._ec_id = ec_id
261     self._cbs = None
262     self.rpc = context.rpc
263     self.hmclass = HooksMaster
264     self._enable_locks = enable_locks
265
266   def _CheckLocksEnabled(self):
267     """Checks if locking is enabled.
268
269     @raise errors.ProgrammerError: In case locking is not enabled
270
271     """
272     if not self._enable_locks:
273       raise errors.ProgrammerError("Attempted to use disabled locks")
274
275   def _AcquireLocks(self, level, names, shared, timeout, priority):
276     """Acquires locks via the Ganeti lock manager.
277
278     @type level: int
279     @param level: Lock level
280     @type names: list or string
281     @param names: Lock names
282     @type shared: bool
283     @param shared: Whether the locks should be acquired in shared mode
284     @type timeout: None or float
285     @param timeout: Timeout for acquiring the locks
286     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
287         amount of time
288
289     """
290     self._CheckLocksEnabled()
291
292     if self._cbs:
293       self._cbs.CheckCancel()
294
295     acquired = self.context.glm.acquire(level, names, shared=shared,
296                                         timeout=timeout, priority=priority)
297
298     if acquired is None:
299       raise LockAcquireTimeout()
300
301     return acquired
302
303   def _ExecLU(self, lu):
304     """Logical Unit execution sequence.
305
306     """
307     write_count = self.context.cfg.write_count
308     lu.CheckPrereq()
309
310     hm = self.BuildHooksManager(lu)
311     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
312     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
313                      self.Log, None)
314
315     if getattr(lu.op, "dry_run", False):
316       # in this mode, no post-hooks are run, and the config is not
317       # written (as it might have been modified by another LU, and we
318       # shouldn't do writeout on behalf of other threads
319       self.LogInfo("dry-run mode requested, not actually executing"
320                    " the operation")
321       return lu.dry_run_result
322
323     if self._cbs:
324       submit_mj_fn = self._cbs.SubmitManyJobs
325     else:
326       submit_mj_fn = _FailingSubmitManyJobs
327
328     try:
329       result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
330       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
331       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
332                                 self.Log, result)
333     finally:
334       # FIXME: This needs locks if not lu_class.REQ_BGL
335       if write_count != self.context.cfg.write_count:
336         hm.RunConfigUpdate()
337
338     return result
339
340   def BuildHooksManager(self, lu):
341     return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
342
343   def _LockAndExecLU(self, lu, level, calc_timeout, priority):
344     """Execute a Logical Unit, with the needed locks.
345
346     This is a recursive function that starts locking the given level, and
347     proceeds up, till there are no more locks to acquire. Then it executes the
348     given LU and its opcodes.
349
350     """
351     adding_locks = level in lu.add_locks
352     acquiring_locks = level in lu.needed_locks
353     if level not in locking.LEVELS:
354       if self._cbs:
355         self._cbs.NotifyStart()
356
357       result = self._ExecLU(lu)
358
359     elif adding_locks and acquiring_locks:
360       # We could both acquire and add locks at the same level, but for now we
361       # don't need this, so we'll avoid the complicated code needed.
362       raise NotImplementedError("Can't declare locks to acquire when adding"
363                                 " others")
364
365     elif adding_locks or acquiring_locks:
366       self._CheckLocksEnabled()
367
368       lu.DeclareLocks(level)
369       share = lu.share_locks[level]
370
371       try:
372         assert adding_locks ^ acquiring_locks, \
373           "Locks must be either added or acquired"
374
375         if acquiring_locks:
376           # Acquiring locks
377           needed_locks = lu.needed_locks[level]
378
379           self._AcquireLocks(level, needed_locks, share,
380                              calc_timeout(), priority)
381         else:
382           # Adding locks
383           add_locks = lu.add_locks[level]
384           lu.remove_locks[level] = add_locks
385
386           try:
387             self.context.glm.add(level, add_locks, acquired=1, shared=share)
388           except errors.LockError:
389             raise errors.OpPrereqError(
390               "Couldn't add locks (%s), probably because of a race condition"
391               " with another job, who added them first" % add_locks,
392               errors.ECODE_FAULT)
393
394         try:
395           result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
396         finally:
397           if level in lu.remove_locks:
398             self.context.glm.remove(level, lu.remove_locks[level])
399       finally:
400         if self.context.glm.is_owned(level):
401           self.context.glm.release(level)
402
403     else:
404       result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
405
406     return result
407
408   def ExecOpCode(self, op, cbs, timeout=None, priority=None):
409     """Execute an opcode.
410
411     @type op: an OpCode instance
412     @param op: the opcode to be executed
413     @type cbs: L{OpExecCbBase}
414     @param cbs: Runtime callbacks
415     @type timeout: float or None
416     @param timeout: Maximum time to acquire all locks, None for no timeout
417     @type priority: number or None
418     @param priority: Priority for acquiring lock(s)
419     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
420         amount of time
421
422     """
423     if not isinstance(op, opcodes.OpCode):
424       raise errors.ProgrammerError("Non-opcode instance passed"
425                                    " to ExecOpcode (%s)" % type(op))
426
427     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
428     if lu_class is None:
429       raise errors.OpCodeUnknown("Unknown opcode")
430
431     if timeout is None:
432       calc_timeout = lambda: None
433     else:
434       calc_timeout = utils.RunningTimeout(timeout, False).Remaining
435
436     self._cbs = cbs
437     try:
438       if self._enable_locks:
439         # Acquire the Big Ganeti Lock exclusively if this LU requires it,
440         # and in a shared fashion otherwise (to prevent concurrent run with
441         # an exclusive LU.
442         self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
443                             not lu_class.REQ_BGL, calc_timeout(),
444                             priority)
445       elif lu_class.REQ_BGL:
446         raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
447                                      " disabled" % op.OP_ID)
448
449       try:
450         lu = lu_class(self, op, self.context, self.rpc)
451         lu.ExpandNames()
452         assert lu.needed_locks is not None, "needed_locks not set by LU"
453
454         try:
455           result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
456                                        priority)
457         finally:
458           if self._ec_id:
459             self.context.cfg.DropECReservations(self._ec_id)
460       finally:
461         # Release BGL if owned
462         if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
463           assert self._enable_locks
464           self.context.glm.release(locking.LEVEL_CLUSTER)
465     finally:
466       self._cbs = None
467
468     resultcheck_fn = op.OP_RESULT
469     if not (resultcheck_fn is None or resultcheck_fn(result)):
470       logging.error("Expected opcode result matching %s, got %s",
471                     resultcheck_fn, result)
472       raise errors.OpResultError("Opcode result does not match %s: %s" %
473                                  (resultcheck_fn, utils.Truncate(result, 80)))
474
475     return result
476
477   def Log(self, *args):
478     """Forward call to feedback callback function.
479
480     """
481     if self._cbs:
482       self._cbs.Feedback(*args)
483
484   def LogStep(self, current, total, message):
485     """Log a change in LU execution progress.
486
487     """
488     logging.debug("Step %d/%d %s", current, total, message)
489     self.Log("STEP %d/%d %s" % (current, total, message))
490
491   def LogWarning(self, message, *args, **kwargs):
492     """Log a warning to the logs and the user.
493
494     The optional keyword argument is 'hint' and can be used to show a
495     hint to the user (presumably related to the warning). If the
496     message is empty, it will not be printed at all, allowing one to
497     show only a hint.
498
499     """
500     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
501            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
502     if args:
503       message = message % tuple(args)
504     if message:
505       logging.warning(message)
506       self.Log(" - WARNING: %s" % message)
507     if "hint" in kwargs:
508       self.Log("      Hint: %s" % kwargs["hint"])
509
510   def LogInfo(self, message, *args):
511     """Log an informational message to the logs and the user.
512
513     """
514     if args:
515       message = message % tuple(args)
516     logging.info(message)
517     self.Log(" - INFO: %s" % message)
518
519   def GetECId(self):
520     """Returns the current execution context ID.
521
522     """
523     if not self._ec_id:
524       raise errors.ProgrammerError("Tried to use execution context id when"
525                                    " not set")
526     return self._ec_id
527
528
529 class HooksMaster(object):
530   def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
531                hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
532                cluster_name=None, master_name=None):
533     """Base class for hooks masters.
534
535     This class invokes the execution of hooks according to the behaviour
536     specified by its parameters.
537
538     @type opcode: string
539     @param opcode: opcode of the operation to which the hooks are tied
540     @type hooks_path: string
541     @param hooks_path: prefix of the hooks directories
542     @type nodes: 2-tuple of lists
543     @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
544       run and nodes on which post-hooks must be run
545     @type hooks_execution_fn: function that accepts the following parameters:
546       (node_list, hooks_path, phase, environment)
547     @param hooks_execution_fn: function that will execute the hooks; can be
548       None, indicating that no conversion is necessary.
549     @type hooks_results_adapt_fn: function
550     @param hooks_results_adapt_fn: function that will adapt the return value of
551       hooks_execution_fn to the format expected by RunPhase
552     @type build_env_fn: function that returns a dictionary having strings as
553       keys
554     @param build_env_fn: function that builds the environment for the hooks
555     @type log_fn: function that accepts a string
556     @param log_fn: logging function
557     @type htype: string or None
558     @param htype: None or one of L{constants.HTYPE_CLUSTER},
559      L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
560     @type cluster_name: string
561     @param cluster_name: name of the cluster
562     @type master_name: string
563     @param master_name: name of the master
564
565     """
566     self.opcode = opcode
567     self.hooks_path = hooks_path
568     self.hooks_execution_fn = hooks_execution_fn
569     self.hooks_results_adapt_fn = hooks_results_adapt_fn
570     self.build_env_fn = build_env_fn
571     self.log_fn = log_fn
572     self.htype = htype
573     self.cluster_name = cluster_name
574     self.master_name = master_name
575
576     self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
577     (self.pre_nodes, self.post_nodes) = nodes
578
579   def _BuildEnv(self, phase):
580     """Compute the environment and the target nodes.
581
582     Based on the opcode and the current node list, this builds the
583     environment for the hooks and the target node list for the run.
584
585     """
586     if phase == constants.HOOKS_PHASE_PRE:
587       prefix = "GANETI_"
588     elif phase == constants.HOOKS_PHASE_POST:
589       prefix = "GANETI_POST_"
590     else:
591       raise AssertionError("Unknown phase '%s'" % phase)
592
593     env = {}
594
595     if self.hooks_path is not None:
596       phase_env = self.build_env_fn()
597       if phase_env:
598         assert not compat.any(key.upper().startswith(prefix)
599                               for key in phase_env)
600         env.update(("%s%s" % (prefix, key), value)
601                    for (key, value) in phase_env.items())
602
603     if phase == constants.HOOKS_PHASE_PRE:
604       assert compat.all((key.startswith("GANETI_") and
605                          not key.startswith("GANETI_POST_"))
606                         for key in env)
607
608     elif phase == constants.HOOKS_PHASE_POST:
609       assert compat.all(key.startswith("GANETI_POST_") for key in env)
610       assert isinstance(self.pre_env, dict)
611
612       # Merge with pre-phase environment
613       assert not compat.any(key.startswith("GANETI_POST_")
614                             for key in self.pre_env)
615       env.update(self.pre_env)
616     else:
617       raise AssertionError("Unknown phase '%s'" % phase)
618
619     return env
620
621   def _RunWrapper(self, node_list, hpath, phase, phase_env):
622     """Simple wrapper over self.callfn.
623
624     This method fixes the environment before executing the hooks.
625
626     """
627     env = {
628       "PATH": constants.HOOKS_PATH,
629       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
630       "GANETI_OP_CODE": self.opcode,
631       "GANETI_DATA_DIR": pathutils.DATA_DIR,
632       "GANETI_HOOKS_PHASE": phase,
633       "GANETI_HOOKS_PATH": hpath,
634       }
635
636     if self.htype:
637       env["GANETI_OBJECT_TYPE"] = self.htype
638
639     if self.cluster_name is not None:
640       env["GANETI_CLUSTER"] = self.cluster_name
641
642     if self.master_name is not None:
643       env["GANETI_MASTER"] = self.master_name
644
645     if phase_env:
646       env = utils.algo.JoinDisjointDicts(env, phase_env)
647
648     # Convert everything to strings
649     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
650
651     assert compat.all(key == "PATH" or key.startswith("GANETI_")
652                       for key in env)
653
654     return self.hooks_execution_fn(node_list, hpath, phase, env)
655
656   def RunPhase(self, phase, nodes=None):
657     """Run all the scripts for a phase.
658
659     This is the main function of the HookMaster.
660     It executes self.hooks_execution_fn, and after running
661     self.hooks_results_adapt_fn on its results it expects them to be in the form
662     {node_name: (fail_msg, [(script, result, output), ...]}).
663
664     @param phase: one of L{constants.HOOKS_PHASE_POST} or
665         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
666     @param nodes: overrides the predefined list of nodes for the given phase
667     @return: the processed results of the hooks multi-node rpc call
668     @raise errors.HooksFailure: on communication failure to the nodes
669     @raise errors.HooksAbort: on failure of one of the hooks
670
671     """
672     if phase == constants.HOOKS_PHASE_PRE:
673       if nodes is None:
674         nodes = self.pre_nodes
675       env = self.pre_env
676     elif phase == constants.HOOKS_PHASE_POST:
677       if nodes is None:
678         nodes = self.post_nodes
679       env = self._BuildEnv(phase)
680     else:
681       raise AssertionError("Unknown phase '%s'" % phase)
682
683     if not nodes:
684       # empty node list, we should not attempt to run this as either
685       # we're in the cluster init phase and the rpc client part can't
686       # even attempt to run, or this LU doesn't do hooks at all
687       return
688
689     results = self._RunWrapper(nodes, self.hooks_path, phase, env)
690     if not results:
691       msg = "Communication Failure"
692       if phase == constants.HOOKS_PHASE_PRE:
693         raise errors.HooksFailure(msg)
694       else:
695         self.log_fn(msg)
696         return results
697
698     converted_res = results
699     if self.hooks_results_adapt_fn:
700       converted_res = self.hooks_results_adapt_fn(results)
701
702     errs = []
703     for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
704       if offline:
705         continue
706
707       if fail_msg:
708         self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)
709         continue
710
711       for script, hkr, output in hooks_results:
712         if hkr == constants.HKR_FAIL:
713           if phase == constants.HOOKS_PHASE_PRE:
714             errs.append((node_name, script, output))
715           else:
716             if not output:
717               output = "(no output)"
718             self.log_fn("On %s script %s failed, output: %s" %
719                         (node_name, script, output))
720
721     if errs and phase == constants.HOOKS_PHASE_PRE:
722       raise errors.HooksAbort(errs)
723
724     return results
725
726   def RunConfigUpdate(self):
727     """Run the special configuration update hook
728
729     This is a special hook that runs only on the master after each
730     top-level LI if the configuration has been updated.
731
732     """
733     phase = constants.HOOKS_PHASE_POST
734     hpath = constants.HOOKS_NAME_CFGUPDATE
735     nodes = [self.master_name]
736     self._RunWrapper(nodes, hpath, phase, self.pre_env)
737
738   @staticmethod
739   def BuildFromLu(hooks_execution_fn, lu):
740     if lu.HPATH is None:
741       nodes = (None, None)
742     else:
743       nodes = map(frozenset, lu.BuildHooksNodes())
744
745     master_name = cluster_name = None
746     if lu.cfg:
747       master_name = lu.cfg.GetMasterNode()
748       cluster_name = lu.cfg.GetClusterName()
749
750     return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
751                        _RpcResultsToHooksResults, lu.BuildHooksEnv,
752                        lu.LogWarning, lu.HTYPE, cluster_name, master_name)