Add config.DropECReservations
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41
42
43 class _LockAcquireTimeout(Exception):
44   """Internal exception to report timeouts on acquiring locks.
45
46   """
47
48
49 def _CalculateLockAttemptTimeouts():
50   """Calculate timeouts for lock attempts.
51
52   """
53   running_sum = 0
54   result = [1.0]
55
56   # Wait for a total of at least 150s before doing a blocking acquire
57   while sum(result) < 150.0:
58     timeout = (result[-1] * 1.05) ** 1.25
59
60     # Cap timeout at 10 seconds. This gives other jobs a chance to run
61     # even if we're still trying to get our locks, before finally moving
62     # to a blocking acquire.
63     if timeout > 10.0:
64       timeout = 10.0
65
66     elif timeout < 0.1:
67       # Lower boundary for safety
68       timeout = 0.1
69
70     result.append(timeout)
71
72   return result
73
74
75 class _LockAttemptTimeoutStrategy(object):
76   """Class with lock acquire timeout strategy.
77
78   """
79   __slots__ = [
80     "_attempt",
81     "_random_fn",
82     "_start_time",
83     "_time_fn",
84     "_running_timeout",
85     ]
86
87   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
88
89   def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
90     """Initializes this class.
91
92     @type attempt: int
93     @param attempt: Current attempt number
94     @param _time_fn: Time function for unittests
95     @param _random_fn: Random number generator for unittests
96
97     """
98     object.__init__(self)
99
100     if attempt < 0:
101       raise ValueError("Attempt must be zero or positive")
102
103     self._attempt = attempt
104     self._time_fn = _time_fn
105     self._random_fn = _random_fn
106
107     try:
108       timeout = self._TIMEOUT_PER_ATTEMPT[attempt]
109     except IndexError:
110       # No more timeouts, do blocking acquire
111       timeout = None
112
113     self._running_timeout = locking.RunningTimeout(timeout, False,
114                                                    _time_fn=_time_fn)
115
116   def NextAttempt(self):
117     """Returns the strategy for the next attempt.
118
119     """
120     return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
121                                        _time_fn=self._time_fn,
122                                        _random_fn=self._random_fn)
123
124   def CalcRemainingTimeout(self):
125     """Returns the remaining timeout.
126
127     """
128     timeout = self._running_timeout.Remaining()
129
130     if timeout is not None:
131       # Add a small variation (-/+ 5%) to timeout. This helps in situations
132       # where two or more jobs are fighting for the same lock(s).
133       variation_range = timeout * 0.1
134       timeout += ((self._random_fn() * variation_range) -
135                   (variation_range * 0.5))
136
137     return timeout
138
139
140 class OpExecCbBase:
141   """Base class for OpCode execution callbacks.
142
143   """
144   def NotifyStart(self):
145     """Called when we are about to execute the LU.
146
147     This function is called when we're about to start the lu's Exec() method,
148     that is, after we have acquired all locks.
149
150     """
151
152   def Feedback(self, *args):
153     """Sends feedback from the LU code to the end-user.
154
155     """
156
157   def ReportLocks(self, msg):
158     """Report lock operations.
159
160     """
161
162
163 class Processor(object):
164   """Object which runs OpCodes"""
165   DISPATCH_TABLE = {
166     # Cluster
167     opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
168     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
169     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
170     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
171     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
172     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
173     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
174     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
175     opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
176     opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
177     # node lu
178     opcodes.OpAddNode: cmdlib.LUAddNode,
179     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
180     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
181     opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
182     opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
183     opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
184     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
185     opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
186     opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
187     opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
188     opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
189     # instance lu
190     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
191     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
192     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
193     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
194     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
195     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
196     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
197     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
198     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
199     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
200     opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
201     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
202     opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
203     opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
204     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
205     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
206     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
207     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
208     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
209     # os lu
210     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
211     # exports lu
212     opcodes.OpQueryExports: cmdlib.LUQueryExports,
213     opcodes.OpExportInstance: cmdlib.LUExportInstance,
214     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
215     # tags lu
216     opcodes.OpGetTags: cmdlib.LUGetTags,
217     opcodes.OpSearchTags: cmdlib.LUSearchTags,
218     opcodes.OpAddTags: cmdlib.LUAddTags,
219     opcodes.OpDelTags: cmdlib.LUDelTags,
220     # test lu
221     opcodes.OpTestDelay: cmdlib.LUTestDelay,
222     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
223     }
224
225   def __init__(self, context, ec_id):
226     """Constructor for Processor
227
228     @type context: GanetiContext
229     @param context: global Ganeti context
230     @type ec_id: string
231     @param ec_id: execution context identifier
232
233     """
234     self.context = context
235     self._ec_id = ec_id
236     self._cbs = None
237     self.rpc = rpc.RpcRunner(context.cfg)
238     self.hmclass = HooksMaster
239
240   def _ReportLocks(self, level, names, shared, timeout, acquired, result):
241     """Reports lock operations.
242
243     @type level: int
244     @param level: Lock level
245     @type names: list or string
246     @param names: Lock names
247     @type shared: bool
248     @param shared: Whether the locks should be acquired in shared mode
249     @type timeout: None or float
250     @param timeout: Timeout for acquiring the locks
251     @type acquired: bool
252     @param acquired: Whether the locks have already been acquired
253     @type result: None or set
254     @param result: Result from L{locking.GanetiLockManager.acquire}
255
256     """
257     parts = []
258
259     # Build message
260     if acquired:
261       if result is None:
262         parts.append("timeout")
263       else:
264         parts.append("acquired")
265     else:
266       parts.append("waiting")
267       if timeout is None:
268         parts.append("blocking")
269       else:
270         parts.append("timeout=%0.6fs" % timeout)
271
272     parts.append(locking.LEVEL_NAMES[level])
273
274     if names == locking.ALL_SET:
275       parts.append("ALL")
276     elif isinstance(names, basestring):
277       parts.append(names)
278     else:
279       parts.append(",".join(names))
280
281     if shared:
282       parts.append("shared")
283     else:
284       parts.append("exclusive")
285
286     msg = "/".join(parts)
287
288     logging.debug("LU locks %s", msg)
289
290     if self._cbs:
291       self._cbs.ReportLocks(msg)
292
293   def _AcquireLocks(self, level, names, shared, timeout):
294     """Acquires locks via the Ganeti lock manager.
295
296     @type level: int
297     @param level: Lock level
298     @type names: list or string
299     @param names: Lock names
300     @type shared: bool
301     @param shared: Whether the locks should be acquired in shared mode
302     @type timeout: None or float
303     @param timeout: Timeout for acquiring the locks
304
305     """
306     self._ReportLocks(level, names, shared, timeout, False, None)
307
308     acquired = self.context.glm.acquire(level, names, shared=shared,
309                                         timeout=timeout)
310
311     self._ReportLocks(level, names, shared, timeout, True, acquired)
312
313     return acquired
314
315   def _ExecLU(self, lu):
316     """Logical Unit execution sequence.
317
318     """
319     write_count = self.context.cfg.write_count
320     lu.CheckPrereq()
321     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
322     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
323     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
324                      self._Feedback, None)
325
326     if getattr(lu.op, "dry_run", False):
327       # in this mode, no post-hooks are run, and the config is not
328       # written (as it might have been modified by another LU, and we
329       # shouldn't do writeout on behalf of other threads
330       self.LogInfo("dry-run mode requested, not actually executing"
331                    " the operation")
332       return lu.dry_run_result
333
334     try:
335       result = lu.Exec(self._Feedback)
336       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
337       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
338                                 self._Feedback, result)
339     finally:
340       # FIXME: This needs locks if not lu_class.REQ_BGL
341       if write_count != self.context.cfg.write_count:
342         hm.RunConfigUpdate()
343
344     return result
345
346   def _LockAndExecLU(self, lu, level, calc_timeout):
347     """Execute a Logical Unit, with the needed locks.
348
349     This is a recursive function that starts locking the given level, and
350     proceeds up, till there are no more locks to acquire. Then it executes the
351     given LU and its opcodes.
352
353     """
354     adding_locks = level in lu.add_locks
355     acquiring_locks = level in lu.needed_locks
356     if level not in locking.LEVELS:
357       if self._cbs:
358         self._cbs.NotifyStart()
359
360       result = self._ExecLU(lu)
361
362     elif adding_locks and acquiring_locks:
363       # We could both acquire and add locks at the same level, but for now we
364       # don't need this, so we'll avoid the complicated code needed.
365       raise NotImplementedError("Can't declare locks to acquire when adding"
366                                 " others")
367
368     elif adding_locks or acquiring_locks:
369       lu.DeclareLocks(level)
370       share = lu.share_locks[level]
371
372       try:
373         assert adding_locks ^ acquiring_locks, \
374           "Locks must be either added or acquired"
375
376         if acquiring_locks:
377           # Acquiring locks
378           needed_locks = lu.needed_locks[level]
379
380           acquired = self._AcquireLocks(level, needed_locks, share,
381                                         calc_timeout())
382
383           if acquired is None:
384             raise _LockAcquireTimeout()
385
386         else:
387           # Adding locks
388           add_locks = lu.add_locks[level]
389           lu.remove_locks[level] = add_locks
390
391           try:
392             self.context.glm.add(level, add_locks, acquired=1, shared=share)
393           except errors.LockError:
394             raise errors.OpPrereqError(
395               "Couldn't add locks (%s), probably because of a race condition"
396               " with another job, who added them first" % add_locks,
397               errors.ECODE_FAULT)
398
399           acquired = add_locks
400
401         try:
402           lu.acquired_locks[level] = acquired
403
404           result = self._LockAndExecLU(lu, level + 1, calc_timeout)
405         finally:
406           if level in lu.remove_locks:
407             self.context.glm.remove(level, lu.remove_locks[level])
408       finally:
409         if self.context.glm.is_owned(level):
410           self.context.glm.release(level)
411
412     else:
413       result = self._LockAndExecLU(lu, level + 1, calc_timeout)
414
415     return result
416
417   def ExecOpCode(self, op, cbs):
418     """Execute an opcode.
419
420     @type op: an OpCode instance
421     @param op: the opcode to be executed
422     @type cbs: L{OpExecCbBase}
423     @param cbs: Runtime callbacks
424
425     """
426     if not isinstance(op, opcodes.OpCode):
427       raise errors.ProgrammerError("Non-opcode instance passed"
428                                    " to ExecOpcode")
429
430     self._cbs = cbs
431     try:
432       lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
433       if lu_class is None:
434         raise errors.OpCodeUnknown("Unknown opcode")
435
436       timeout_strategy = _LockAttemptTimeoutStrategy()
437
438       while True:
439         try:
440           acquire_timeout = timeout_strategy.CalcRemainingTimeout()
441
442           # Acquire the Big Ganeti Lock exclusively if this LU requires it,
443           # and in a shared fashion otherwise (to prevent concurrent run with
444           # an exclusive LU.
445           if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
446                                 not lu_class.REQ_BGL, acquire_timeout) is None:
447             raise _LockAcquireTimeout()
448
449           try:
450             lu = lu_class(self, op, self.context, self.rpc)
451             lu.ExpandNames()
452             assert lu.needed_locks is not None, "needed_locks not set by LU"
453
454             try:
455               return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
456                                          timeout_strategy.CalcRemainingTimeout)
457             finally:
458               if self._ec_id:
459                 self.context.cfg.DropECReservations(self._ec_id)
460
461           finally:
462             self.context.glm.release(locking.LEVEL_CLUSTER)
463
464         except _LockAcquireTimeout:
465           # Timeout while waiting for lock, try again
466           pass
467
468         timeout_strategy = timeout_strategy.NextAttempt()
469
470     finally:
471       self._cbs = None
472
473   def _Feedback(self, *args):
474     """Forward call to feedback callback function.
475
476     """
477     if self._cbs:
478       self._cbs.Feedback(*args)
479
480   def LogStep(self, current, total, message):
481     """Log a change in LU execution progress.
482
483     """
484     logging.debug("Step %d/%d %s", current, total, message)
485     self._Feedback("STEP %d/%d %s" % (current, total, message))
486
487   def LogWarning(self, message, *args, **kwargs):
488     """Log a warning to the logs and the user.
489
490     The optional keyword argument is 'hint' and can be used to show a
491     hint to the user (presumably related to the warning). If the
492     message is empty, it will not be printed at all, allowing one to
493     show only a hint.
494
495     """
496     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
497            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
498     if args:
499       message = message % tuple(args)
500     if message:
501       logging.warning(message)
502       self._Feedback(" - WARNING: %s" % message)
503     if "hint" in kwargs:
504       self._Feedback("      Hint: %s" % kwargs["hint"])
505
506   def LogInfo(self, message, *args):
507     """Log an informational message to the logs and the user.
508
509     """
510     if args:
511       message = message % tuple(args)
512     logging.info(message)
513     self._Feedback(" - INFO: %s" % message)
514
515   def GetECId(self):
516     if not self._ec_id:
517       errors.ProgrammerError("Tried to use execution context id when not set")
518     return self._ec_id
519
520
521 class HooksMaster(object):
522   """Hooks master.
523
524   This class distributes the run commands to the nodes based on the
525   specific LU class.
526
527   In order to remove the direct dependency on the rpc module, the
528   constructor needs a function which actually does the remote
529   call. This will usually be rpc.call_hooks_runner, but any function
530   which behaves the same works.
531
532   """
533   def __init__(self, callfn, lu):
534     self.callfn = callfn
535     self.lu = lu
536     self.op = lu.op
537     self.env, node_list_pre, node_list_post = self._BuildEnv()
538     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
539                       constants.HOOKS_PHASE_POST: node_list_post}
540
541   def _BuildEnv(self):
542     """Compute the environment and the target nodes.
543
544     Based on the opcode and the current node list, this builds the
545     environment for the hooks and the target node list for the run.
546
547     """
548     env = {
549       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
550       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
551       "GANETI_OP_CODE": self.op.OP_ID,
552       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
553       "GANETI_DATA_DIR": constants.DATA_DIR,
554       }
555
556     if self.lu.HPATH is not None:
557       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
558       if lu_env:
559         for key in lu_env:
560           env["GANETI_" + key] = lu_env[key]
561     else:
562       lu_nodes_pre = lu_nodes_post = []
563
564     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
565
566   def _RunWrapper(self, node_list, hpath, phase):
567     """Simple wrapper over self.callfn.
568
569     This method fixes the environment before doing the rpc call.
570
571     """
572     env = self.env.copy()
573     env["GANETI_HOOKS_PHASE"] = phase
574     env["GANETI_HOOKS_PATH"] = hpath
575     if self.lu.cfg is not None:
576       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
577       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
578
579     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
580
581     return self.callfn(node_list, hpath, phase, env)
582
583   def RunPhase(self, phase, nodes=None):
584     """Run all the scripts for a phase.
585
586     This is the main function of the HookMaster.
587
588     @param phase: one of L{constants.HOOKS_PHASE_POST} or
589         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
590     @param nodes: overrides the predefined list of nodes for the given phase
591     @return: the processed results of the hooks multi-node rpc call
592     @raise errors.HooksFailure: on communication failure to the nodes
593     @raise errors.HooksAbort: on failure of one of the hooks
594
595     """
596     if not self.node_list[phase] and not nodes:
597       # empty node list, we should not attempt to run this as either
598       # we're in the cluster init phase and the rpc client part can't
599       # even attempt to run, or this LU doesn't do hooks at all
600       return
601     hpath = self.lu.HPATH
602     if nodes is not None:
603       results = self._RunWrapper(nodes, hpath, phase)
604     else:
605       results = self._RunWrapper(self.node_list[phase], hpath, phase)
606     errs = []
607     if not results:
608       msg = "Communication Failure"
609       if phase == constants.HOOKS_PHASE_PRE:
610         raise errors.HooksFailure(msg)
611       else:
612         self.lu.LogWarning(msg)
613         return results
614     for node_name in results:
615       res = results[node_name]
616       if res.offline:
617         continue
618       msg = res.fail_msg
619       if msg:
620         self.lu.LogWarning("Communication failure to node %s: %s",
621                            node_name, msg)
622         continue
623       for script, hkr, output in res.payload:
624         if hkr == constants.HKR_FAIL:
625           if phase == constants.HOOKS_PHASE_PRE:
626             errs.append((node_name, script, output))
627           else:
628             if not output:
629               output = "(no output)"
630             self.lu.LogWarning("On %s script %s failed, output: %s" %
631                                (node_name, script, output))
632     if errs and phase == constants.HOOKS_PHASE_PRE:
633       raise errors.HooksAbort(errs)
634     return results
635
636   def RunConfigUpdate(self):
637     """Run the special configuration update hook
638
639     This is a special hook that runs only on the master after each
640     top-level LI if the configuration has been updated.
641
642     """
643     phase = constants.HOOKS_PHASE_POST
644     hpath = constants.HOOKS_NAME_CFGUPDATE
645     nodes = [self.lu.cfg.GetMasterNode()]
646     self._RunWrapper(nodes, hpath, phase)