Fix LU processor's GetECId
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41 from ganeti import utils
42
43
44 _OP_PREFIX = "Op"
45 _LU_PREFIX = "LU"
46
47
48 class LockAcquireTimeout(Exception):
49   """Exception to report timeouts on acquiring locks.
50
51   """
52
53
54 def _CalculateLockAttemptTimeouts():
55   """Calculate timeouts for lock attempts.
56
57   """
58   result = [1.0]
59
60   # Wait for a total of at least 150s before doing a blocking acquire
61   while sum(result) < 150.0:
62     timeout = (result[-1] * 1.05) ** 1.25
63
64     # Cap timeout at 10 seconds. This gives other jobs a chance to run
65     # even if we're still trying to get our locks, before finally moving
66     # to a blocking acquire.
67     if timeout > 10.0:
68       timeout = 10.0
69
70     elif timeout < 0.1:
71       # Lower boundary for safety
72       timeout = 0.1
73
74     result.append(timeout)
75
76   return result
77
78
79 class LockAttemptTimeoutStrategy(object):
80   """Class with lock acquire timeout strategy.
81
82   """
83   __slots__ = [
84     "_timeouts",
85     "_random_fn",
86     "_time_fn",
87     ]
88
89   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
90
91   def __init__(self, _time_fn=time.time, _random_fn=random.random):
92     """Initializes this class.
93
94     @param _time_fn: Time function for unittests
95     @param _random_fn: Random number generator for unittests
96
97     """
98     object.__init__(self)
99
100     self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
101     self._time_fn = _time_fn
102     self._random_fn = _random_fn
103
104   def NextAttempt(self):
105     """Returns the timeout for the next attempt.
106
107     """
108     try:
109       timeout = self._timeouts.next()
110     except StopIteration:
111       # No more timeouts, do blocking acquire
112       timeout = None
113
114     if timeout is not None:
115       # Add a small variation (-/+ 5%) to timeout. This helps in situations
116       # where two or more jobs are fighting for the same lock(s).
117       variation_range = timeout * 0.1
118       timeout += ((self._random_fn() * variation_range) -
119                   (variation_range * 0.5))
120
121     return timeout
122
123
124 class OpExecCbBase: # pylint: disable-msg=W0232
125   """Base class for OpCode execution callbacks.
126
127   """
128   def NotifyStart(self):
129     """Called when we are about to execute the LU.
130
131     This function is called when we're about to start the lu's Exec() method,
132     that is, after we have acquired all locks.
133
134     """
135
136   def Feedback(self, *args):
137     """Sends feedback from the LU code to the end-user.
138
139     """
140
141   def CheckCancel(self):
142     """Check whether job has been cancelled.
143
144     """
145
146
147 def _LUNameForOpName(opname):
148   """Computes the LU name for a given OpCode name.
149
150   """
151   assert opname.startswith(_OP_PREFIX), \
152       "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
153
154   return _LU_PREFIX + opname[len(_OP_PREFIX):]
155
156
157 def _ComputeDispatchTable():
158   """Computes the opcode-to-lu dispatch table.
159
160   """
161   return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
162               for op in opcodes.OP_MAPPING.values()
163               if op.WITH_LU)
164
165
166 class Processor(object):
167   """Object which runs OpCodes"""
168   DISPATCH_TABLE = _ComputeDispatchTable()
169
170   def __init__(self, context, ec_id):
171     """Constructor for Processor
172
173     @type context: GanetiContext
174     @param context: global Ganeti context
175     @type ec_id: string
176     @param ec_id: execution context identifier
177
178     """
179     self.context = context
180     self._ec_id = ec_id
181     self._cbs = None
182     self.rpc = rpc.RpcRunner(context.cfg)
183     self.hmclass = HooksMaster
184
185   def _AcquireLocks(self, level, names, shared, timeout, priority):
186     """Acquires locks via the Ganeti lock manager.
187
188     @type level: int
189     @param level: Lock level
190     @type names: list or string
191     @param names: Lock names
192     @type shared: bool
193     @param shared: Whether the locks should be acquired in shared mode
194     @type timeout: None or float
195     @param timeout: Timeout for acquiring the locks
196     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
197         amount of time
198
199     """
200     if self._cbs:
201       self._cbs.CheckCancel()
202
203     acquired = self.context.glm.acquire(level, names, shared=shared,
204                                         timeout=timeout, priority=priority)
205
206     if acquired is None:
207       raise LockAcquireTimeout()
208
209     return acquired
210
211   def _ExecLU(self, lu):
212     """Logical Unit execution sequence.
213
214     """
215     write_count = self.context.cfg.write_count
216     lu.CheckPrereq()
217     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
218     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
219     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
220                      self.Log, None)
221
222     if getattr(lu.op, "dry_run", False):
223       # in this mode, no post-hooks are run, and the config is not
224       # written (as it might have been modified by another LU, and we
225       # shouldn't do writeout on behalf of other threads
226       self.LogInfo("dry-run mode requested, not actually executing"
227                    " the operation")
228       return lu.dry_run_result
229
230     try:
231       result = lu.Exec(self.Log)
232       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
233       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
234                                 self.Log, result)
235     finally:
236       # FIXME: This needs locks if not lu_class.REQ_BGL
237       if write_count != self.context.cfg.write_count:
238         hm.RunConfigUpdate()
239
240     return result
241
242   def _LockAndExecLU(self, lu, level, calc_timeout, priority):
243     """Execute a Logical Unit, with the needed locks.
244
245     This is a recursive function that starts locking the given level, and
246     proceeds up, till there are no more locks to acquire. Then it executes the
247     given LU and its opcodes.
248
249     """
250     adding_locks = level in lu.add_locks
251     acquiring_locks = level in lu.needed_locks
252     if level not in locking.LEVELS:
253       if self._cbs:
254         self._cbs.NotifyStart()
255
256       result = self._ExecLU(lu)
257
258     elif adding_locks and acquiring_locks:
259       # We could both acquire and add locks at the same level, but for now we
260       # don't need this, so we'll avoid the complicated code needed.
261       raise NotImplementedError("Can't declare locks to acquire when adding"
262                                 " others")
263
264     elif adding_locks or acquiring_locks:
265       lu.DeclareLocks(level)
266       share = lu.share_locks[level]
267
268       try:
269         assert adding_locks ^ acquiring_locks, \
270           "Locks must be either added or acquired"
271
272         if acquiring_locks:
273           # Acquiring locks
274           needed_locks = lu.needed_locks[level]
275
276           acquired = self._AcquireLocks(level, needed_locks, share,
277                                         calc_timeout(), priority)
278         else:
279           # Adding locks
280           add_locks = lu.add_locks[level]
281           lu.remove_locks[level] = add_locks
282
283           try:
284             self.context.glm.add(level, add_locks, acquired=1, shared=share)
285           except errors.LockError:
286             raise errors.OpPrereqError(
287               "Couldn't add locks (%s), probably because of a race condition"
288               " with another job, who added them first" % add_locks,
289               errors.ECODE_FAULT)
290
291           acquired = add_locks
292
293         try:
294           lu.acquired_locks[level] = acquired
295
296           result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
297         finally:
298           if level in lu.remove_locks:
299             self.context.glm.remove(level, lu.remove_locks[level])
300       finally:
301         if self.context.glm.is_owned(level):
302           self.context.glm.release(level)
303
304     else:
305       result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
306
307     return result
308
309   def ExecOpCode(self, op, cbs, timeout=None, priority=None):
310     """Execute an opcode.
311
312     @type op: an OpCode instance
313     @param op: the opcode to be executed
314     @type cbs: L{OpExecCbBase}
315     @param cbs: Runtime callbacks
316     @type timeout: float or None
317     @param timeout: Maximum time to acquire all locks, None for no timeout
318     @type priority: number or None
319     @param priority: Priority for acquiring lock(s)
320     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
321         amount of time
322
323     """
324     if not isinstance(op, opcodes.OpCode):
325       raise errors.ProgrammerError("Non-opcode instance passed"
326                                    " to ExecOpcode")
327
328     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
329     if lu_class is None:
330       raise errors.OpCodeUnknown("Unknown opcode")
331
332     if timeout is None:
333       calc_timeout = lambda: None
334     else:
335       calc_timeout = utils.RunningTimeout(timeout, False).Remaining
336
337     self._cbs = cbs
338     try:
339       # Acquire the Big Ganeti Lock exclusively if this LU requires it,
340       # and in a shared fashion otherwise (to prevent concurrent run with
341       # an exclusive LU.
342       self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
343                           not lu_class.REQ_BGL, calc_timeout(),
344                           priority)
345       try:
346         lu = lu_class(self, op, self.context, self.rpc)
347         lu.ExpandNames()
348         assert lu.needed_locks is not None, "needed_locks not set by LU"
349
350         try:
351           return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
352                                      priority)
353         finally:
354           if self._ec_id:
355             self.context.cfg.DropECReservations(self._ec_id)
356       finally:
357         self.context.glm.release(locking.LEVEL_CLUSTER)
358     finally:
359       self._cbs = None
360
361   def Log(self, *args):
362     """Forward call to feedback callback function.
363
364     """
365     if self._cbs:
366       self._cbs.Feedback(*args)
367
368   def LogStep(self, current, total, message):
369     """Log a change in LU execution progress.
370
371     """
372     logging.debug("Step %d/%d %s", current, total, message)
373     self.Log("STEP %d/%d %s" % (current, total, message))
374
375   def LogWarning(self, message, *args, **kwargs):
376     """Log a warning to the logs and the user.
377
378     The optional keyword argument is 'hint' and can be used to show a
379     hint to the user (presumably related to the warning). If the
380     message is empty, it will not be printed at all, allowing one to
381     show only a hint.
382
383     """
384     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
385            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
386     if args:
387       message = message % tuple(args)
388     if message:
389       logging.warning(message)
390       self.Log(" - WARNING: %s" % message)
391     if "hint" in kwargs:
392       self.Log("      Hint: %s" % kwargs["hint"])
393
394   def LogInfo(self, message, *args):
395     """Log an informational message to the logs and the user.
396
397     """
398     if args:
399       message = message % tuple(args)
400     logging.info(message)
401     self.Log(" - INFO: %s" % message)
402
403   def GetECId(self):
404     """Returns the current execution context ID.
405
406     """
407     if not self._ec_id:
408       raise errors.ProgrammerError("Tried to use execution context id when"
409                                    " not set")
410     return self._ec_id
411
412
413 class HooksMaster(object):
414   """Hooks master.
415
416   This class distributes the run commands to the nodes based on the
417   specific LU class.
418
419   In order to remove the direct dependency on the rpc module, the
420   constructor needs a function which actually does the remote
421   call. This will usually be rpc.call_hooks_runner, but any function
422   which behaves the same works.
423
424   """
425   def __init__(self, callfn, lu):
426     self.callfn = callfn
427     self.lu = lu
428     self.op = lu.op
429     self.env, node_list_pre, node_list_post = self._BuildEnv()
430     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
431                       constants.HOOKS_PHASE_POST: node_list_post}
432
433   def _BuildEnv(self):
434     """Compute the environment and the target nodes.
435
436     Based on the opcode and the current node list, this builds the
437     environment for the hooks and the target node list for the run.
438
439     """
440     env = {
441       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
442       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
443       "GANETI_OP_CODE": self.op.OP_ID,
444       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
445       "GANETI_DATA_DIR": constants.DATA_DIR,
446       }
447
448     if self.lu.HPATH is not None:
449       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
450       if lu_env:
451         for key in lu_env:
452           env["GANETI_" + key] = lu_env[key]
453     else:
454       lu_nodes_pre = lu_nodes_post = []
455
456     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
457
458   def _RunWrapper(self, node_list, hpath, phase):
459     """Simple wrapper over self.callfn.
460
461     This method fixes the environment before doing the rpc call.
462
463     """
464     env = self.env.copy()
465     env["GANETI_HOOKS_PHASE"] = phase
466     env["GANETI_HOOKS_PATH"] = hpath
467     if self.lu.cfg is not None:
468       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
469       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
470
471     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
472
473     return self.callfn(node_list, hpath, phase, env)
474
475   def RunPhase(self, phase, nodes=None):
476     """Run all the scripts for a phase.
477
478     This is the main function of the HookMaster.
479
480     @param phase: one of L{constants.HOOKS_PHASE_POST} or
481         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
482     @param nodes: overrides the predefined list of nodes for the given phase
483     @return: the processed results of the hooks multi-node rpc call
484     @raise errors.HooksFailure: on communication failure to the nodes
485     @raise errors.HooksAbort: on failure of one of the hooks
486
487     """
488     if not self.node_list[phase] and not nodes:
489       # empty node list, we should not attempt to run this as either
490       # we're in the cluster init phase and the rpc client part can't
491       # even attempt to run, or this LU doesn't do hooks at all
492       return
493     hpath = self.lu.HPATH
494     if nodes is not None:
495       results = self._RunWrapper(nodes, hpath, phase)
496     else:
497       results = self._RunWrapper(self.node_list[phase], hpath, phase)
498     errs = []
499     if not results:
500       msg = "Communication Failure"
501       if phase == constants.HOOKS_PHASE_PRE:
502         raise errors.HooksFailure(msg)
503       else:
504         self.lu.LogWarning(msg)
505         return results
506     for node_name in results:
507       res = results[node_name]
508       if res.offline:
509         continue
510       msg = res.fail_msg
511       if msg:
512         self.lu.LogWarning("Communication failure to node %s: %s",
513                            node_name, msg)
514         continue
515       for script, hkr, output in res.payload:
516         if hkr == constants.HKR_FAIL:
517           if phase == constants.HOOKS_PHASE_PRE:
518             errs.append((node_name, script, output))
519           else:
520             if not output:
521               output = "(no output)"
522             self.lu.LogWarning("On %s script %s failed, output: %s" %
523                                (node_name, script, output))
524     if errs and phase == constants.HOOKS_PHASE_PRE:
525       raise errors.HooksAbort(errs)
526     return results
527
528   def RunConfigUpdate(self):
529     """Run the special configuration update hook
530
531     This is a special hook that runs only on the master after each
532     top-level LI if the configuration has been updated.
533
534     """
535     phase = constants.HOOKS_PHASE_POST
536     hpath = constants.HOOKS_NAME_CFGUPDATE
537     nodes = [self.lu.cfg.GetMasterNode()]
538     self._RunWrapper(nodes, hpath, phase)