mcpu: Add missing docstring to _ProcessResult
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import compat
43
44
45 _OP_PREFIX = "Op"
46 _LU_PREFIX = "LU"
47
48
49 class LockAcquireTimeout(Exception):
50   """Exception to report timeouts on acquiring locks.
51
52   """
53
54
55 def _CalculateLockAttemptTimeouts():
56   """Calculate timeouts for lock attempts.
57
58   """
59   result = [constants.LOCK_ATTEMPTS_MINWAIT]
60   running_sum = result[0]
61
62   # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
63   # blocking acquire
64   while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
65     timeout = (result[-1] * 1.05) ** 1.25
66
67     # Cap max timeout. This gives other jobs a chance to run even if
68     # we're still trying to get our locks, before finally moving to a
69     # blocking acquire.
70     timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
71     # And also cap the lower boundary for safety
72     timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
73
74     result.append(timeout)
75     running_sum += timeout
76
77   return result
78
79
80 class LockAttemptTimeoutStrategy(object):
81   """Class with lock acquire timeout strategy.
82
83   """
84   __slots__ = [
85     "_timeouts",
86     "_random_fn",
87     "_time_fn",
88     ]
89
90   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
91
92   def __init__(self, _time_fn=time.time, _random_fn=random.random):
93     """Initializes this class.
94
95     @param _time_fn: Time function for unittests
96     @param _random_fn: Random number generator for unittests
97
98     """
99     object.__init__(self)
100
101     self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
102     self._time_fn = _time_fn
103     self._random_fn = _random_fn
104
105   def NextAttempt(self):
106     """Returns the timeout for the next attempt.
107
108     """
109     try:
110       timeout = self._timeouts.next()
111     except StopIteration:
112       # No more timeouts, do blocking acquire
113       timeout = None
114
115     if timeout is not None:
116       # Add a small variation (-/+ 5%) to timeout. This helps in situations
117       # where two or more jobs are fighting for the same lock(s).
118       variation_range = timeout * 0.1
119       timeout += ((self._random_fn() * variation_range) -
120                   (variation_range * 0.5))
121
122     return timeout
123
124
125 class OpExecCbBase: # pylint: disable-msg=W0232
126   """Base class for OpCode execution callbacks.
127
128   """
129   def NotifyStart(self):
130     """Called when we are about to execute the LU.
131
132     This function is called when we're about to start the lu's Exec() method,
133     that is, after we have acquired all locks.
134
135     """
136
137   def Feedback(self, *args):
138     """Sends feedback from the LU code to the end-user.
139
140     """
141
142   def CheckCancel(self):
143     """Check whether job has been cancelled.
144
145     """
146
147   def SubmitManyJobs(self, jobs):
148     """Submits jobs for processing.
149
150     See L{jqueue.JobQueue.SubmitManyJobs}.
151
152     """
153     raise NotImplementedError
154
155
156 def _LUNameForOpName(opname):
157   """Computes the LU name for a given OpCode name.
158
159   """
160   assert opname.startswith(_OP_PREFIX), \
161       "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
162
163   return _LU_PREFIX + opname[len(_OP_PREFIX):]
164
165
166 def _ComputeDispatchTable():
167   """Computes the opcode-to-lu dispatch table.
168
169   """
170   return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
171               for op in opcodes.OP_MAPPING.values()
172               if op.WITH_LU)
173
174
175 class Processor(object):
176   """Object which runs OpCodes"""
177   DISPATCH_TABLE = _ComputeDispatchTable()
178
179   def __init__(self, context, ec_id):
180     """Constructor for Processor
181
182     @type context: GanetiContext
183     @param context: global Ganeti context
184     @type ec_id: string
185     @param ec_id: execution context identifier
186
187     """
188     self.context = context
189     self._ec_id = ec_id
190     self._cbs = None
191     self.rpc = rpc.RpcRunner(context.cfg)
192     self.hmclass = HooksMaster
193
194   def _AcquireLocks(self, level, names, shared, timeout, priority):
195     """Acquires locks via the Ganeti lock manager.
196
197     @type level: int
198     @param level: Lock level
199     @type names: list or string
200     @param names: Lock names
201     @type shared: bool
202     @param shared: Whether the locks should be acquired in shared mode
203     @type timeout: None or float
204     @param timeout: Timeout for acquiring the locks
205     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
206         amount of time
207
208     """
209     if self._cbs:
210       self._cbs.CheckCancel()
211
212     acquired = self.context.glm.acquire(level, names, shared=shared,
213                                         timeout=timeout, priority=priority)
214
215     if acquired is None:
216       raise LockAcquireTimeout()
217
218     return acquired
219
220   def _ProcessResult(self, result):
221     """Examines opcode result.
222
223     If necessary, additional processing on the result is done.
224
225     """
226     if isinstance(result, cmdlib.ResultWithJobs):
227       # Submit jobs
228       job_submission = self._cbs.SubmitManyJobs(result.jobs)
229
230       # Build dictionary
231       result = result.other
232
233       assert constants.JOB_IDS_KEY not in result, \
234         "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
235
236       result[constants.JOB_IDS_KEY] = job_submission
237
238     return result
239
240   def _ExecLU(self, lu):
241     """Logical Unit execution sequence.
242
243     """
244     write_count = self.context.cfg.write_count
245     lu.CheckPrereq()
246     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
247     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
248     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
249                      self.Log, None)
250
251     if getattr(lu.op, "dry_run", False):
252       # in this mode, no post-hooks are run, and the config is not
253       # written (as it might have been modified by another LU, and we
254       # shouldn't do writeout on behalf of other threads
255       self.LogInfo("dry-run mode requested, not actually executing"
256                    " the operation")
257       return lu.dry_run_result
258
259     try:
260       result = self._ProcessResult(lu.Exec(self.Log))
261       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
262       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
263                                 self.Log, result)
264     finally:
265       # FIXME: This needs locks if not lu_class.REQ_BGL
266       if write_count != self.context.cfg.write_count:
267         hm.RunConfigUpdate()
268
269     return result
270
271   def _LockAndExecLU(self, lu, level, calc_timeout, priority):
272     """Execute a Logical Unit, with the needed locks.
273
274     This is a recursive function that starts locking the given level, and
275     proceeds up, till there are no more locks to acquire. Then it executes the
276     given LU and its opcodes.
277
278     """
279     adding_locks = level in lu.add_locks
280     acquiring_locks = level in lu.needed_locks
281     if level not in locking.LEVELS:
282       if self._cbs:
283         self._cbs.NotifyStart()
284
285       result = self._ExecLU(lu)
286
287     elif adding_locks and acquiring_locks:
288       # We could both acquire and add locks at the same level, but for now we
289       # don't need this, so we'll avoid the complicated code needed.
290       raise NotImplementedError("Can't declare locks to acquire when adding"
291                                 " others")
292
293     elif adding_locks or acquiring_locks:
294       lu.DeclareLocks(level)
295       share = lu.share_locks[level]
296
297       try:
298         assert adding_locks ^ acquiring_locks, \
299           "Locks must be either added or acquired"
300
301         if acquiring_locks:
302           # Acquiring locks
303           needed_locks = lu.needed_locks[level]
304
305           self._AcquireLocks(level, needed_locks, share,
306                              calc_timeout(), priority)
307         else:
308           # Adding locks
309           add_locks = lu.add_locks[level]
310           lu.remove_locks[level] = add_locks
311
312           try:
313             self.context.glm.add(level, add_locks, acquired=1, shared=share)
314           except errors.LockError:
315             raise errors.OpPrereqError(
316               "Couldn't add locks (%s), probably because of a race condition"
317               " with another job, who added them first" % add_locks,
318               errors.ECODE_FAULT)
319
320         try:
321           result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
322         finally:
323           if level in lu.remove_locks:
324             self.context.glm.remove(level, lu.remove_locks[level])
325       finally:
326         if self.context.glm.is_owned(level):
327           self.context.glm.release(level)
328
329     else:
330       result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
331
332     return result
333
334   def ExecOpCode(self, op, cbs, timeout=None, priority=None):
335     """Execute an opcode.
336
337     @type op: an OpCode instance
338     @param op: the opcode to be executed
339     @type cbs: L{OpExecCbBase}
340     @param cbs: Runtime callbacks
341     @type timeout: float or None
342     @param timeout: Maximum time to acquire all locks, None for no timeout
343     @type priority: number or None
344     @param priority: Priority for acquiring lock(s)
345     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
346         amount of time
347
348     """
349     if not isinstance(op, opcodes.OpCode):
350       raise errors.ProgrammerError("Non-opcode instance passed"
351                                    " to ExecOpcode")
352
353     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
354     if lu_class is None:
355       raise errors.OpCodeUnknown("Unknown opcode")
356
357     if timeout is None:
358       calc_timeout = lambda: None
359     else:
360       calc_timeout = utils.RunningTimeout(timeout, False).Remaining
361
362     self._cbs = cbs
363     try:
364       # Acquire the Big Ganeti Lock exclusively if this LU requires it,
365       # and in a shared fashion otherwise (to prevent concurrent run with
366       # an exclusive LU.
367       self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
368                           not lu_class.REQ_BGL, calc_timeout(),
369                           priority)
370       try:
371         lu = lu_class(self, op, self.context, self.rpc)
372         lu.ExpandNames()
373         assert lu.needed_locks is not None, "needed_locks not set by LU"
374
375         try:
376           return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
377                                      priority)
378         finally:
379           if self._ec_id:
380             self.context.cfg.DropECReservations(self._ec_id)
381       finally:
382         self.context.glm.release(locking.LEVEL_CLUSTER)
383     finally:
384       self._cbs = None
385
386   def Log(self, *args):
387     """Forward call to feedback callback function.
388
389     """
390     if self._cbs:
391       self._cbs.Feedback(*args)
392
393   def LogStep(self, current, total, message):
394     """Log a change in LU execution progress.
395
396     """
397     logging.debug("Step %d/%d %s", current, total, message)
398     self.Log("STEP %d/%d %s" % (current, total, message))
399
400   def LogWarning(self, message, *args, **kwargs):
401     """Log a warning to the logs and the user.
402
403     The optional keyword argument is 'hint' and can be used to show a
404     hint to the user (presumably related to the warning). If the
405     message is empty, it will not be printed at all, allowing one to
406     show only a hint.
407
408     """
409     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
410            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
411     if args:
412       message = message % tuple(args)
413     if message:
414       logging.warning(message)
415       self.Log(" - WARNING: %s" % message)
416     if "hint" in kwargs:
417       self.Log("      Hint: %s" % kwargs["hint"])
418
419   def LogInfo(self, message, *args):
420     """Log an informational message to the logs and the user.
421
422     """
423     if args:
424       message = message % tuple(args)
425     logging.info(message)
426     self.Log(" - INFO: %s" % message)
427
428   def GetECId(self):
429     """Returns the current execution context ID.
430
431     """
432     if not self._ec_id:
433       raise errors.ProgrammerError("Tried to use execution context id when"
434                                    " not set")
435     return self._ec_id
436
437
438 class HooksMaster(object):
439   """Hooks master.
440
441   This class distributes the run commands to the nodes based on the
442   specific LU class.
443
444   In order to remove the direct dependency on the rpc module, the
445   constructor needs a function which actually does the remote
446   call. This will usually be rpc.call_hooks_runner, but any function
447   which behaves the same works.
448
449   """
450   def __init__(self, callfn, lu):
451     self.callfn = callfn
452     self.lu = lu
453     self.op = lu.op
454     self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
455
456     if self.lu.HPATH is None:
457       nodes = (None, None)
458     else:
459       nodes = map(frozenset, self.lu.BuildHooksNodes())
460
461     (self.pre_nodes, self.post_nodes) = nodes
462
463   def _BuildEnv(self, phase):
464     """Compute the environment and the target nodes.
465
466     Based on the opcode and the current node list, this builds the
467     environment for the hooks and the target node list for the run.
468
469     """
470     if phase == constants.HOOKS_PHASE_PRE:
471       prefix = "GANETI_"
472     elif phase == constants.HOOKS_PHASE_POST:
473       prefix = "GANETI_POST_"
474     else:
475       raise AssertionError("Unknown phase '%s'" % phase)
476
477     env = {}
478
479     if self.lu.HPATH is not None:
480       lu_env = self.lu.BuildHooksEnv()
481       if lu_env:
482         assert not compat.any(key.upper().startswith(prefix) for key in lu_env)
483         env.update(("%s%s" % (prefix, key), value)
484                    for (key, value) in lu_env.items())
485
486     if phase == constants.HOOKS_PHASE_PRE:
487       assert compat.all((key.startswith("GANETI_") and
488                          not key.startswith("GANETI_POST_"))
489                         for key in env)
490
491     elif phase == constants.HOOKS_PHASE_POST:
492       assert compat.all(key.startswith("GANETI_POST_") for key in env)
493       assert isinstance(self.pre_env, dict)
494
495       # Merge with pre-phase environment
496       assert not compat.any(key.startswith("GANETI_POST_")
497                             for key in self.pre_env)
498       env.update(self.pre_env)
499     else:
500       raise AssertionError("Unknown phase '%s'" % phase)
501
502     return env
503
504   def _RunWrapper(self, node_list, hpath, phase, phase_env):
505     """Simple wrapper over self.callfn.
506
507     This method fixes the environment before doing the rpc call.
508
509     """
510     cfg = self.lu.cfg
511
512     env = {
513       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
514       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
515       "GANETI_OP_CODE": self.op.OP_ID,
516       "GANETI_DATA_DIR": constants.DATA_DIR,
517       "GANETI_HOOKS_PHASE": phase,
518       "GANETI_HOOKS_PATH": hpath,
519       }
520
521     if self.lu.HTYPE:
522       env["GANETI_OBJECT_TYPE"] = self.lu.HTYPE
523
524     if cfg is not None:
525       env["GANETI_CLUSTER"] = cfg.GetClusterName()
526       env["GANETI_MASTER"] = cfg.GetMasterNode()
527
528     if phase_env:
529       assert not (set(env) & set(phase_env)), "Environment variables conflict"
530       env.update(phase_env)
531
532     # Convert everything to strings
533     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
534
535     assert compat.all(key == "PATH" or key.startswith("GANETI_")
536                       for key in env)
537
538     return self.callfn(node_list, hpath, phase, env)
539
540   def RunPhase(self, phase, nodes=None):
541     """Run all the scripts for a phase.
542
543     This is the main function of the HookMaster.
544
545     @param phase: one of L{constants.HOOKS_PHASE_POST} or
546         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
547     @param nodes: overrides the predefined list of nodes for the given phase
548     @return: the processed results of the hooks multi-node rpc call
549     @raise errors.HooksFailure: on communication failure to the nodes
550     @raise errors.HooksAbort: on failure of one of the hooks
551
552     """
553     if phase == constants.HOOKS_PHASE_PRE:
554       if nodes is None:
555         nodes = self.pre_nodes
556       env = self.pre_env
557     elif phase == constants.HOOKS_PHASE_POST:
558       if nodes is None:
559         nodes = self.post_nodes
560       env = self._BuildEnv(phase)
561     else:
562       raise AssertionError("Unknown phase '%s'" % phase)
563
564     if not nodes:
565       # empty node list, we should not attempt to run this as either
566       # we're in the cluster init phase and the rpc client part can't
567       # even attempt to run, or this LU doesn't do hooks at all
568       return
569
570     results = self._RunWrapper(nodes, self.lu.HPATH, phase, env)
571     if not results:
572       msg = "Communication Failure"
573       if phase == constants.HOOKS_PHASE_PRE:
574         raise errors.HooksFailure(msg)
575       else:
576         self.lu.LogWarning(msg)
577         return results
578
579     errs = []
580     for node_name in results:
581       res = results[node_name]
582       if res.offline:
583         continue
584
585       msg = res.fail_msg
586       if msg:
587         self.lu.LogWarning("Communication failure to node %s: %s",
588                            node_name, msg)
589         continue
590
591       for script, hkr, output in res.payload:
592         if hkr == constants.HKR_FAIL:
593           if phase == constants.HOOKS_PHASE_PRE:
594             errs.append((node_name, script, output))
595           else:
596             if not output:
597               output = "(no output)"
598             self.lu.LogWarning("On %s script %s failed, output: %s" %
599                                (node_name, script, output))
600
601     if errs and phase == constants.HOOKS_PHASE_PRE:
602       raise errors.HooksAbort(errs)
603
604     return results
605
606   def RunConfigUpdate(self):
607     """Run the special configuration update hook
608
609     This is a special hook that runs only on the master after each
610     top-level LI if the configuration has been updated.
611
612     """
613     phase = constants.HOOKS_PHASE_POST
614     hpath = constants.HOOKS_NAME_CFGUPDATE
615     nodes = [self.lu.cfg.GetMasterNode()]
616     self._RunWrapper(nodes, hpath, phase, self.pre_env)