cmdlib: Remove punctuation from error messages
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import compat
43
44
45 _OP_PREFIX = "Op"
46 _LU_PREFIX = "LU"
47
48
49 class LockAcquireTimeout(Exception):
50   """Exception to report timeouts on acquiring locks.
51
52   """
53
54
55 def _CalculateLockAttemptTimeouts():
56   """Calculate timeouts for lock attempts.
57
58   """
59   result = [constants.LOCK_ATTEMPTS_MINWAIT]
60   running_sum = result[0]
61
62   # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
63   # blocking acquire
64   while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
65     timeout = (result[-1] * 1.05) ** 1.25
66
67     # Cap max timeout. This gives other jobs a chance to run even if
68     # we're still trying to get our locks, before finally moving to a
69     # blocking acquire.
70     timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
71     # And also cap the lower boundary for safety
72     timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
73
74     result.append(timeout)
75     running_sum += timeout
76
77   return result
78
79
80 class LockAttemptTimeoutStrategy(object):
81   """Class with lock acquire timeout strategy.
82
83   """
84   __slots__ = [
85     "_timeouts",
86     "_random_fn",
87     "_time_fn",
88     ]
89
90   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
91
92   def __init__(self, _time_fn=time.time, _random_fn=random.random):
93     """Initializes this class.
94
95     @param _time_fn: Time function for unittests
96     @param _random_fn: Random number generator for unittests
97
98     """
99     object.__init__(self)
100
101     self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
102     self._time_fn = _time_fn
103     self._random_fn = _random_fn
104
105   def NextAttempt(self):
106     """Returns the timeout for the next attempt.
107
108     """
109     try:
110       timeout = self._timeouts.next()
111     except StopIteration:
112       # No more timeouts, do blocking acquire
113       timeout = None
114
115     if timeout is not None:
116       # Add a small variation (-/+ 5%) to timeout. This helps in situations
117       # where two or more jobs are fighting for the same lock(s).
118       variation_range = timeout * 0.1
119       timeout += ((self._random_fn() * variation_range) -
120                   (variation_range * 0.5))
121
122     return timeout
123
124
125 class OpExecCbBase: # pylint: disable-msg=W0232
126   """Base class for OpCode execution callbacks.
127
128   """
129   def NotifyStart(self):
130     """Called when we are about to execute the LU.
131
132     This function is called when we're about to start the lu's Exec() method,
133     that is, after we have acquired all locks.
134
135     """
136
137   def Feedback(self, *args):
138     """Sends feedback from the LU code to the end-user.
139
140     """
141
142   def CheckCancel(self):
143     """Check whether job has been cancelled.
144
145     """
146
147   def SubmitManyJobs(self, jobs):
148     """Submits jobs for processing.
149
150     See L{jqueue.JobQueue.SubmitManyJobs}.
151
152     """
153     raise NotImplementedError
154
155
156 def _LUNameForOpName(opname):
157   """Computes the LU name for a given OpCode name.
158
159   """
160   assert opname.startswith(_OP_PREFIX), \
161       "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
162
163   return _LU_PREFIX + opname[len(_OP_PREFIX):]
164
165
166 def _ComputeDispatchTable():
167   """Computes the opcode-to-lu dispatch table.
168
169   """
170   return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
171               for op in opcodes.OP_MAPPING.values()
172               if op.WITH_LU)
173
174
175 class Processor(object):
176   """Object which runs OpCodes"""
177   DISPATCH_TABLE = _ComputeDispatchTable()
178
179   def __init__(self, context, ec_id):
180     """Constructor for Processor
181
182     @type context: GanetiContext
183     @param context: global Ganeti context
184     @type ec_id: string
185     @param ec_id: execution context identifier
186
187     """
188     self.context = context
189     self._ec_id = ec_id
190     self._cbs = None
191     self.rpc = rpc.RpcRunner(context.cfg)
192     self.hmclass = HooksMaster
193
194   def _AcquireLocks(self, level, names, shared, timeout, priority):
195     """Acquires locks via the Ganeti lock manager.
196
197     @type level: int
198     @param level: Lock level
199     @type names: list or string
200     @param names: Lock names
201     @type shared: bool
202     @param shared: Whether the locks should be acquired in shared mode
203     @type timeout: None or float
204     @param timeout: Timeout for acquiring the locks
205     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
206         amount of time
207
208     """
209     if self._cbs:
210       self._cbs.CheckCancel()
211
212     acquired = self.context.glm.acquire(level, names, shared=shared,
213                                         timeout=timeout, priority=priority)
214
215     if acquired is None:
216       raise LockAcquireTimeout()
217
218     return acquired
219
220   def _ProcessResult(self, result):
221     """
222
223     """
224     if isinstance(result, cmdlib.ResultWithJobs):
225       # Submit jobs
226       job_submission = self._cbs.SubmitManyJobs(result.jobs)
227
228       # Build dictionary
229       result = result.other
230
231       assert constants.JOB_IDS_KEY not in result, \
232         "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
233
234       result[constants.JOB_IDS_KEY] = job_submission
235
236     return result
237
238   def _ExecLU(self, lu):
239     """Logical Unit execution sequence.
240
241     """
242     write_count = self.context.cfg.write_count
243     lu.CheckPrereq()
244     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
245     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
246     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
247                      self.Log, None)
248
249     if getattr(lu.op, "dry_run", False):
250       # in this mode, no post-hooks are run, and the config is not
251       # written (as it might have been modified by another LU, and we
252       # shouldn't do writeout on behalf of other threads
253       self.LogInfo("dry-run mode requested, not actually executing"
254                    " the operation")
255       return lu.dry_run_result
256
257     try:
258       result = self._ProcessResult(lu.Exec(self.Log))
259       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
260       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
261                                 self.Log, result)
262     finally:
263       # FIXME: This needs locks if not lu_class.REQ_BGL
264       if write_count != self.context.cfg.write_count:
265         hm.RunConfigUpdate()
266
267     return result
268
269   def _LockAndExecLU(self, lu, level, calc_timeout, priority):
270     """Execute a Logical Unit, with the needed locks.
271
272     This is a recursive function that starts locking the given level, and
273     proceeds up, till there are no more locks to acquire. Then it executes the
274     given LU and its opcodes.
275
276     """
277     adding_locks = level in lu.add_locks
278     acquiring_locks = level in lu.needed_locks
279     if level not in locking.LEVELS:
280       if self._cbs:
281         self._cbs.NotifyStart()
282
283       result = self._ExecLU(lu)
284
285     elif adding_locks and acquiring_locks:
286       # We could both acquire and add locks at the same level, but for now we
287       # don't need this, so we'll avoid the complicated code needed.
288       raise NotImplementedError("Can't declare locks to acquire when adding"
289                                 " others")
290
291     elif adding_locks or acquiring_locks:
292       lu.DeclareLocks(level)
293       share = lu.share_locks[level]
294
295       try:
296         assert adding_locks ^ acquiring_locks, \
297           "Locks must be either added or acquired"
298
299         if acquiring_locks:
300           # Acquiring locks
301           needed_locks = lu.needed_locks[level]
302
303           self._AcquireLocks(level, needed_locks, share,
304                              calc_timeout(), priority)
305         else:
306           # Adding locks
307           add_locks = lu.add_locks[level]
308           lu.remove_locks[level] = add_locks
309
310           try:
311             self.context.glm.add(level, add_locks, acquired=1, shared=share)
312           except errors.LockError:
313             raise errors.OpPrereqError(
314               "Couldn't add locks (%s), probably because of a race condition"
315               " with another job, who added them first" % add_locks,
316               errors.ECODE_FAULT)
317
318         try:
319           result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
320         finally:
321           if level in lu.remove_locks:
322             self.context.glm.remove(level, lu.remove_locks[level])
323       finally:
324         if self.context.glm.is_owned(level):
325           self.context.glm.release(level)
326
327     else:
328       result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
329
330     return result
331
332   def ExecOpCode(self, op, cbs, timeout=None, priority=None):
333     """Execute an opcode.
334
335     @type op: an OpCode instance
336     @param op: the opcode to be executed
337     @type cbs: L{OpExecCbBase}
338     @param cbs: Runtime callbacks
339     @type timeout: float or None
340     @param timeout: Maximum time to acquire all locks, None for no timeout
341     @type priority: number or None
342     @param priority: Priority for acquiring lock(s)
343     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
344         amount of time
345
346     """
347     if not isinstance(op, opcodes.OpCode):
348       raise errors.ProgrammerError("Non-opcode instance passed"
349                                    " to ExecOpcode")
350
351     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
352     if lu_class is None:
353       raise errors.OpCodeUnknown("Unknown opcode")
354
355     if timeout is None:
356       calc_timeout = lambda: None
357     else:
358       calc_timeout = utils.RunningTimeout(timeout, False).Remaining
359
360     self._cbs = cbs
361     try:
362       # Acquire the Big Ganeti Lock exclusively if this LU requires it,
363       # and in a shared fashion otherwise (to prevent concurrent run with
364       # an exclusive LU.
365       self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
366                           not lu_class.REQ_BGL, calc_timeout(),
367                           priority)
368       try:
369         lu = lu_class(self, op, self.context, self.rpc)
370         lu.ExpandNames()
371         assert lu.needed_locks is not None, "needed_locks not set by LU"
372
373         try:
374           return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
375                                      priority)
376         finally:
377           if self._ec_id:
378             self.context.cfg.DropECReservations(self._ec_id)
379       finally:
380         self.context.glm.release(locking.LEVEL_CLUSTER)
381     finally:
382       self._cbs = None
383
384   def Log(self, *args):
385     """Forward call to feedback callback function.
386
387     """
388     if self._cbs:
389       self._cbs.Feedback(*args)
390
391   def LogStep(self, current, total, message):
392     """Log a change in LU execution progress.
393
394     """
395     logging.debug("Step %d/%d %s", current, total, message)
396     self.Log("STEP %d/%d %s" % (current, total, message))
397
398   def LogWarning(self, message, *args, **kwargs):
399     """Log a warning to the logs and the user.
400
401     The optional keyword argument is 'hint' and can be used to show a
402     hint to the user (presumably related to the warning). If the
403     message is empty, it will not be printed at all, allowing one to
404     show only a hint.
405
406     """
407     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
408            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
409     if args:
410       message = message % tuple(args)
411     if message:
412       logging.warning(message)
413       self.Log(" - WARNING: %s" % message)
414     if "hint" in kwargs:
415       self.Log("      Hint: %s" % kwargs["hint"])
416
417   def LogInfo(self, message, *args):
418     """Log an informational message to the logs and the user.
419
420     """
421     if args:
422       message = message % tuple(args)
423     logging.info(message)
424     self.Log(" - INFO: %s" % message)
425
426   def GetECId(self):
427     """Returns the current execution context ID.
428
429     """
430     if not self._ec_id:
431       raise errors.ProgrammerError("Tried to use execution context id when"
432                                    " not set")
433     return self._ec_id
434
435
436 class HooksMaster(object):
437   """Hooks master.
438
439   This class distributes the run commands to the nodes based on the
440   specific LU class.
441
442   In order to remove the direct dependency on the rpc module, the
443   constructor needs a function which actually does the remote
444   call. This will usually be rpc.call_hooks_runner, but any function
445   which behaves the same works.
446
447   """
448   def __init__(self, callfn, lu):
449     self.callfn = callfn
450     self.lu = lu
451     self.op = lu.op
452     self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
453
454     if self.lu.HPATH is None:
455       nodes = (None, None)
456     else:
457       nodes = map(frozenset, self.lu.BuildHooksNodes())
458
459     (self.pre_nodes, self.post_nodes) = nodes
460
461   def _BuildEnv(self, phase):
462     """Compute the environment and the target nodes.
463
464     Based on the opcode and the current node list, this builds the
465     environment for the hooks and the target node list for the run.
466
467     """
468     if phase == constants.HOOKS_PHASE_PRE:
469       prefix = "GANETI_"
470     elif phase == constants.HOOKS_PHASE_POST:
471       prefix = "GANETI_POST_"
472     else:
473       raise AssertionError("Unknown phase '%s'" % phase)
474
475     env = {}
476
477     if self.lu.HPATH is not None:
478       lu_env = self.lu.BuildHooksEnv()
479       if lu_env:
480         assert not compat.any(key.upper().startswith(prefix) for key in lu_env)
481         env.update(("%s%s" % (prefix, key), value)
482                    for (key, value) in lu_env.items())
483
484     if phase == constants.HOOKS_PHASE_PRE:
485       assert compat.all((key.startswith("GANETI_") and
486                          not key.startswith("GANETI_POST_"))
487                         for key in env)
488
489     elif phase == constants.HOOKS_PHASE_POST:
490       assert compat.all(key.startswith("GANETI_POST_") for key in env)
491       assert isinstance(self.pre_env, dict)
492
493       # Merge with pre-phase environment
494       assert not compat.any(key.startswith("GANETI_POST_")
495                             for key in self.pre_env)
496       env.update(self.pre_env)
497     else:
498       raise AssertionError("Unknown phase '%s'" % phase)
499
500     return env
501
502   def _RunWrapper(self, node_list, hpath, phase, phase_env):
503     """Simple wrapper over self.callfn.
504
505     This method fixes the environment before doing the rpc call.
506
507     """
508     cfg = self.lu.cfg
509
510     env = {
511       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
512       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
513       "GANETI_OP_CODE": self.op.OP_ID,
514       "GANETI_DATA_DIR": constants.DATA_DIR,
515       "GANETI_HOOKS_PHASE": phase,
516       "GANETI_HOOKS_PATH": hpath,
517       }
518
519     if self.lu.HTYPE:
520       env["GANETI_OBJECT_TYPE"] = self.lu.HTYPE
521
522     if cfg is not None:
523       env["GANETI_CLUSTER"] = cfg.GetClusterName()
524       env["GANETI_MASTER"] = cfg.GetMasterNode()
525
526     if phase_env:
527       assert not (set(env) & set(phase_env)), "Environment variables conflict"
528       env.update(phase_env)
529
530     # Convert everything to strings
531     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
532
533     assert compat.all(key == "PATH" or key.startswith("GANETI_")
534                       for key in env)
535
536     return self.callfn(node_list, hpath, phase, env)
537
538   def RunPhase(self, phase, nodes=None):
539     """Run all the scripts for a phase.
540
541     This is the main function of the HookMaster.
542
543     @param phase: one of L{constants.HOOKS_PHASE_POST} or
544         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
545     @param nodes: overrides the predefined list of nodes for the given phase
546     @return: the processed results of the hooks multi-node rpc call
547     @raise errors.HooksFailure: on communication failure to the nodes
548     @raise errors.HooksAbort: on failure of one of the hooks
549
550     """
551     if phase == constants.HOOKS_PHASE_PRE:
552       if nodes is None:
553         nodes = self.pre_nodes
554       env = self.pre_env
555     elif phase == constants.HOOKS_PHASE_POST:
556       if nodes is None:
557         nodes = self.post_nodes
558       env = self._BuildEnv(phase)
559     else:
560       raise AssertionError("Unknown phase '%s'" % phase)
561
562     if not nodes:
563       # empty node list, we should not attempt to run this as either
564       # we're in the cluster init phase and the rpc client part can't
565       # even attempt to run, or this LU doesn't do hooks at all
566       return
567
568     results = self._RunWrapper(nodes, self.lu.HPATH, phase, env)
569     if not results:
570       msg = "Communication Failure"
571       if phase == constants.HOOKS_PHASE_PRE:
572         raise errors.HooksFailure(msg)
573       else:
574         self.lu.LogWarning(msg)
575         return results
576
577     errs = []
578     for node_name in results:
579       res = results[node_name]
580       if res.offline:
581         continue
582
583       msg = res.fail_msg
584       if msg:
585         self.lu.LogWarning("Communication failure to node %s: %s",
586                            node_name, msg)
587         continue
588
589       for script, hkr, output in res.payload:
590         if hkr == constants.HKR_FAIL:
591           if phase == constants.HOOKS_PHASE_PRE:
592             errs.append((node_name, script, output))
593           else:
594             if not output:
595               output = "(no output)"
596             self.lu.LogWarning("On %s script %s failed, output: %s" %
597                                (node_name, script, output))
598
599     if errs and phase == constants.HOOKS_PHASE_PRE:
600       raise errors.HooksAbort(errs)
601
602     return results
603
604   def RunConfigUpdate(self):
605     """Run the special configuration update hook
606
607     This is a special hook that runs only on the master after each
608     top-level LI if the configuration has been updated.
609
610     """
611     phase = constants.HOOKS_PHASE_POST
612     hpath = constants.HOOKS_NAME_CFGUPDATE
613     nodes = [self.lu.cfg.GetMasterNode()]
614     self._RunWrapper(nodes, hpath, phase, self.pre_env)