7ab7c18e6ddd8504a44b7adad543beb9f42d9455
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import compat
43
44
45 _OP_PREFIX = "Op"
46 _LU_PREFIX = "LU"
47
48
49 class LockAcquireTimeout(Exception):
50   """Exception to report timeouts on acquiring locks.
51
52   """
53
54
55 def _CalculateLockAttemptTimeouts():
56   """Calculate timeouts for lock attempts.
57
58   """
59   result = [1.0]
60
61   # Wait for a total of at least 150s before doing a blocking acquire
62   while sum(result) < 150.0:
63     timeout = (result[-1] * 1.05) ** 1.25
64
65     # Cap timeout at 10 seconds. This gives other jobs a chance to run
66     # even if we're still trying to get our locks, before finally moving
67     # to a blocking acquire.
68     if timeout > 10.0:
69       timeout = 10.0
70
71     elif timeout < 0.1:
72       # Lower boundary for safety
73       timeout = 0.1
74
75     result.append(timeout)
76
77   return result
78
79
80 class LockAttemptTimeoutStrategy(object):
81   """Class with lock acquire timeout strategy.
82
83   """
84   __slots__ = [
85     "_timeouts",
86     "_random_fn",
87     "_time_fn",
88     ]
89
90   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
91
92   def __init__(self, _time_fn=time.time, _random_fn=random.random):
93     """Initializes this class.
94
95     @param _time_fn: Time function for unittests
96     @param _random_fn: Random number generator for unittests
97
98     """
99     object.__init__(self)
100
101     self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
102     self._time_fn = _time_fn
103     self._random_fn = _random_fn
104
105   def NextAttempt(self):
106     """Returns the timeout for the next attempt.
107
108     """
109     try:
110       timeout = self._timeouts.next()
111     except StopIteration:
112       # No more timeouts, do blocking acquire
113       timeout = None
114
115     if timeout is not None:
116       # Add a small variation (-/+ 5%) to timeout. This helps in situations
117       # where two or more jobs are fighting for the same lock(s).
118       variation_range = timeout * 0.1
119       timeout += ((self._random_fn() * variation_range) -
120                   (variation_range * 0.5))
121
122     return timeout
123
124
125 class OpExecCbBase: # pylint: disable-msg=W0232
126   """Base class for OpCode execution callbacks.
127
128   """
129   def NotifyStart(self):
130     """Called when we are about to execute the LU.
131
132     This function is called when we're about to start the lu's Exec() method,
133     that is, after we have acquired all locks.
134
135     """
136
137   def Feedback(self, *args):
138     """Sends feedback from the LU code to the end-user.
139
140     """
141
142   def CheckCancel(self):
143     """Check whether job has been cancelled.
144
145     """
146
147
148 def _LUNameForOpName(opname):
149   """Computes the LU name for a given OpCode name.
150
151   """
152   assert opname.startswith(_OP_PREFIX), \
153       "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
154
155   return _LU_PREFIX + opname[len(_OP_PREFIX):]
156
157
158 def _ComputeDispatchTable():
159   """Computes the opcode-to-lu dispatch table.
160
161   """
162   return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
163               for op in opcodes.OP_MAPPING.values()
164               if op.WITH_LU)
165
166
167 class Processor(object):
168   """Object which runs OpCodes"""
169   DISPATCH_TABLE = _ComputeDispatchTable()
170
171   def __init__(self, context, ec_id):
172     """Constructor for Processor
173
174     @type context: GanetiContext
175     @param context: global Ganeti context
176     @type ec_id: string
177     @param ec_id: execution context identifier
178
179     """
180     self.context = context
181     self._ec_id = ec_id
182     self._cbs = None
183     self.rpc = rpc.RpcRunner(context.cfg)
184     self.hmclass = HooksMaster
185
186   def _AcquireLocks(self, level, names, shared, timeout, priority):
187     """Acquires locks via the Ganeti lock manager.
188
189     @type level: int
190     @param level: Lock level
191     @type names: list or string
192     @param names: Lock names
193     @type shared: bool
194     @param shared: Whether the locks should be acquired in shared mode
195     @type timeout: None or float
196     @param timeout: Timeout for acquiring the locks
197     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
198         amount of time
199
200     """
201     if self._cbs:
202       self._cbs.CheckCancel()
203
204     acquired = self.context.glm.acquire(level, names, shared=shared,
205                                         timeout=timeout, priority=priority)
206
207     if acquired is None:
208       raise LockAcquireTimeout()
209
210     return acquired
211
212   def _ExecLU(self, lu):
213     """Logical Unit execution sequence.
214
215     """
216     write_count = self.context.cfg.write_count
217     lu.CheckPrereq()
218     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
219     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
220     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
221                      self.Log, None)
222
223     if getattr(lu.op, "dry_run", False):
224       # in this mode, no post-hooks are run, and the config is not
225       # written (as it might have been modified by another LU, and we
226       # shouldn't do writeout on behalf of other threads
227       self.LogInfo("dry-run mode requested, not actually executing"
228                    " the operation")
229       return lu.dry_run_result
230
231     try:
232       result = lu.Exec(self.Log)
233       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
234       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
235                                 self.Log, result)
236     finally:
237       # FIXME: This needs locks if not lu_class.REQ_BGL
238       if write_count != self.context.cfg.write_count:
239         hm.RunConfigUpdate()
240
241     return result
242
243   def _LockAndExecLU(self, lu, level, calc_timeout, priority):
244     """Execute a Logical Unit, with the needed locks.
245
246     This is a recursive function that starts locking the given level, and
247     proceeds up, till there are no more locks to acquire. Then it executes the
248     given LU and its opcodes.
249
250     """
251     adding_locks = level in lu.add_locks
252     acquiring_locks = level in lu.needed_locks
253     if level not in locking.LEVELS:
254       if self._cbs:
255         self._cbs.NotifyStart()
256
257       result = self._ExecLU(lu)
258
259     elif adding_locks and acquiring_locks:
260       # We could both acquire and add locks at the same level, but for now we
261       # don't need this, so we'll avoid the complicated code needed.
262       raise NotImplementedError("Can't declare locks to acquire when adding"
263                                 " others")
264
265     elif adding_locks or acquiring_locks:
266       lu.DeclareLocks(level)
267       share = lu.share_locks[level]
268
269       try:
270         assert adding_locks ^ acquiring_locks, \
271           "Locks must be either added or acquired"
272
273         if acquiring_locks:
274           # Acquiring locks
275           needed_locks = lu.needed_locks[level]
276
277           acquired = self._AcquireLocks(level, needed_locks, share,
278                                         calc_timeout(), priority)
279         else:
280           # Adding locks
281           add_locks = lu.add_locks[level]
282           lu.remove_locks[level] = add_locks
283
284           try:
285             self.context.glm.add(level, add_locks, acquired=1, shared=share)
286           except errors.LockError:
287             raise errors.OpPrereqError(
288               "Couldn't add locks (%s), probably because of a race condition"
289               " with another job, who added them first" % add_locks,
290               errors.ECODE_FAULT)
291
292           acquired = add_locks
293
294         try:
295           lu.acquired_locks[level] = acquired
296
297           result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
298         finally:
299           if level in lu.remove_locks:
300             self.context.glm.remove(level, lu.remove_locks[level])
301       finally:
302         if self.context.glm.is_owned(level):
303           self.context.glm.release(level)
304
305     else:
306       result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
307
308     return result
309
310   def ExecOpCode(self, op, cbs, timeout=None, priority=None):
311     """Execute an opcode.
312
313     @type op: an OpCode instance
314     @param op: the opcode to be executed
315     @type cbs: L{OpExecCbBase}
316     @param cbs: Runtime callbacks
317     @type timeout: float or None
318     @param timeout: Maximum time to acquire all locks, None for no timeout
319     @type priority: number or None
320     @param priority: Priority for acquiring lock(s)
321     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
322         amount of time
323
324     """
325     if not isinstance(op, opcodes.OpCode):
326       raise errors.ProgrammerError("Non-opcode instance passed"
327                                    " to ExecOpcode")
328
329     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
330     if lu_class is None:
331       raise errors.OpCodeUnknown("Unknown opcode")
332
333     if timeout is None:
334       calc_timeout = lambda: None
335     else:
336       calc_timeout = utils.RunningTimeout(timeout, False).Remaining
337
338     self._cbs = cbs
339     try:
340       # Acquire the Big Ganeti Lock exclusively if this LU requires it,
341       # and in a shared fashion otherwise (to prevent concurrent run with
342       # an exclusive LU.
343       self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
344                           not lu_class.REQ_BGL, calc_timeout(),
345                           priority)
346       try:
347         lu = lu_class(self, op, self.context, self.rpc)
348         lu.ExpandNames()
349         assert lu.needed_locks is not None, "needed_locks not set by LU"
350
351         try:
352           return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
353                                      priority)
354         finally:
355           if self._ec_id:
356             self.context.cfg.DropECReservations(self._ec_id)
357       finally:
358         self.context.glm.release(locking.LEVEL_CLUSTER)
359     finally:
360       self._cbs = None
361
362   def Log(self, *args):
363     """Forward call to feedback callback function.
364
365     """
366     if self._cbs:
367       self._cbs.Feedback(*args)
368
369   def LogStep(self, current, total, message):
370     """Log a change in LU execution progress.
371
372     """
373     logging.debug("Step %d/%d %s", current, total, message)
374     self.Log("STEP %d/%d %s" % (current, total, message))
375
376   def LogWarning(self, message, *args, **kwargs):
377     """Log a warning to the logs and the user.
378
379     The optional keyword argument is 'hint' and can be used to show a
380     hint to the user (presumably related to the warning). If the
381     message is empty, it will not be printed at all, allowing one to
382     show only a hint.
383
384     """
385     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
386            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
387     if args:
388       message = message % tuple(args)
389     if message:
390       logging.warning(message)
391       self.Log(" - WARNING: %s" % message)
392     if "hint" in kwargs:
393       self.Log("      Hint: %s" % kwargs["hint"])
394
395   def LogInfo(self, message, *args):
396     """Log an informational message to the logs and the user.
397
398     """
399     if args:
400       message = message % tuple(args)
401     logging.info(message)
402     self.Log(" - INFO: %s" % message)
403
404   def GetECId(self):
405     """Returns the current execution context ID.
406
407     """
408     if not self._ec_id:
409       raise errors.ProgrammerError("Tried to use execution context id when"
410                                    " not set")
411     return self._ec_id
412
413
414 class HooksMaster(object):
415   """Hooks master.
416
417   This class distributes the run commands to the nodes based on the
418   specific LU class.
419
420   In order to remove the direct dependency on the rpc module, the
421   constructor needs a function which actually does the remote
422   call. This will usually be rpc.call_hooks_runner, but any function
423   which behaves the same works.
424
425   """
426   def __init__(self, callfn, lu):
427     self.callfn = callfn
428     self.lu = lu
429     self.op = lu.op
430     self.pre_env = None
431     self.pre_nodes = None
432
433   def _BuildEnv(self, phase):
434     """Compute the environment and the target nodes.
435
436     Based on the opcode and the current node list, this builds the
437     environment for the hooks and the target node list for the run.
438
439     """
440     if phase == constants.HOOKS_PHASE_PRE:
441       prefix = "GANETI_"
442     elif phase == constants.HOOKS_PHASE_POST:
443       prefix = "GANETI_POST_"
444     else:
445       raise AssertionError("Unknown phase '%s'" % phase)
446
447     env = {}
448
449     if self.lu.HPATH is not None:
450       (lu_env, lu_nodes_pre, lu_nodes_post) = self.lu.BuildHooksEnv()
451       if lu_env:
452         assert not compat.any(key.upper().startswith(prefix)
453                               for key in lu_env)
454         env.update(("%s%s" % (prefix, key), value)
455                    for (key, value) in lu_env.items())
456     else:
457       lu_nodes_pre = lu_nodes_post = []
458
459     if phase == constants.HOOKS_PHASE_PRE:
460       assert compat.all((key.startswith("GANETI_") and
461                          not key.startswith("GANETI_POST_"))
462                         for key in env)
463
464       # Record environment for any post-phase hooks
465       self.pre_env = env
466
467     elif phase == constants.HOOKS_PHASE_POST:
468       assert compat.all(key.startswith("GANETI_POST_") for key in env)
469
470       if self.pre_env:
471         assert not compat.any(key.startswith("GANETI_POST_")
472                               for key in self.pre_env)
473         env.update(self.pre_env)
474     else:
475       raise AssertionError("Unknown phase '%s'" % phase)
476
477     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
478
479   def _RunWrapper(self, node_list, hpath, phase, phase_env):
480     """Simple wrapper over self.callfn.
481
482     This method fixes the environment before doing the rpc call.
483
484     """
485     cfg = self.lu.cfg
486
487     env = {
488       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
489       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
490       "GANETI_OP_CODE": self.op.OP_ID,
491       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
492       "GANETI_DATA_DIR": constants.DATA_DIR,
493       "GANETI_HOOKS_PHASE": phase,
494       "GANETI_HOOKS_PATH": hpath,
495       }
496
497     if cfg is not None:
498       env["GANETI_CLUSTER"] = cfg.GetClusterName()
499       env["GANETI_MASTER"] = cfg.GetMasterNode()
500
501     if phase_env:
502       assert not (set(env) & set(phase_env)), "Environment variables conflict"
503       env.update(phase_env)
504
505     # Convert everything to strings
506     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
507
508     assert compat.all(key == "PATH" or key.startswith("GANETI_")
509                       for key in env)
510
511     return self.callfn(node_list, hpath, phase, env)
512
513   def RunPhase(self, phase, nodes=None):
514     """Run all the scripts for a phase.
515
516     This is the main function of the HookMaster.
517
518     @param phase: one of L{constants.HOOKS_PHASE_POST} or
519         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
520     @param nodes: overrides the predefined list of nodes for the given phase
521     @return: the processed results of the hooks multi-node rpc call
522     @raise errors.HooksFailure: on communication failure to the nodes
523     @raise errors.HooksAbort: on failure of one of the hooks
524
525     """
526     (env, node_list_pre, node_list_post) = self._BuildEnv(phase)
527     if nodes is None:
528       if phase == constants.HOOKS_PHASE_PRE:
529         self.pre_nodes = (node_list_pre, node_list_post)
530         nodes = node_list_pre
531       elif phase == constants.HOOKS_PHASE_POST:
532         post_nodes = (node_list_pre, node_list_post)
533         assert self.pre_nodes == post_nodes, \
534                ("Node lists returned for post-phase hook don't match pre-phase"
535                 " lists (pre %s, post %s)" % (self.pre_nodes, post_nodes))
536         nodes = node_list_post
537       else:
538         raise AssertionError("Unknown phase '%s'" % phase)
539
540     if not nodes:
541       # empty node list, we should not attempt to run this as either
542       # we're in the cluster init phase and the rpc client part can't
543       # even attempt to run, or this LU doesn't do hooks at all
544       return
545
546     results = self._RunWrapper(nodes, self.lu.HPATH, phase, env)
547     if not results:
548       msg = "Communication Failure"
549       if phase == constants.HOOKS_PHASE_PRE:
550         raise errors.HooksFailure(msg)
551       else:
552         self.lu.LogWarning(msg)
553         return results
554
555     errs = []
556     for node_name in results:
557       res = results[node_name]
558       if res.offline:
559         continue
560
561       msg = res.fail_msg
562       if msg:
563         self.lu.LogWarning("Communication failure to node %s: %s",
564                            node_name, msg)
565         continue
566
567       for script, hkr, output in res.payload:
568         if hkr == constants.HKR_FAIL:
569           if phase == constants.HOOKS_PHASE_PRE:
570             errs.append((node_name, script, output))
571           else:
572             if not output:
573               output = "(no output)"
574             self.lu.LogWarning("On %s script %s failed, output: %s" %
575                                (node_name, script, output))
576
577     if errs and phase == constants.HOOKS_PHASE_PRE:
578       raise errors.HooksAbort(errs)
579
580     return results
581
582   def RunConfigUpdate(self):
583     """Run the special configuration update hook
584
585     This is a special hook that runs only on the master after each
586     top-level LI if the configuration has been updated.
587
588     """
589     if self.pre_env is None:
590       raise AssertionError("Pre-phase must be run before configuration update")
591
592     phase = constants.HOOKS_PHASE_POST
593     hpath = constants.HOOKS_NAME_CFGUPDATE
594     nodes = [self.lu.cfg.GetMasterNode()]
595     self._RunWrapper(nodes, hpath, phase, self.pre_env)