Fix wrong option names in QA and cluster-merge
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import compat
43
44
45 _OP_PREFIX = "Op"
46 _LU_PREFIX = "LU"
47
48
49 class LockAcquireTimeout(Exception):
50   """Exception to report timeouts on acquiring locks.
51
52   """
53
54
55 def _CalculateLockAttemptTimeouts():
56   """Calculate timeouts for lock attempts.
57
58   """
59   result = [constants.LOCK_ATTEMPTS_MINWAIT]
60   running_sum = result[0]
61
62   # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
63   # blocking acquire
64   while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
65     timeout = (result[-1] * 1.05) ** 1.25
66
67     # Cap max timeout. This gives other jobs a chance to run even if
68     # we're still trying to get our locks, before finally moving to a
69     # blocking acquire.
70     timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
71     # And also cap the lower boundary for safety
72     timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
73
74     result.append(timeout)
75     running_sum += timeout
76
77   return result
78
79
80 class LockAttemptTimeoutStrategy(object):
81   """Class with lock acquire timeout strategy.
82
83   """
84   __slots__ = [
85     "_timeouts",
86     "_random_fn",
87     "_time_fn",
88     ]
89
90   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
91
92   def __init__(self, _time_fn=time.time, _random_fn=random.random):
93     """Initializes this class.
94
95     @param _time_fn: Time function for unittests
96     @param _random_fn: Random number generator for unittests
97
98     """
99     object.__init__(self)
100
101     self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
102     self._time_fn = _time_fn
103     self._random_fn = _random_fn
104
105   def NextAttempt(self):
106     """Returns the timeout for the next attempt.
107
108     """
109     try:
110       timeout = self._timeouts.next()
111     except StopIteration:
112       # No more timeouts, do blocking acquire
113       timeout = None
114
115     if timeout is not None:
116       # Add a small variation (-/+ 5%) to timeout. This helps in situations
117       # where two or more jobs are fighting for the same lock(s).
118       variation_range = timeout * 0.1
119       timeout += ((self._random_fn() * variation_range) -
120                   (variation_range * 0.5))
121
122     return timeout
123
124
125 class OpExecCbBase: # pylint: disable=W0232
126   """Base class for OpCode execution callbacks.
127
128   """
129   def NotifyStart(self):
130     """Called when we are about to execute the LU.
131
132     This function is called when we're about to start the lu's Exec() method,
133     that is, after we have acquired all locks.
134
135     """
136
137   def Feedback(self, *args):
138     """Sends feedback from the LU code to the end-user.
139
140     """
141
142   def CheckCancel(self):
143     """Check whether job has been cancelled.
144
145     """
146
147   def SubmitManyJobs(self, jobs):
148     """Submits jobs for processing.
149
150     See L{jqueue.JobQueue.SubmitManyJobs}.
151
152     """
153     raise NotImplementedError
154
155
156 def _LUNameForOpName(opname):
157   """Computes the LU name for a given OpCode name.
158
159   """
160   assert opname.startswith(_OP_PREFIX), \
161       "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
162
163   return _LU_PREFIX + opname[len(_OP_PREFIX):]
164
165
166 def _ComputeDispatchTable():
167   """Computes the opcode-to-lu dispatch table.
168
169   """
170   return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
171               for op in opcodes.OP_MAPPING.values()
172               if op.WITH_LU)
173
174
175 def _RpcResultsToHooksResults(rpc_results):
176   """Function to convert RPC results to the format expected by HooksMaster.
177
178   @type rpc_results: dict(node: L{rpc.RpcResult})
179   @param rpc_results: RPC results
180   @rtype: dict(node: (fail_msg, offline, hooks_results))
181   @return: RPC results unpacked according to the format expected by
182     L({mcpu.HooksMaster}
183
184   """
185   return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
186               for (node, rpc_res) in rpc_results.items())
187
188
189 class Processor(object):
190   """Object which runs OpCodes"""
191   DISPATCH_TABLE = _ComputeDispatchTable()
192
193   def __init__(self, context, ec_id):
194     """Constructor for Processor
195
196     @type context: GanetiContext
197     @param context: global Ganeti context
198     @type ec_id: string
199     @param ec_id: execution context identifier
200
201     """
202     self.context = context
203     self._ec_id = ec_id
204     self._cbs = None
205     self.rpc = rpc.RpcRunner(context.cfg)
206     self.hmclass = HooksMaster
207
208   def _AcquireLocks(self, level, names, shared, timeout, priority):
209     """Acquires locks via the Ganeti lock manager.
210
211     @type level: int
212     @param level: Lock level
213     @type names: list or string
214     @param names: Lock names
215     @type shared: bool
216     @param shared: Whether the locks should be acquired in shared mode
217     @type timeout: None or float
218     @param timeout: Timeout for acquiring the locks
219     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
220         amount of time
221
222     """
223     if self._cbs:
224       self._cbs.CheckCancel()
225
226     acquired = self.context.glm.acquire(level, names, shared=shared,
227                                         timeout=timeout, priority=priority)
228
229     if acquired is None:
230       raise LockAcquireTimeout()
231
232     return acquired
233
234   def _ProcessResult(self, result):
235     """Examines opcode result.
236
237     If necessary, additional processing on the result is done.
238
239     """
240     if isinstance(result, cmdlib.ResultWithJobs):
241       # Submit jobs
242       job_submission = self._cbs.SubmitManyJobs(result.jobs)
243
244       # Build dictionary
245       result = result.other
246
247       assert constants.JOB_IDS_KEY not in result, \
248         "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
249
250       result[constants.JOB_IDS_KEY] = job_submission
251
252     return result
253
254   def _ExecLU(self, lu):
255     """Logical Unit execution sequence.
256
257     """
258     write_count = self.context.cfg.write_count
259     lu.CheckPrereq()
260
261     hm = self.BuildHooksManager(lu)
262     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
263     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
264                      self.Log, None)
265
266     if getattr(lu.op, "dry_run", False):
267       # in this mode, no post-hooks are run, and the config is not
268       # written (as it might have been modified by another LU, and we
269       # shouldn't do writeout on behalf of other threads
270       self.LogInfo("dry-run mode requested, not actually executing"
271                    " the operation")
272       return lu.dry_run_result
273
274     try:
275       result = self._ProcessResult(lu.Exec(self.Log))
276       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
277       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
278                                 self.Log, result)
279     finally:
280       # FIXME: This needs locks if not lu_class.REQ_BGL
281       if write_count != self.context.cfg.write_count:
282         hm.RunConfigUpdate()
283
284     return result
285
286   def BuildHooksManager(self, lu):
287     return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
288
289   def _LockAndExecLU(self, lu, level, calc_timeout, priority):
290     """Execute a Logical Unit, with the needed locks.
291
292     This is a recursive function that starts locking the given level, and
293     proceeds up, till there are no more locks to acquire. Then it executes the
294     given LU and its opcodes.
295
296     """
297     adding_locks = level in lu.add_locks
298     acquiring_locks = level in lu.needed_locks
299     if level not in locking.LEVELS:
300       if self._cbs:
301         self._cbs.NotifyStart()
302
303       result = self._ExecLU(lu)
304
305     elif adding_locks and acquiring_locks:
306       # We could both acquire and add locks at the same level, but for now we
307       # don't need this, so we'll avoid the complicated code needed.
308       raise NotImplementedError("Can't declare locks to acquire when adding"
309                                 " others")
310
311     elif adding_locks or acquiring_locks:
312       lu.DeclareLocks(level)
313       share = lu.share_locks[level]
314
315       try:
316         assert adding_locks ^ acquiring_locks, \
317           "Locks must be either added or acquired"
318
319         if acquiring_locks:
320           # Acquiring locks
321           needed_locks = lu.needed_locks[level]
322
323           self._AcquireLocks(level, needed_locks, share,
324                              calc_timeout(), priority)
325         else:
326           # Adding locks
327           add_locks = lu.add_locks[level]
328           lu.remove_locks[level] = add_locks
329
330           try:
331             self.context.glm.add(level, add_locks, acquired=1, shared=share)
332           except errors.LockError:
333             raise errors.OpPrereqError(
334               "Couldn't add locks (%s), probably because of a race condition"
335               " with another job, who added them first" % add_locks,
336               errors.ECODE_FAULT)
337
338         try:
339           result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
340         finally:
341           if level in lu.remove_locks:
342             self.context.glm.remove(level, lu.remove_locks[level])
343       finally:
344         if self.context.glm.is_owned(level):
345           self.context.glm.release(level)
346
347     else:
348       result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
349
350     return result
351
352   def ExecOpCode(self, op, cbs, timeout=None, priority=None):
353     """Execute an opcode.
354
355     @type op: an OpCode instance
356     @param op: the opcode to be executed
357     @type cbs: L{OpExecCbBase}
358     @param cbs: Runtime callbacks
359     @type timeout: float or None
360     @param timeout: Maximum time to acquire all locks, None for no timeout
361     @type priority: number or None
362     @param priority: Priority for acquiring lock(s)
363     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
364         amount of time
365
366     """
367     if not isinstance(op, opcodes.OpCode):
368       raise errors.ProgrammerError("Non-opcode instance passed"
369                                    " to ExecOpcode (%s)" % type(op))
370
371     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
372     if lu_class is None:
373       raise errors.OpCodeUnknown("Unknown opcode")
374
375     if timeout is None:
376       calc_timeout = lambda: None
377     else:
378       calc_timeout = utils.RunningTimeout(timeout, False).Remaining
379
380     self._cbs = cbs
381     try:
382       # Acquire the Big Ganeti Lock exclusively if this LU requires it,
383       # and in a shared fashion otherwise (to prevent concurrent run with
384       # an exclusive LU.
385       self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
386                           not lu_class.REQ_BGL, calc_timeout(),
387                           priority)
388       try:
389         lu = lu_class(self, op, self.context, self.rpc)
390         lu.ExpandNames()
391         assert lu.needed_locks is not None, "needed_locks not set by LU"
392
393         try:
394           result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
395                                        priority)
396         finally:
397           if self._ec_id:
398             self.context.cfg.DropECReservations(self._ec_id)
399       finally:
400         self.context.glm.release(locking.LEVEL_CLUSTER)
401     finally:
402       self._cbs = None
403
404     resultcheck_fn = op.OP_RESULT
405     if not (resultcheck_fn is None or resultcheck_fn(result)):
406       logging.error("Expected opcode result matching %s, got %s",
407                     resultcheck_fn, result)
408       raise errors.OpResultError("Opcode result does not match %s" %
409                                  resultcheck_fn)
410
411     return result
412
413   def Log(self, *args):
414     """Forward call to feedback callback function.
415
416     """
417     if self._cbs:
418       self._cbs.Feedback(*args)
419
420   def LogStep(self, current, total, message):
421     """Log a change in LU execution progress.
422
423     """
424     logging.debug("Step %d/%d %s", current, total, message)
425     self.Log("STEP %d/%d %s" % (current, total, message))
426
427   def LogWarning(self, message, *args, **kwargs):
428     """Log a warning to the logs and the user.
429
430     The optional keyword argument is 'hint' and can be used to show a
431     hint to the user (presumably related to the warning). If the
432     message is empty, it will not be printed at all, allowing one to
433     show only a hint.
434
435     """
436     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
437            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
438     if args:
439       message = message % tuple(args)
440     if message:
441       logging.warning(message)
442       self.Log(" - WARNING: %s" % message)
443     if "hint" in kwargs:
444       self.Log("      Hint: %s" % kwargs["hint"])
445
446   def LogInfo(self, message, *args):
447     """Log an informational message to the logs and the user.
448
449     """
450     if args:
451       message = message % tuple(args)
452     logging.info(message)
453     self.Log(" - INFO: %s" % message)
454
455   def GetECId(self):
456     """Returns the current execution context ID.
457
458     """
459     if not self._ec_id:
460       raise errors.ProgrammerError("Tried to use execution context id when"
461                                    " not set")
462     return self._ec_id
463
464
465 class HooksMaster(object):
466   def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
467     hooks_results_adapt_fn, build_env_fn, log_fn, htype=None, cluster_name=None,
468     master_name=None):
469     """Base class for hooks masters.
470
471     This class invokes the execution of hooks according to the behaviour
472     specified by its parameters.
473
474     @type opcode: string
475     @param opcode: opcode of the operation to which the hooks are tied
476     @type hooks_path: string
477     @param hooks_path: prefix of the hooks directories
478     @type nodes: 2-tuple of lists
479     @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
480       run and nodes on which post-hooks must be run
481     @type hooks_execution_fn: function that accepts the following parameters:
482       (node_list, hooks_path, phase, environment)
483     @param hooks_execution_fn: function that will execute the hooks; can be
484       None, indicating that no conversion is necessary.
485     @type hooks_results_adapt_fn: function
486     @param hooks_results_adapt_fn: function that will adapt the return value of
487       hooks_execution_fn to the format expected by RunPhase
488     @type build_env_fn: function that returns a dictionary having strings as
489       keys
490     @param build_env_fn: function that builds the environment for the hooks
491     @type log_fn: function that accepts a string
492     @param log_fn: logging function
493     @type htype: string or None
494     @param htype: None or one of L{constants.HTYPE_CLUSTER},
495      L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
496     @type cluster_name: string
497     @param cluster_name: name of the cluster
498     @type master_name: string
499     @param master_name: name of the master
500
501     """
502     self.opcode = opcode
503     self.hooks_path = hooks_path
504     self.hooks_execution_fn = hooks_execution_fn
505     self.hooks_results_adapt_fn = hooks_results_adapt_fn
506     self.build_env_fn = build_env_fn
507     self.log_fn = log_fn
508     self.htype = htype
509     self.cluster_name = cluster_name
510     self.master_name = master_name
511
512     self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
513     (self.pre_nodes, self.post_nodes) = nodes
514
515   def _BuildEnv(self, phase):
516     """Compute the environment and the target nodes.
517
518     Based on the opcode and the current node list, this builds the
519     environment for the hooks and the target node list for the run.
520
521     """
522     if phase == constants.HOOKS_PHASE_PRE:
523       prefix = "GANETI_"
524     elif phase == constants.HOOKS_PHASE_POST:
525       prefix = "GANETI_POST_"
526     else:
527       raise AssertionError("Unknown phase '%s'" % phase)
528
529     env = {}
530
531     if self.hooks_path is not None:
532       phase_env = self.build_env_fn()
533       if phase_env:
534         assert not compat.any(key.upper().startswith(prefix)
535                               for key in phase_env)
536         env.update(("%s%s" % (prefix, key), value)
537                    for (key, value) in phase_env.items())
538
539     if phase == constants.HOOKS_PHASE_PRE:
540       assert compat.all((key.startswith("GANETI_") and
541                          not key.startswith("GANETI_POST_"))
542                         for key in env)
543
544     elif phase == constants.HOOKS_PHASE_POST:
545       assert compat.all(key.startswith("GANETI_POST_") for key in env)
546       assert isinstance(self.pre_env, dict)
547
548       # Merge with pre-phase environment
549       assert not compat.any(key.startswith("GANETI_POST_")
550                             for key in self.pre_env)
551       env.update(self.pre_env)
552     else:
553       raise AssertionError("Unknown phase '%s'" % phase)
554
555     return env
556
557   def _RunWrapper(self, node_list, hpath, phase, phase_env):
558     """Simple wrapper over self.callfn.
559
560     This method fixes the environment before executing the hooks.
561
562     """
563     env = {
564       "PATH": constants.HOOKS_PATH,
565       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
566       "GANETI_OP_CODE": self.opcode,
567       "GANETI_DATA_DIR": constants.DATA_DIR,
568       "GANETI_HOOKS_PHASE": phase,
569       "GANETI_HOOKS_PATH": hpath,
570       }
571
572     if self.htype:
573       env["GANETI_OBJECT_TYPE"] = self.htype
574
575     if self.cluster_name is not None:
576       env["GANETI_CLUSTER"] = self.cluster_name
577
578     if self.master_name is not None:
579       env["GANETI_MASTER"] = self.master_name
580
581     if phase_env:
582       env = utils.algo.JoinDisjointDicts(env, phase_env)
583
584     # Convert everything to strings
585     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
586
587     assert compat.all(key == "PATH" or key.startswith("GANETI_")
588                       for key in env)
589
590     return self.hooks_execution_fn(node_list, hpath, phase, env)
591
592   def RunPhase(self, phase, nodes=None):
593     """Run all the scripts for a phase.
594
595     This is the main function of the HookMaster.
596     It executes self.hooks_execution_fn, and after running
597     self.hooks_results_adapt_fn on its results it expects them to be in the form
598     {node_name: (fail_msg, [(script, result, output), ...]}).
599
600     @param phase: one of L{constants.HOOKS_PHASE_POST} or
601         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
602     @param nodes: overrides the predefined list of nodes for the given phase
603     @return: the processed results of the hooks multi-node rpc call
604     @raise errors.HooksFailure: on communication failure to the nodes
605     @raise errors.HooksAbort: on failure of one of the hooks
606
607     """
608     if phase == constants.HOOKS_PHASE_PRE:
609       if nodes is None:
610         nodes = self.pre_nodes
611       env = self.pre_env
612     elif phase == constants.HOOKS_PHASE_POST:
613       if nodes is None:
614         nodes = self.post_nodes
615       env = self._BuildEnv(phase)
616     else:
617       raise AssertionError("Unknown phase '%s'" % phase)
618
619     if not nodes:
620       # empty node list, we should not attempt to run this as either
621       # we're in the cluster init phase and the rpc client part can't
622       # even attempt to run, or this LU doesn't do hooks at all
623       return
624
625     results = self._RunWrapper(nodes, self.hooks_path, phase, env)
626     if not results:
627       msg = "Communication Failure"
628       if phase == constants.HOOKS_PHASE_PRE:
629         raise errors.HooksFailure(msg)
630       else:
631         self.log_fn(msg)
632         return results
633
634     converted_res = results
635     if self.hooks_results_adapt_fn:
636       converted_res = self.hooks_results_adapt_fn(results)
637
638     errs = []
639     for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
640       if offline:
641         continue
642
643       if fail_msg:
644         self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)
645         continue
646
647       for script, hkr, output in hooks_results:
648         if hkr == constants.HKR_FAIL:
649           if phase == constants.HOOKS_PHASE_PRE:
650             errs.append((node_name, script, output))
651           else:
652             if not output:
653               output = "(no output)"
654             self.log_fn("On %s script %s failed, output: %s" %
655                         (node_name, script, output))
656
657     if errs and phase == constants.HOOKS_PHASE_PRE:
658       raise errors.HooksAbort(errs)
659
660     return results
661
662   def RunConfigUpdate(self):
663     """Run the special configuration update hook
664
665     This is a special hook that runs only on the master after each
666     top-level LI if the configuration has been updated.
667
668     """
669     phase = constants.HOOKS_PHASE_POST
670     hpath = constants.HOOKS_NAME_CFGUPDATE
671     nodes = [self.master_name]
672     self._RunWrapper(nodes, hpath, phase, self.pre_env)
673
674   @staticmethod
675   def BuildFromLu(hooks_execution_fn, lu):
676     if lu.HPATH is None:
677       nodes = (None, None)
678     else:
679       nodes = map(frozenset, lu.BuildHooksNodes())
680
681     master_name = cluster_name = None
682     if lu.cfg:
683       master_name = lu.cfg.GetMasterNode()
684       cluster_name = lu.cfg.GetClusterName()
685
686     return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
687                        _RpcResultsToHooksResults, lu.BuildHooksEnv,
688                        lu.LogWarning, lu.HTYPE, cluster_name, master_name)