gnt-*: Print better error message for uninitialized cluster
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41
42
43 class _LockAcquireTimeout(Exception):
44   """Internal exception to report timeouts on acquiring locks.
45
46   """
47
48
49 def _CalculateLockAttemptTimeouts():
50   """Calculate timeouts for lock attempts.
51
52   """
53   running_sum = 0
54   result = [1.0]
55
56   # Wait for a total of at least 150s before doing a blocking acquire
57   while sum(result) < 150.0:
58     timeout = (result[-1] * 1.05) ** 1.25
59
60     # Cap timeout at 10 seconds. This gives other jobs a chance to run
61     # even if we're still trying to get our locks, before finally moving
62     # to a blocking acquire.
63     if timeout > 10.0:
64       timeout = 10.0
65
66     elif timeout < 0.1:
67       # Lower boundary for safety
68       timeout = 0.1
69
70     result.append(timeout)
71
72   return result
73
74
75 class _LockAttemptTimeoutStrategy(object):
76   """Class with lock acquire timeout strategy.
77
78   """
79   __slots__ = [
80     "_attempt",
81     "_random_fn",
82     "_start_time",
83     "_time_fn",
84     "_running_timeout",
85     ]
86
87   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
88
89   def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
90     """Initializes this class.
91
92     @type attempt: int
93     @param attempt: Current attempt number
94     @param _time_fn: Time function for unittests
95     @param _random_fn: Random number generator for unittests
96
97     """
98     object.__init__(self)
99
100     if attempt < 0:
101       raise ValueError("Attempt must be zero or positive")
102
103     self._attempt = attempt
104     self._time_fn = _time_fn
105     self._random_fn = _random_fn
106
107     try:
108       timeout = self._TIMEOUT_PER_ATTEMPT[attempt]
109     except IndexError:
110       # No more timeouts, do blocking acquire
111       timeout = None
112
113     self._running_timeout = locking.RunningTimeout(timeout, False,
114                                                    _time_fn=_time_fn)
115
116   def NextAttempt(self):
117     """Returns the strategy for the next attempt.
118
119     """
120     return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
121                                        _time_fn=self._time_fn,
122                                        _random_fn=self._random_fn)
123
124   def CalcRemainingTimeout(self):
125     """Returns the remaining timeout.
126
127     """
128     timeout = self._running_timeout.Remaining()
129
130     if timeout is not None:
131       # Add a small variation (-/+ 5%) to timeout. This helps in situations
132       # where two or more jobs are fighting for the same lock(s).
133       variation_range = timeout * 0.1
134       timeout += ((self._random_fn() * variation_range) -
135                   (variation_range * 0.5))
136
137     return timeout
138
139
140 class OpExecCbBase:
141   """Base class for OpCode execution callbacks.
142
143   """
144   def NotifyStart(self):
145     """Called when we are about to execute the LU.
146
147     This function is called when we're about to start the lu's Exec() method,
148     that is, after we have acquired all locks.
149
150     """
151
152   def Feedback(self, *args):
153     """Sends feedback from the LU code to the end-user.
154
155     """
156
157   def ReportLocks(self, msg):
158     """Report lock operations.
159
160     """
161
162
163 class Processor(object):
164   """Object which runs OpCodes"""
165   DISPATCH_TABLE = {
166     # Cluster
167     opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
168     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
169     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
170     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
171     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
172     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
173     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
174     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
175     opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
176     opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
177     # node lu
178     opcodes.OpAddNode: cmdlib.LUAddNode,
179     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
180     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
181     opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
182     opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
183     opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
184     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
185     opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
186     opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
187     opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
188     opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
189     # instance lu
190     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
191     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
192     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
193     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
194     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
195     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
196     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
197     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
198     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
199     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
200     opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
201     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
202     opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
203     opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
204     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
205     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
206     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
207     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
208     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
209     # os lu
210     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
211     # exports lu
212     opcodes.OpQueryExports: cmdlib.LUQueryExports,
213     opcodes.OpExportInstance: cmdlib.LUExportInstance,
214     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
215     # tags lu
216     opcodes.OpGetTags: cmdlib.LUGetTags,
217     opcodes.OpSearchTags: cmdlib.LUSearchTags,
218     opcodes.OpAddTags: cmdlib.LUAddTags,
219     opcodes.OpDelTags: cmdlib.LUDelTags,
220     # test lu
221     opcodes.OpTestDelay: cmdlib.LUTestDelay,
222     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
223     }
224
225   def __init__(self, context):
226     """Constructor for Processor
227
228     """
229     self.context = context
230     self._cbs = None
231     self.rpc = rpc.RpcRunner(context.cfg)
232     self.hmclass = HooksMaster
233
234   def _ReportLocks(self, level, names, shared, timeout, acquired, result):
235     """Reports lock operations.
236
237     @type level: int
238     @param level: Lock level
239     @type names: list or string
240     @param names: Lock names
241     @type shared: bool
242     @param shared: Whether the locks should be acquired in shared mode
243     @type timeout: None or float
244     @param timeout: Timeout for acquiring the locks
245     @type acquired: bool
246     @param acquired: Whether the locks have already been acquired
247     @type result: None or set
248     @param result: Result from L{locking.GanetiLockManager.acquire}
249
250     """
251     parts = []
252
253     # Build message
254     if acquired:
255       if result is None:
256         parts.append("timeout")
257       else:
258         parts.append("acquired")
259     else:
260       parts.append("waiting")
261       if timeout is None:
262         parts.append("blocking")
263       else:
264         parts.append("timeout=%0.6fs" % timeout)
265
266     parts.append(locking.LEVEL_NAMES[level])
267
268     if names == locking.ALL_SET:
269       parts.append("ALL")
270     elif isinstance(names, basestring):
271       parts.append(names)
272     else:
273       parts.append(",".join(names))
274
275     if shared:
276       parts.append("shared")
277     else:
278       parts.append("exclusive")
279
280     msg = "/".join(parts)
281
282     logging.debug("LU locks %s", msg)
283
284     if self._cbs:
285       self._cbs.ReportLocks(msg)
286
287   def _AcquireLocks(self, level, names, shared, timeout):
288     """Acquires locks via the Ganeti lock manager.
289
290     @type level: int
291     @param level: Lock level
292     @type names: list or string
293     @param names: Lock names
294     @type shared: bool
295     @param shared: Whether the locks should be acquired in shared mode
296     @type timeout: None or float
297     @param timeout: Timeout for acquiring the locks
298
299     """
300     self._ReportLocks(level, names, shared, timeout, False, None)
301
302     acquired = self.context.glm.acquire(level, names, shared=shared,
303                                         timeout=timeout)
304
305     self._ReportLocks(level, names, shared, timeout, True, acquired)
306
307     return acquired
308
309   def _ExecLU(self, lu):
310     """Logical Unit execution sequence.
311
312     """
313     write_count = self.context.cfg.write_count
314     lu.CheckPrereq()
315     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
316     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
317     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
318                      self._Feedback, None)
319
320     if getattr(lu.op, "dry_run", False):
321       # in this mode, no post-hooks are run, and the config is not
322       # written (as it might have been modified by another LU, and we
323       # shouldn't do writeout on behalf of other threads
324       self.LogInfo("dry-run mode requested, not actually executing"
325                    " the operation")
326       return lu.dry_run_result
327
328     try:
329       result = lu.Exec(self._Feedback)
330       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
331       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
332                                 self._Feedback, result)
333     finally:
334       # FIXME: This needs locks if not lu_class.REQ_BGL
335       if write_count != self.context.cfg.write_count:
336         hm.RunConfigUpdate()
337
338     return result
339
340   def _LockAndExecLU(self, lu, level, calc_timeout):
341     """Execute a Logical Unit, with the needed locks.
342
343     This is a recursive function that starts locking the given level, and
344     proceeds up, till there are no more locks to acquire. Then it executes the
345     given LU and its opcodes.
346
347     """
348     adding_locks = level in lu.add_locks
349     acquiring_locks = level in lu.needed_locks
350     if level not in locking.LEVELS:
351       if self._cbs:
352         self._cbs.NotifyStart()
353
354       result = self._ExecLU(lu)
355
356     elif adding_locks and acquiring_locks:
357       # We could both acquire and add locks at the same level, but for now we
358       # don't need this, so we'll avoid the complicated code needed.
359       raise NotImplementedError("Can't declare locks to acquire when adding"
360                                 " others")
361
362     elif adding_locks or acquiring_locks:
363       lu.DeclareLocks(level)
364       share = lu.share_locks[level]
365
366       try:
367         assert adding_locks ^ acquiring_locks, \
368           "Locks must be either added or acquired"
369
370         if acquiring_locks:
371           # Acquiring locks
372           needed_locks = lu.needed_locks[level]
373
374           acquired = self._AcquireLocks(level, needed_locks, share,
375                                         calc_timeout())
376
377           if acquired is None:
378             raise _LockAcquireTimeout()
379
380         else:
381           # Adding locks
382           add_locks = lu.add_locks[level]
383           lu.remove_locks[level] = add_locks
384
385           try:
386             self.context.glm.add(level, add_locks, acquired=1, shared=share)
387           except errors.LockError:
388             raise errors.OpPrereqError(
389               "Couldn't add locks (%s), probably because of a race condition"
390               " with another job, who added them first" % add_locks,
391               errors.ECODE_FAULT)
392
393           acquired = add_locks
394
395         try:
396           lu.acquired_locks[level] = acquired
397
398           result = self._LockAndExecLU(lu, level + 1, calc_timeout)
399         finally:
400           if level in lu.remove_locks:
401             self.context.glm.remove(level, lu.remove_locks[level])
402       finally:
403         if self.context.glm.is_owned(level):
404           self.context.glm.release(level)
405
406     else:
407       result = self._LockAndExecLU(lu, level + 1, calc_timeout)
408
409     return result
410
411   def ExecOpCode(self, op, cbs):
412     """Execute an opcode.
413
414     @type op: an OpCode instance
415     @param op: the opcode to be executed
416     @type cbs: L{OpExecCbBase}
417     @param cbs: Runtime callbacks
418
419     """
420     if not isinstance(op, opcodes.OpCode):
421       raise errors.ProgrammerError("Non-opcode instance passed"
422                                    " to ExecOpcode")
423
424     self._cbs = cbs
425     try:
426       lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
427       if lu_class is None:
428         raise errors.OpCodeUnknown("Unknown opcode")
429
430       timeout_strategy = _LockAttemptTimeoutStrategy()
431
432       while True:
433         try:
434           acquire_timeout = timeout_strategy.CalcRemainingTimeout()
435
436           # Acquire the Big Ganeti Lock exclusively if this LU requires it,
437           # and in a shared fashion otherwise (to prevent concurrent run with
438           # an exclusive LU.
439           if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
440                                 not lu_class.REQ_BGL, acquire_timeout) is None:
441             raise _LockAcquireTimeout()
442
443           try:
444             lu = lu_class(self, op, self.context, self.rpc)
445             lu.ExpandNames()
446             assert lu.needed_locks is not None, "needed_locks not set by LU"
447
448             return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
449                                        timeout_strategy.CalcRemainingTimeout)
450           finally:
451             self.context.glm.release(locking.LEVEL_CLUSTER)
452
453         except _LockAcquireTimeout:
454           # Timeout while waiting for lock, try again
455           pass
456
457         timeout_strategy = timeout_strategy.NextAttempt()
458
459     finally:
460       self._cbs = None
461
462   def _Feedback(self, *args):
463     """Forward call to feedback callback function.
464
465     """
466     if self._cbs:
467       self._cbs.Feedback(*args)
468
469   def LogStep(self, current, total, message):
470     """Log a change in LU execution progress.
471
472     """
473     logging.debug("Step %d/%d %s", current, total, message)
474     self._Feedback("STEP %d/%d %s" % (current, total, message))
475
476   def LogWarning(self, message, *args, **kwargs):
477     """Log a warning to the logs and the user.
478
479     The optional keyword argument is 'hint' and can be used to show a
480     hint to the user (presumably related to the warning). If the
481     message is empty, it will not be printed at all, allowing one to
482     show only a hint.
483
484     """
485     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
486            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
487     if args:
488       message = message % tuple(args)
489     if message:
490       logging.warning(message)
491       self._Feedback(" - WARNING: %s" % message)
492     if "hint" in kwargs:
493       self._Feedback("      Hint: %s" % kwargs["hint"])
494
495   def LogInfo(self, message, *args):
496     """Log an informational message to the logs and the user.
497
498     """
499     if args:
500       message = message % tuple(args)
501     logging.info(message)
502     self._Feedback(" - INFO: %s" % message)
503
504
505 class HooksMaster(object):
506   """Hooks master.
507
508   This class distributes the run commands to the nodes based on the
509   specific LU class.
510
511   In order to remove the direct dependency on the rpc module, the
512   constructor needs a function which actually does the remote
513   call. This will usually be rpc.call_hooks_runner, but any function
514   which behaves the same works.
515
516   """
517   def __init__(self, callfn, lu):
518     self.callfn = callfn
519     self.lu = lu
520     self.op = lu.op
521     self.env, node_list_pre, node_list_post = self._BuildEnv()
522     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
523                       constants.HOOKS_PHASE_POST: node_list_post}
524
525   def _BuildEnv(self):
526     """Compute the environment and the target nodes.
527
528     Based on the opcode and the current node list, this builds the
529     environment for the hooks and the target node list for the run.
530
531     """
532     env = {
533       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
534       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
535       "GANETI_OP_CODE": self.op.OP_ID,
536       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
537       "GANETI_DATA_DIR": constants.DATA_DIR,
538       }
539
540     if self.lu.HPATH is not None:
541       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
542       if lu_env:
543         for key in lu_env:
544           env["GANETI_" + key] = lu_env[key]
545     else:
546       lu_nodes_pre = lu_nodes_post = []
547
548     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
549
550   def _RunWrapper(self, node_list, hpath, phase):
551     """Simple wrapper over self.callfn.
552
553     This method fixes the environment before doing the rpc call.
554
555     """
556     env = self.env.copy()
557     env["GANETI_HOOKS_PHASE"] = phase
558     env["GANETI_HOOKS_PATH"] = hpath
559     if self.lu.cfg is not None:
560       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
561       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
562
563     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
564
565     return self.callfn(node_list, hpath, phase, env)
566
567   def RunPhase(self, phase, nodes=None):
568     """Run all the scripts for a phase.
569
570     This is the main function of the HookMaster.
571
572     @param phase: one of L{constants.HOOKS_PHASE_POST} or
573         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
574     @param nodes: overrides the predefined list of nodes for the given phase
575     @return: the processed results of the hooks multi-node rpc call
576     @raise errors.HooksFailure: on communication failure to the nodes
577     @raise errors.HooksAbort: on failure of one of the hooks
578
579     """
580     if not self.node_list[phase] and not nodes:
581       # empty node list, we should not attempt to run this as either
582       # we're in the cluster init phase and the rpc client part can't
583       # even attempt to run, or this LU doesn't do hooks at all
584       return
585     hpath = self.lu.HPATH
586     if nodes is not None:
587       results = self._RunWrapper(nodes, hpath, phase)
588     else:
589       results = self._RunWrapper(self.node_list[phase], hpath, phase)
590     errs = []
591     if not results:
592       msg = "Communication Failure"
593       if phase == constants.HOOKS_PHASE_PRE:
594         raise errors.HooksFailure(msg)
595       else:
596         self.lu.LogWarning(msg)
597         return results
598     for node_name in results:
599       res = results[node_name]
600       if res.offline:
601         continue
602       msg = res.fail_msg
603       if msg:
604         self.lu.LogWarning("Communication failure to node %s: %s",
605                            node_name, msg)
606         continue
607       for script, hkr, output in res.payload:
608         if hkr == constants.HKR_FAIL:
609           if phase == constants.HOOKS_PHASE_PRE:
610             errs.append((node_name, script, output))
611           else:
612             if not output:
613               output = "(no output)"
614             self.lu.LogWarning("On %s script %s failed, output: %s" %
615                                (node_name, script, output))
616     if errs and phase == constants.HOOKS_PHASE_PRE:
617       raise errors.HooksAbort(errs)
618     return results
619
620   def RunConfigUpdate(self):
621     """Run the special configuration update hook
622
623     This is a special hook that runs only on the master after each
624     top-level LI if the configuration has been updated.
625
626     """
627     phase = constants.HOOKS_PHASE_POST
628     hpath = constants.HOOKS_NAME_CFGUPDATE
629     nodes = [self.lu.cfg.GetMasterNode()]
630     self._RunWrapper(nodes, hpath, phase)