Fix breakage introduced by commit 8044bf655
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41
42
43 class _LockAcquireTimeout(Exception):
44   """Internal exception to report timeouts on acquiring locks.
45
46   """
47
48
49 def _CalculateLockAttemptTimeouts():
50   """Calculate timeouts for lock attempts.
51
52   """
53   result = [1.0]
54
55   # Wait for a total of at least 150s before doing a blocking acquire
56   while sum(result) < 150.0:
57     timeout = (result[-1] * 1.05) ** 1.25
58
59     # Cap timeout at 10 seconds. This gives other jobs a chance to run
60     # even if we're still trying to get our locks, before finally moving
61     # to a blocking acquire.
62     if timeout > 10.0:
63       timeout = 10.0
64
65     elif timeout < 0.1:
66       # Lower boundary for safety
67       timeout = 0.1
68
69     result.append(timeout)
70
71   return result
72
73
74 class _LockAttemptTimeoutStrategy(object):
75   """Class with lock acquire timeout strategy.
76
77   """
78   __slots__ = [
79     "_attempt",
80     "_random_fn",
81     "_start_time",
82     "_time_fn",
83     "_running_timeout",
84     ]
85
86   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
87
88   def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
89     """Initializes this class.
90
91     @type attempt: int
92     @param attempt: Current attempt number
93     @param _time_fn: Time function for unittests
94     @param _random_fn: Random number generator for unittests
95
96     """
97     object.__init__(self)
98
99     if attempt < 0:
100       raise ValueError("Attempt must be zero or positive")
101
102     self._attempt = attempt
103     self._time_fn = _time_fn
104     self._random_fn = _random_fn
105
106     try:
107       timeout = self._TIMEOUT_PER_ATTEMPT[attempt]
108     except IndexError:
109       # No more timeouts, do blocking acquire
110       timeout = None
111
112     self._running_timeout = locking.RunningTimeout(timeout, False,
113                                                    _time_fn=_time_fn)
114
115   def NextAttempt(self):
116     """Returns the strategy for the next attempt.
117
118     """
119     return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
120                                        _time_fn=self._time_fn,
121                                        _random_fn=self._random_fn)
122
123   def CalcRemainingTimeout(self):
124     """Returns the remaining timeout.
125
126     """
127     timeout = self._running_timeout.Remaining()
128
129     if timeout is not None:
130       # Add a small variation (-/+ 5%) to timeout. This helps in situations
131       # where two or more jobs are fighting for the same lock(s).
132       variation_range = timeout * 0.1
133       timeout += ((self._random_fn() * variation_range) -
134                   (variation_range * 0.5))
135
136     return timeout
137
138
139 class OpExecCbBase: # pylint: disable-msg=W0232
140   """Base class for OpCode execution callbacks.
141
142   """
143   def NotifyStart(self):
144     """Called when we are about to execute the LU.
145
146     This function is called when we're about to start the lu's Exec() method,
147     that is, after we have acquired all locks.
148
149     """
150
151   def Feedback(self, *args):
152     """Sends feedback from the LU code to the end-user.
153
154     """
155
156   def ReportLocks(self, msg):
157     """Report lock operations.
158
159     """
160
161
162 class Processor(object):
163   """Object which runs OpCodes"""
164   DISPATCH_TABLE = {
165     # Cluster
166     opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
167     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
168     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
169     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
170     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
171     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
172     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
173     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
174     opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
175     opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
176     # node lu
177     opcodes.OpAddNode: cmdlib.LUAddNode,
178     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
179     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
180     opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
181     opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
182     opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
183     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
184     opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
185     opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
186     opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
187     opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy,
188     # instance lu
189     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
190     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
191     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
192     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
193     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
194     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
195     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
196     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
197     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
198     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
199     opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
200     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
201     opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
202     opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
203     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
204     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
205     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
206     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
207     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
208     # os lu
209     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
210     # exports lu
211     opcodes.OpQueryExports: cmdlib.LUQueryExports,
212     opcodes.OpPrepareExport: cmdlib.LUPrepareExport,
213     opcodes.OpExportInstance: cmdlib.LUExportInstance,
214     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
215     # tags lu
216     opcodes.OpGetTags: cmdlib.LUGetTags,
217     opcodes.OpSearchTags: cmdlib.LUSearchTags,
218     opcodes.OpAddTags: cmdlib.LUAddTags,
219     opcodes.OpDelTags: cmdlib.LUDelTags,
220     # test lu
221     opcodes.OpTestDelay: cmdlib.LUTestDelay,
222     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
223     opcodes.OpTestJobqueue: cmdlib.LUTestJobqueue,
224     }
225
226   def __init__(self, context, ec_id):
227     """Constructor for Processor
228
229     @type context: GanetiContext
230     @param context: global Ganeti context
231     @type ec_id: string
232     @param ec_id: execution context identifier
233
234     """
235     self.context = context
236     self._ec_id = ec_id
237     self._cbs = None
238     self.rpc = rpc.RpcRunner(context.cfg)
239     self.hmclass = HooksMaster
240
241   def _ReportLocks(self, level, names, shared, timeout, acquired, result):
242     """Reports lock operations.
243
244     @type level: int
245     @param level: Lock level
246     @type names: list or string
247     @param names: Lock names
248     @type shared: bool
249     @param shared: Whether the locks should be acquired in shared mode
250     @type timeout: None or float
251     @param timeout: Timeout for acquiring the locks
252     @type acquired: bool
253     @param acquired: Whether the locks have already been acquired
254     @type result: None or set
255     @param result: Result from L{locking.GanetiLockManager.acquire}
256
257     """
258     parts = []
259
260     # Build message
261     if acquired:
262       if result is None:
263         parts.append("timeout")
264       else:
265         parts.append("acquired")
266     else:
267       parts.append("waiting")
268       if timeout is None:
269         parts.append("blocking")
270       else:
271         parts.append("timeout=%0.6fs" % timeout)
272
273     parts.append(locking.LEVEL_NAMES[level])
274
275     if names == locking.ALL_SET:
276       parts.append("ALL")
277     elif isinstance(names, basestring):
278       parts.append(names)
279     else:
280       parts.append(",".join(sorted(names)))
281
282     if shared:
283       parts.append("shared")
284     else:
285       parts.append("exclusive")
286
287     msg = "/".join(parts)
288
289     logging.debug("LU locks %s", msg)
290
291     if self._cbs:
292       self._cbs.ReportLocks(msg)
293
294   def _AcquireLocks(self, level, names, shared, timeout):
295     """Acquires locks via the Ganeti lock manager.
296
297     @type level: int
298     @param level: Lock level
299     @type names: list or string
300     @param names: Lock names
301     @type shared: bool
302     @param shared: Whether the locks should be acquired in shared mode
303     @type timeout: None or float
304     @param timeout: Timeout for acquiring the locks
305
306     """
307     self._ReportLocks(level, names, shared, timeout, False, None)
308
309     acquired = self.context.glm.acquire(level, names, shared=shared,
310                                         timeout=timeout)
311
312     self._ReportLocks(level, names, shared, timeout, True, acquired)
313
314     return acquired
315
316   def _ExecLU(self, lu):
317     """Logical Unit execution sequence.
318
319     """
320     write_count = self.context.cfg.write_count
321     lu.CheckPrereq()
322     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
323     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
324     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
325                      self.Log, None)
326
327     if getattr(lu.op, "dry_run", False):
328       # in this mode, no post-hooks are run, and the config is not
329       # written (as it might have been modified by another LU, and we
330       # shouldn't do writeout on behalf of other threads
331       self.LogInfo("dry-run mode requested, not actually executing"
332                    " the operation")
333       return lu.dry_run_result
334
335     try:
336       result = lu.Exec(self.Log)
337       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
338       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
339                                 self.Log, result)
340     finally:
341       # FIXME: This needs locks if not lu_class.REQ_BGL
342       if write_count != self.context.cfg.write_count:
343         hm.RunConfigUpdate()
344
345     return result
346
347   def _LockAndExecLU(self, lu, level, calc_timeout):
348     """Execute a Logical Unit, with the needed locks.
349
350     This is a recursive function that starts locking the given level, and
351     proceeds up, till there are no more locks to acquire. Then it executes the
352     given LU and its opcodes.
353
354     """
355     adding_locks = level in lu.add_locks
356     acquiring_locks = level in lu.needed_locks
357     if level not in locking.LEVELS:
358       if self._cbs:
359         self._cbs.NotifyStart()
360
361       result = self._ExecLU(lu)
362
363     elif adding_locks and acquiring_locks:
364       # We could both acquire and add locks at the same level, but for now we
365       # don't need this, so we'll avoid the complicated code needed.
366       raise NotImplementedError("Can't declare locks to acquire when adding"
367                                 " others")
368
369     elif adding_locks or acquiring_locks:
370       lu.DeclareLocks(level)
371       share = lu.share_locks[level]
372
373       try:
374         assert adding_locks ^ acquiring_locks, \
375           "Locks must be either added or acquired"
376
377         if acquiring_locks:
378           # Acquiring locks
379           needed_locks = lu.needed_locks[level]
380
381           acquired = self._AcquireLocks(level, needed_locks, share,
382                                         calc_timeout())
383
384           if acquired is None:
385             raise _LockAcquireTimeout()
386
387         else:
388           # Adding locks
389           add_locks = lu.add_locks[level]
390           lu.remove_locks[level] = add_locks
391
392           try:
393             self.context.glm.add(level, add_locks, acquired=1, shared=share)
394           except errors.LockError:
395             raise errors.OpPrereqError(
396               "Couldn't add locks (%s), probably because of a race condition"
397               " with another job, who added them first" % add_locks,
398               errors.ECODE_FAULT)
399
400           acquired = add_locks
401
402         try:
403           lu.acquired_locks[level] = acquired
404
405           result = self._LockAndExecLU(lu, level + 1, calc_timeout)
406         finally:
407           if level in lu.remove_locks:
408             self.context.glm.remove(level, lu.remove_locks[level])
409       finally:
410         if self.context.glm.is_owned(level):
411           self.context.glm.release(level)
412
413     else:
414       result = self._LockAndExecLU(lu, level + 1, calc_timeout)
415
416     return result
417
418   def ExecOpCode(self, op, cbs):
419     """Execute an opcode.
420
421     @type op: an OpCode instance
422     @param op: the opcode to be executed
423     @type cbs: L{OpExecCbBase}
424     @param cbs: Runtime callbacks
425
426     """
427     if not isinstance(op, opcodes.OpCode):
428       raise errors.ProgrammerError("Non-opcode instance passed"
429                                    " to ExecOpcode")
430
431     self._cbs = cbs
432     try:
433       lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
434       if lu_class is None:
435         raise errors.OpCodeUnknown("Unknown opcode")
436
437       timeout_strategy = _LockAttemptTimeoutStrategy()
438
439       while True:
440         try:
441           acquire_timeout = timeout_strategy.CalcRemainingTimeout()
442
443           # Acquire the Big Ganeti Lock exclusively if this LU requires it,
444           # and in a shared fashion otherwise (to prevent concurrent run with
445           # an exclusive LU.
446           if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
447                                 not lu_class.REQ_BGL, acquire_timeout) is None:
448             raise _LockAcquireTimeout()
449
450           try:
451             lu = lu_class(self, op, self.context, self.rpc)
452             lu.ExpandNames()
453             assert lu.needed_locks is not None, "needed_locks not set by LU"
454
455             try:
456               return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
457                                          timeout_strategy.CalcRemainingTimeout)
458             finally:
459               if self._ec_id:
460                 self.context.cfg.DropECReservations(self._ec_id)
461
462           finally:
463             self.context.glm.release(locking.LEVEL_CLUSTER)
464
465         except _LockAcquireTimeout:
466           # Timeout while waiting for lock, try again
467           pass
468
469         timeout_strategy = timeout_strategy.NextAttempt()
470
471     finally:
472       self._cbs = None
473
474   def Log(self, *args):
475     """Forward call to feedback callback function.
476
477     """
478     if self._cbs:
479       self._cbs.Feedback(*args)
480
481   def LogStep(self, current, total, message):
482     """Log a change in LU execution progress.
483
484     """
485     logging.debug("Step %d/%d %s", current, total, message)
486     self.Log("STEP %d/%d %s" % (current, total, message))
487
488   def LogWarning(self, message, *args, **kwargs):
489     """Log a warning to the logs and the user.
490
491     The optional keyword argument is 'hint' and can be used to show a
492     hint to the user (presumably related to the warning). If the
493     message is empty, it will not be printed at all, allowing one to
494     show only a hint.
495
496     """
497     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
498            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
499     if args:
500       message = message % tuple(args)
501     if message:
502       logging.warning(message)
503       self.Log(" - WARNING: %s" % message)
504     if "hint" in kwargs:
505       self.Log("      Hint: %s" % kwargs["hint"])
506
507   def LogInfo(self, message, *args):
508     """Log an informational message to the logs and the user.
509
510     """
511     if args:
512       message = message % tuple(args)
513     logging.info(message)
514     self.Log(" - INFO: %s" % message)
515
516   def GetECId(self):
517     if not self._ec_id:
518       errors.ProgrammerError("Tried to use execution context id when not set")
519     return self._ec_id
520
521
522 class HooksMaster(object):
523   """Hooks master.
524
525   This class distributes the run commands to the nodes based on the
526   specific LU class.
527
528   In order to remove the direct dependency on the rpc module, the
529   constructor needs a function which actually does the remote
530   call. This will usually be rpc.call_hooks_runner, but any function
531   which behaves the same works.
532
533   """
534   def __init__(self, callfn, lu):
535     self.callfn = callfn
536     self.lu = lu
537     self.op = lu.op
538     self.env, node_list_pre, node_list_post = self._BuildEnv()
539     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
540                       constants.HOOKS_PHASE_POST: node_list_post}
541
542   def _BuildEnv(self):
543     """Compute the environment and the target nodes.
544
545     Based on the opcode and the current node list, this builds the
546     environment for the hooks and the target node list for the run.
547
548     """
549     env = {
550       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
551       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
552       "GANETI_OP_CODE": self.op.OP_ID,
553       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
554       "GANETI_DATA_DIR": constants.DATA_DIR,
555       }
556
557     if self.lu.HPATH is not None:
558       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
559       if lu_env:
560         for key in lu_env:
561           env["GANETI_" + key] = lu_env[key]
562     else:
563       lu_nodes_pre = lu_nodes_post = []
564
565     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
566
567   def _RunWrapper(self, node_list, hpath, phase):
568     """Simple wrapper over self.callfn.
569
570     This method fixes the environment before doing the rpc call.
571
572     """
573     env = self.env.copy()
574     env["GANETI_HOOKS_PHASE"] = phase
575     env["GANETI_HOOKS_PATH"] = hpath
576     if self.lu.cfg is not None:
577       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
578       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
579
580     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
581
582     return self.callfn(node_list, hpath, phase, env)
583
584   def RunPhase(self, phase, nodes=None):
585     """Run all the scripts for a phase.
586
587     This is the main function of the HookMaster.
588
589     @param phase: one of L{constants.HOOKS_PHASE_POST} or
590         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
591     @param nodes: overrides the predefined list of nodes for the given phase
592     @return: the processed results of the hooks multi-node rpc call
593     @raise errors.HooksFailure: on communication failure to the nodes
594     @raise errors.HooksAbort: on failure of one of the hooks
595
596     """
597     if not self.node_list[phase] and not nodes:
598       # empty node list, we should not attempt to run this as either
599       # we're in the cluster init phase and the rpc client part can't
600       # even attempt to run, or this LU doesn't do hooks at all
601       return
602     hpath = self.lu.HPATH
603     if nodes is not None:
604       results = self._RunWrapper(nodes, hpath, phase)
605     else:
606       results = self._RunWrapper(self.node_list[phase], hpath, phase)
607     errs = []
608     if not results:
609       msg = "Communication Failure"
610       if phase == constants.HOOKS_PHASE_PRE:
611         raise errors.HooksFailure(msg)
612       else:
613         self.lu.LogWarning(msg)
614         return results
615     for node_name in results:
616       res = results[node_name]
617       if res.offline:
618         continue
619       msg = res.fail_msg
620       if msg:
621         self.lu.LogWarning("Communication failure to node %s: %s",
622                            node_name, msg)
623         continue
624       for script, hkr, output in res.payload:
625         if hkr == constants.HKR_FAIL:
626           if phase == constants.HOOKS_PHASE_PRE:
627             errs.append((node_name, script, output))
628           else:
629             if not output:
630               output = "(no output)"
631             self.lu.LogWarning("On %s script %s failed, output: %s" %
632                                (node_name, script, output))
633     if errs and phase == constants.HOOKS_PHASE_PRE:
634       raise errors.HooksAbort(errs)
635     return results
636
637   def RunConfigUpdate(self):
638     """Run the special configuration update hook
639
640     This is a special hook that runs only on the master after each
641     top-level LI if the configuration has been updated.
642
643     """
644     phase = constants.HOOKS_PHASE_POST
645     hpath = constants.HOOKS_NAME_CFGUPDATE
646     nodes = [self.lu.cfg.GetMasterNode()]
647     self._RunWrapper(nodes, hpath, phase)