Small improvements for cluster verify
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import compat
43
44
45 _OP_PREFIX = "Op"
46 _LU_PREFIX = "LU"
47
48
49 class LockAcquireTimeout(Exception):
50   """Exception to report timeouts on acquiring locks.
51
52   """
53
54
55 def _CalculateLockAttemptTimeouts():
56   """Calculate timeouts for lock attempts.
57
58   """
59   result = [constants.LOCK_ATTEMPTS_MINWAIT]
60   running_sum = result[0]
61
62   # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
63   # blocking acquire
64   while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
65     timeout = (result[-1] * 1.05) ** 1.25
66
67     # Cap max timeout. This gives other jobs a chance to run even if
68     # we're still trying to get our locks, before finally moving to a
69     # blocking acquire.
70     timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
71     # And also cap the lower boundary for safety
72     timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
73
74     result.append(timeout)
75     running_sum += timeout
76
77   return result
78
79
80 class LockAttemptTimeoutStrategy(object):
81   """Class with lock acquire timeout strategy.
82
83   """
84   __slots__ = [
85     "_timeouts",
86     "_random_fn",
87     "_time_fn",
88     ]
89
90   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
91
92   def __init__(self, _time_fn=time.time, _random_fn=random.random):
93     """Initializes this class.
94
95     @param _time_fn: Time function for unittests
96     @param _random_fn: Random number generator for unittests
97
98     """
99     object.__init__(self)
100
101     self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
102     self._time_fn = _time_fn
103     self._random_fn = _random_fn
104
105   def NextAttempt(self):
106     """Returns the timeout for the next attempt.
107
108     """
109     try:
110       timeout = self._timeouts.next()
111     except StopIteration:
112       # No more timeouts, do blocking acquire
113       timeout = None
114
115     if timeout is not None:
116       # Add a small variation (-/+ 5%) to timeout. This helps in situations
117       # where two or more jobs are fighting for the same lock(s).
118       variation_range = timeout * 0.1
119       timeout += ((self._random_fn() * variation_range) -
120                   (variation_range * 0.5))
121
122     return timeout
123
124
125 class OpExecCbBase: # pylint: disable-msg=W0232
126   """Base class for OpCode execution callbacks.
127
128   """
129   def NotifyStart(self):
130     """Called when we are about to execute the LU.
131
132     This function is called when we're about to start the lu's Exec() method,
133     that is, after we have acquired all locks.
134
135     """
136
137   def Feedback(self, *args):
138     """Sends feedback from the LU code to the end-user.
139
140     """
141
142   def CheckCancel(self):
143     """Check whether job has been cancelled.
144
145     """
146
147   def SubmitManyJobs(self, jobs):
148     """Submits jobs for processing.
149
150     See L{jqueue.JobQueue.SubmitManyJobs}.
151
152     """
153     raise NotImplementedError
154
155
156 def _LUNameForOpName(opname):
157   """Computes the LU name for a given OpCode name.
158
159   """
160   assert opname.startswith(_OP_PREFIX), \
161       "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
162
163   return _LU_PREFIX + opname[len(_OP_PREFIX):]
164
165
166 def _ComputeDispatchTable():
167   """Computes the opcode-to-lu dispatch table.
168
169   """
170   return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
171               for op in opcodes.OP_MAPPING.values()
172               if op.WITH_LU)
173
174
175 class Processor(object):
176   """Object which runs OpCodes"""
177   DISPATCH_TABLE = _ComputeDispatchTable()
178
179   def __init__(self, context, ec_id):
180     """Constructor for Processor
181
182     @type context: GanetiContext
183     @param context: global Ganeti context
184     @type ec_id: string
185     @param ec_id: execution context identifier
186
187     """
188     self.context = context
189     self._ec_id = ec_id
190     self._cbs = None
191     self.rpc = rpc.RpcRunner(context.cfg)
192     self.hmclass = HooksMaster
193
194   def _AcquireLocks(self, level, names, shared, timeout, priority):
195     """Acquires locks via the Ganeti lock manager.
196
197     @type level: int
198     @param level: Lock level
199     @type names: list or string
200     @param names: Lock names
201     @type shared: bool
202     @param shared: Whether the locks should be acquired in shared mode
203     @type timeout: None or float
204     @param timeout: Timeout for acquiring the locks
205     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
206         amount of time
207
208     """
209     if self._cbs:
210       self._cbs.CheckCancel()
211
212     acquired = self.context.glm.acquire(level, names, shared=shared,
213                                         timeout=timeout, priority=priority)
214
215     if acquired is None:
216       raise LockAcquireTimeout()
217
218     return acquired
219
220   def _ProcessResult(self, result):
221     """Examines opcode result.
222
223     If necessary, additional processing on the result is done.
224
225     """
226     if isinstance(result, cmdlib.ResultWithJobs):
227       # Submit jobs
228       job_submission = self._cbs.SubmitManyJobs(result.jobs)
229
230       # Build dictionary
231       result = result.other
232
233       assert constants.JOB_IDS_KEY not in result, \
234         "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
235
236       result[constants.JOB_IDS_KEY] = job_submission
237
238     return result
239
240   def _ExecLU(self, lu):
241     """Logical Unit execution sequence.
242
243     """
244     write_count = self.context.cfg.write_count
245     lu.CheckPrereq()
246     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
247     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
248     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
249                      self.Log, None)
250
251     if getattr(lu.op, "dry_run", False):
252       # in this mode, no post-hooks are run, and the config is not
253       # written (as it might have been modified by another LU, and we
254       # shouldn't do writeout on behalf of other threads
255       self.LogInfo("dry-run mode requested, not actually executing"
256                    " the operation")
257       return lu.dry_run_result
258
259     try:
260       result = self._ProcessResult(lu.Exec(self.Log))
261       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
262       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
263                                 self.Log, result)
264     finally:
265       # FIXME: This needs locks if not lu_class.REQ_BGL
266       if write_count != self.context.cfg.write_count:
267         hm.RunConfigUpdate()
268
269     return result
270
271   def _LockAndExecLU(self, lu, level, calc_timeout, priority):
272     """Execute a Logical Unit, with the needed locks.
273
274     This is a recursive function that starts locking the given level, and
275     proceeds up, till there are no more locks to acquire. Then it executes the
276     given LU and its opcodes.
277
278     """
279     adding_locks = level in lu.add_locks
280     acquiring_locks = level in lu.needed_locks
281     if level not in locking.LEVELS:
282       if self._cbs:
283         self._cbs.NotifyStart()
284
285       result = self._ExecLU(lu)
286
287     elif adding_locks and acquiring_locks:
288       # We could both acquire and add locks at the same level, but for now we
289       # don't need this, so we'll avoid the complicated code needed.
290       raise NotImplementedError("Can't declare locks to acquire when adding"
291                                 " others")
292
293     elif adding_locks or acquiring_locks:
294       lu.DeclareLocks(level)
295       share = lu.share_locks[level]
296
297       try:
298         assert adding_locks ^ acquiring_locks, \
299           "Locks must be either added or acquired"
300
301         if acquiring_locks:
302           # Acquiring locks
303           needed_locks = lu.needed_locks[level]
304
305           self._AcquireLocks(level, needed_locks, share,
306                              calc_timeout(), priority)
307         else:
308           # Adding locks
309           add_locks = lu.add_locks[level]
310           lu.remove_locks[level] = add_locks
311
312           try:
313             self.context.glm.add(level, add_locks, acquired=1, shared=share)
314           except errors.LockError:
315             raise errors.OpPrereqError(
316               "Couldn't add locks (%s), probably because of a race condition"
317               " with another job, who added them first" % add_locks,
318               errors.ECODE_FAULT)
319
320         try:
321           result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
322         finally:
323           if level in lu.remove_locks:
324             self.context.glm.remove(level, lu.remove_locks[level])
325       finally:
326         if self.context.glm.is_owned(level):
327           self.context.glm.release(level)
328
329     else:
330       result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
331
332     return result
333
334   def ExecOpCode(self, op, cbs, timeout=None, priority=None):
335     """Execute an opcode.
336
337     @type op: an OpCode instance
338     @param op: the opcode to be executed
339     @type cbs: L{OpExecCbBase}
340     @param cbs: Runtime callbacks
341     @type timeout: float or None
342     @param timeout: Maximum time to acquire all locks, None for no timeout
343     @type priority: number or None
344     @param priority: Priority for acquiring lock(s)
345     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
346         amount of time
347
348     """
349     if not isinstance(op, opcodes.OpCode):
350       raise errors.ProgrammerError("Non-opcode instance passed"
351                                    " to ExecOpcode (%s)" % type(op))
352
353     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
354     if lu_class is None:
355       raise errors.OpCodeUnknown("Unknown opcode")
356
357     if timeout is None:
358       calc_timeout = lambda: None
359     else:
360       calc_timeout = utils.RunningTimeout(timeout, False).Remaining
361
362     self._cbs = cbs
363     try:
364       # Acquire the Big Ganeti Lock exclusively if this LU requires it,
365       # and in a shared fashion otherwise (to prevent concurrent run with
366       # an exclusive LU.
367       self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
368                           not lu_class.REQ_BGL, calc_timeout(),
369                           priority)
370       try:
371         lu = lu_class(self, op, self.context, self.rpc)
372         lu.ExpandNames()
373         assert lu.needed_locks is not None, "needed_locks not set by LU"
374
375         try:
376           result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
377                                        priority)
378         finally:
379           if self._ec_id:
380             self.context.cfg.DropECReservations(self._ec_id)
381       finally:
382         self.context.glm.release(locking.LEVEL_CLUSTER)
383     finally:
384       self._cbs = None
385
386     resultcheck_fn = op.OP_RESULT
387     if not (resultcheck_fn is None or resultcheck_fn(result)):
388       logging.error("Expected opcode result matching %s, got %s",
389                     resultcheck_fn, result)
390       raise errors.OpResultError("Opcode result does not match %s" %
391                                  resultcheck_fn)
392
393     return result
394
395   def Log(self, *args):
396     """Forward call to feedback callback function.
397
398     """
399     if self._cbs:
400       self._cbs.Feedback(*args)
401
402   def LogStep(self, current, total, message):
403     """Log a change in LU execution progress.
404
405     """
406     logging.debug("Step %d/%d %s", current, total, message)
407     self.Log("STEP %d/%d %s" % (current, total, message))
408
409   def LogWarning(self, message, *args, **kwargs):
410     """Log a warning to the logs and the user.
411
412     The optional keyword argument is 'hint' and can be used to show a
413     hint to the user (presumably related to the warning). If the
414     message is empty, it will not be printed at all, allowing one to
415     show only a hint.
416
417     """
418     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
419            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
420     if args:
421       message = message % tuple(args)
422     if message:
423       logging.warning(message)
424       self.Log(" - WARNING: %s" % message)
425     if "hint" in kwargs:
426       self.Log("      Hint: %s" % kwargs["hint"])
427
428   def LogInfo(self, message, *args):
429     """Log an informational message to the logs and the user.
430
431     """
432     if args:
433       message = message % tuple(args)
434     logging.info(message)
435     self.Log(" - INFO: %s" % message)
436
437   def GetECId(self):
438     """Returns the current execution context ID.
439
440     """
441     if not self._ec_id:
442       raise errors.ProgrammerError("Tried to use execution context id when"
443                                    " not set")
444     return self._ec_id
445
446
447 class HooksMaster(object):
448   """Hooks master.
449
450   This class distributes the run commands to the nodes based on the
451   specific LU class.
452
453   In order to remove the direct dependency on the rpc module, the
454   constructor needs a function which actually does the remote
455   call. This will usually be rpc.call_hooks_runner, but any function
456   which behaves the same works.
457
458   """
459   def __init__(self, callfn, lu):
460     self.callfn = callfn
461     self.lu = lu
462     self.op = lu.op
463     self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
464
465     if self.lu.HPATH is None:
466       nodes = (None, None)
467     else:
468       nodes = map(frozenset, self.lu.BuildHooksNodes())
469
470     (self.pre_nodes, self.post_nodes) = nodes
471
472   def _BuildEnv(self, phase):
473     """Compute the environment and the target nodes.
474
475     Based on the opcode and the current node list, this builds the
476     environment for the hooks and the target node list for the run.
477
478     """
479     if phase == constants.HOOKS_PHASE_PRE:
480       prefix = "GANETI_"
481     elif phase == constants.HOOKS_PHASE_POST:
482       prefix = "GANETI_POST_"
483     else:
484       raise AssertionError("Unknown phase '%s'" % phase)
485
486     env = {}
487
488     if self.lu.HPATH is not None:
489       lu_env = self.lu.BuildHooksEnv()
490       if lu_env:
491         assert not compat.any(key.upper().startswith(prefix) for key in lu_env)
492         env.update(("%s%s" % (prefix, key), value)
493                    for (key, value) in lu_env.items())
494
495     if phase == constants.HOOKS_PHASE_PRE:
496       assert compat.all((key.startswith("GANETI_") and
497                          not key.startswith("GANETI_POST_"))
498                         for key in env)
499
500     elif phase == constants.HOOKS_PHASE_POST:
501       assert compat.all(key.startswith("GANETI_POST_") for key in env)
502       assert isinstance(self.pre_env, dict)
503
504       # Merge with pre-phase environment
505       assert not compat.any(key.startswith("GANETI_POST_")
506                             for key in self.pre_env)
507       env.update(self.pre_env)
508     else:
509       raise AssertionError("Unknown phase '%s'" % phase)
510
511     return env
512
513   def _RunWrapper(self, node_list, hpath, phase, phase_env):
514     """Simple wrapper over self.callfn.
515
516     This method fixes the environment before doing the rpc call.
517
518     """
519     cfg = self.lu.cfg
520
521     env = {
522       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
523       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
524       "GANETI_OP_CODE": self.op.OP_ID,
525       "GANETI_DATA_DIR": constants.DATA_DIR,
526       "GANETI_HOOKS_PHASE": phase,
527       "GANETI_HOOKS_PATH": hpath,
528       }
529
530     if self.lu.HTYPE:
531       env["GANETI_OBJECT_TYPE"] = self.lu.HTYPE
532
533     if cfg is not None:
534       env["GANETI_CLUSTER"] = cfg.GetClusterName()
535       env["GANETI_MASTER"] = cfg.GetMasterNode()
536
537     if phase_env:
538       assert not (set(env) & set(phase_env)), "Environment variables conflict"
539       env.update(phase_env)
540
541     # Convert everything to strings
542     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
543
544     assert compat.all(key == "PATH" or key.startswith("GANETI_")
545                       for key in env)
546
547     return self.callfn(node_list, hpath, phase, env)
548
549   def RunPhase(self, phase, nodes=None):
550     """Run all the scripts for a phase.
551
552     This is the main function of the HookMaster.
553
554     @param phase: one of L{constants.HOOKS_PHASE_POST} or
555         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
556     @param nodes: overrides the predefined list of nodes for the given phase
557     @return: the processed results of the hooks multi-node rpc call
558     @raise errors.HooksFailure: on communication failure to the nodes
559     @raise errors.HooksAbort: on failure of one of the hooks
560
561     """
562     if phase == constants.HOOKS_PHASE_PRE:
563       if nodes is None:
564         nodes = self.pre_nodes
565       env = self.pre_env
566     elif phase == constants.HOOKS_PHASE_POST:
567       if nodes is None:
568         nodes = self.post_nodes
569       env = self._BuildEnv(phase)
570     else:
571       raise AssertionError("Unknown phase '%s'" % phase)
572
573     if not nodes:
574       # empty node list, we should not attempt to run this as either
575       # we're in the cluster init phase and the rpc client part can't
576       # even attempt to run, or this LU doesn't do hooks at all
577       return
578
579     results = self._RunWrapper(nodes, self.lu.HPATH, phase, env)
580     if not results:
581       msg = "Communication Failure"
582       if phase == constants.HOOKS_PHASE_PRE:
583         raise errors.HooksFailure(msg)
584       else:
585         self.lu.LogWarning(msg)
586         return results
587
588     errs = []
589     for node_name in results:
590       res = results[node_name]
591       if res.offline:
592         continue
593
594       msg = res.fail_msg
595       if msg:
596         self.lu.LogWarning("Communication failure to node %s: %s",
597                            node_name, msg)
598         continue
599
600       for script, hkr, output in res.payload:
601         if hkr == constants.HKR_FAIL:
602           if phase == constants.HOOKS_PHASE_PRE:
603             errs.append((node_name, script, output))
604           else:
605             if not output:
606               output = "(no output)"
607             self.lu.LogWarning("On %s script %s failed, output: %s" %
608                                (node_name, script, output))
609
610     if errs and phase == constants.HOOKS_PHASE_PRE:
611       raise errors.HooksAbort(errs)
612
613     return results
614
615   def RunConfigUpdate(self):
616     """Run the special configuration update hook
617
618     This is a special hook that runs only on the master after each
619     top-level LI if the configuration has been updated.
620
621     """
622     phase = constants.HOOKS_PHASE_POST
623     hpath = constants.HOOKS_NAME_CFGUPDATE
624     nodes = [self.lu.cfg.GetMasterNode()]
625     self._RunWrapper(nodes, hpath, phase, self.pre_env)