Stop acquiring BGL for LUXI queries
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import compat
43
44
45 _OP_PREFIX = "Op"
46 _LU_PREFIX = "LU"
47
48
49 class LockAcquireTimeout(Exception):
50   """Exception to report timeouts on acquiring locks.
51
52   """
53
54
55 def _CalculateLockAttemptTimeouts():
56   """Calculate timeouts for lock attempts.
57
58   """
59   result = [constants.LOCK_ATTEMPTS_MINWAIT]
60   running_sum = result[0]
61
62   # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
63   # blocking acquire
64   while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
65     timeout = (result[-1] * 1.05) ** 1.25
66
67     # Cap max timeout. This gives other jobs a chance to run even if
68     # we're still trying to get our locks, before finally moving to a
69     # blocking acquire.
70     timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
71     # And also cap the lower boundary for safety
72     timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
73
74     result.append(timeout)
75     running_sum += timeout
76
77   return result
78
79
80 class LockAttemptTimeoutStrategy(object):
81   """Class with lock acquire timeout strategy.
82
83   """
84   __slots__ = [
85     "_timeouts",
86     "_random_fn",
87     "_time_fn",
88     ]
89
90   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
91
92   def __init__(self, _time_fn=time.time, _random_fn=random.random):
93     """Initializes this class.
94
95     @param _time_fn: Time function for unittests
96     @param _random_fn: Random number generator for unittests
97
98     """
99     object.__init__(self)
100
101     self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
102     self._time_fn = _time_fn
103     self._random_fn = _random_fn
104
105   def NextAttempt(self):
106     """Returns the timeout for the next attempt.
107
108     """
109     try:
110       timeout = self._timeouts.next()
111     except StopIteration:
112       # No more timeouts, do blocking acquire
113       timeout = None
114
115     if timeout is not None:
116       # Add a small variation (-/+ 5%) to timeout. This helps in situations
117       # where two or more jobs are fighting for the same lock(s).
118       variation_range = timeout * 0.1
119       timeout += ((self._random_fn() * variation_range) -
120                   (variation_range * 0.5))
121
122     return timeout
123
124
125 class OpExecCbBase: # pylint: disable=W0232
126   """Base class for OpCode execution callbacks.
127
128   """
129   def NotifyStart(self):
130     """Called when we are about to execute the LU.
131
132     This function is called when we're about to start the lu's Exec() method,
133     that is, after we have acquired all locks.
134
135     """
136
137   def Feedback(self, *args):
138     """Sends feedback from the LU code to the end-user.
139
140     """
141
142   def CheckCancel(self):
143     """Check whether job has been cancelled.
144
145     """
146
147   def SubmitManyJobs(self, jobs):
148     """Submits jobs for processing.
149
150     See L{jqueue.JobQueue.SubmitManyJobs}.
151
152     """
153     raise NotImplementedError
154
155
156 def _LUNameForOpName(opname):
157   """Computes the LU name for a given OpCode name.
158
159   """
160   assert opname.startswith(_OP_PREFIX), \
161       "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
162
163   return _LU_PREFIX + opname[len(_OP_PREFIX):]
164
165
166 def _ComputeDispatchTable():
167   """Computes the opcode-to-lu dispatch table.
168
169   """
170   return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
171               for op in opcodes.OP_MAPPING.values()
172               if op.WITH_LU)
173
174
175 class Processor(object):
176   """Object which runs OpCodes"""
177   DISPATCH_TABLE = _ComputeDispatchTable()
178
179   def __init__(self, context, ec_id, enable_locks=True):
180     """Constructor for Processor
181
182     @type context: GanetiContext
183     @param context: global Ganeti context
184     @type ec_id: string
185     @param ec_id: execution context identifier
186
187     """
188     self.context = context
189     self._ec_id = ec_id
190     self._cbs = None
191     self.rpc = rpc.RpcRunner(context.cfg)
192     self.hmclass = HooksMaster
193     self._enable_locks = enable_locks
194
195   def _CheckLocksEnabled(self):
196     """Checks if locking is enabled.
197
198     @raise errors.ProgrammerError: In case locking is not enabled
199
200     """
201     if not self._enable_locks:
202       raise errors.ProgrammerError("Attempted to use disabled locks")
203
204   def _AcquireLocks(self, level, names, shared, timeout, priority):
205     """Acquires locks via the Ganeti lock manager.
206
207     @type level: int
208     @param level: Lock level
209     @type names: list or string
210     @param names: Lock names
211     @type shared: bool
212     @param shared: Whether the locks should be acquired in shared mode
213     @type timeout: None or float
214     @param timeout: Timeout for acquiring the locks
215     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
216         amount of time
217
218     """
219     self._CheckLocksEnabled()
220
221     if self._cbs:
222       self._cbs.CheckCancel()
223
224     acquired = self.context.glm.acquire(level, names, shared=shared,
225                                         timeout=timeout, priority=priority)
226
227     if acquired is None:
228       raise LockAcquireTimeout()
229
230     return acquired
231
232   def _ProcessResult(self, result):
233     """Examines opcode result.
234
235     If necessary, additional processing on the result is done.
236
237     """
238     if isinstance(result, cmdlib.ResultWithJobs):
239       # Submit jobs
240       job_submission = self._cbs.SubmitManyJobs(result.jobs)
241
242       # Build dictionary
243       result = result.other
244
245       assert constants.JOB_IDS_KEY not in result, \
246         "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
247
248       result[constants.JOB_IDS_KEY] = job_submission
249
250     return result
251
252   def _ExecLU(self, lu):
253     """Logical Unit execution sequence.
254
255     """
256     write_count = self.context.cfg.write_count
257     lu.CheckPrereq()
258     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
259     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
260     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
261                      self.Log, None)
262
263     if getattr(lu.op, "dry_run", False):
264       # in this mode, no post-hooks are run, and the config is not
265       # written (as it might have been modified by another LU, and we
266       # shouldn't do writeout on behalf of other threads
267       self.LogInfo("dry-run mode requested, not actually executing"
268                    " the operation")
269       return lu.dry_run_result
270
271     try:
272       result = self._ProcessResult(lu.Exec(self.Log))
273       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
274       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
275                                 self.Log, result)
276     finally:
277       # FIXME: This needs locks if not lu_class.REQ_BGL
278       if write_count != self.context.cfg.write_count:
279         hm.RunConfigUpdate()
280
281     return result
282
283   def _LockAndExecLU(self, lu, level, calc_timeout, priority):
284     """Execute a Logical Unit, with the needed locks.
285
286     This is a recursive function that starts locking the given level, and
287     proceeds up, till there are no more locks to acquire. Then it executes the
288     given LU and its opcodes.
289
290     """
291     adding_locks = level in lu.add_locks
292     acquiring_locks = level in lu.needed_locks
293     if level not in locking.LEVELS:
294       if self._cbs:
295         self._cbs.NotifyStart()
296
297       result = self._ExecLU(lu)
298
299     elif adding_locks and acquiring_locks:
300       # We could both acquire and add locks at the same level, but for now we
301       # don't need this, so we'll avoid the complicated code needed.
302       raise NotImplementedError("Can't declare locks to acquire when adding"
303                                 " others")
304
305     elif adding_locks or acquiring_locks:
306       self._CheckLocksEnabled()
307
308       lu.DeclareLocks(level)
309       share = lu.share_locks[level]
310
311       try:
312         assert adding_locks ^ acquiring_locks, \
313           "Locks must be either added or acquired"
314
315         if acquiring_locks:
316           # Acquiring locks
317           needed_locks = lu.needed_locks[level]
318
319           self._AcquireLocks(level, needed_locks, share,
320                              calc_timeout(), priority)
321         else:
322           # Adding locks
323           add_locks = lu.add_locks[level]
324           lu.remove_locks[level] = add_locks
325
326           try:
327             self.context.glm.add(level, add_locks, acquired=1, shared=share)
328           except errors.LockError:
329             raise errors.OpPrereqError(
330               "Couldn't add locks (%s), probably because of a race condition"
331               " with another job, who added them first" % add_locks,
332               errors.ECODE_FAULT)
333
334         try:
335           result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
336         finally:
337           if level in lu.remove_locks:
338             self.context.glm.remove(level, lu.remove_locks[level])
339       finally:
340         if self.context.glm.is_owned(level):
341           self.context.glm.release(level)
342
343     else:
344       result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
345
346     return result
347
348   def ExecOpCode(self, op, cbs, timeout=None, priority=None):
349     """Execute an opcode.
350
351     @type op: an OpCode instance
352     @param op: the opcode to be executed
353     @type cbs: L{OpExecCbBase}
354     @param cbs: Runtime callbacks
355     @type timeout: float or None
356     @param timeout: Maximum time to acquire all locks, None for no timeout
357     @type priority: number or None
358     @param priority: Priority for acquiring lock(s)
359     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
360         amount of time
361
362     """
363     if not isinstance(op, opcodes.OpCode):
364       raise errors.ProgrammerError("Non-opcode instance passed"
365                                    " to ExecOpcode (%s)" % type(op))
366
367     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
368     if lu_class is None:
369       raise errors.OpCodeUnknown("Unknown opcode")
370
371     if timeout is None:
372       calc_timeout = lambda: None
373     else:
374       calc_timeout = utils.RunningTimeout(timeout, False).Remaining
375
376     self._cbs = cbs
377     try:
378       if self._enable_locks:
379         # Acquire the Big Ganeti Lock exclusively if this LU requires it,
380         # and in a shared fashion otherwise (to prevent concurrent run with
381         # an exclusive LU.
382         self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
383                             not lu_class.REQ_BGL, calc_timeout(),
384                             priority)
385       elif lu_class.REQ_BGL:
386         raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
387                                      " disabled" % op.OP_ID)
388
389       try:
390         lu = lu_class(self, op, self.context, self.rpc)
391         lu.ExpandNames()
392         assert lu.needed_locks is not None, "needed_locks not set by LU"
393
394         try:
395           result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
396                                        priority)
397         finally:
398           if self._ec_id:
399             self.context.cfg.DropECReservations(self._ec_id)
400       finally:
401         # Release BGL if owned
402         if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
403           assert self._enable_locks
404           self.context.glm.release(locking.LEVEL_CLUSTER)
405     finally:
406       self._cbs = None
407
408     resultcheck_fn = op.OP_RESULT
409     if not (resultcheck_fn is None or resultcheck_fn(result)):
410       logging.error("Expected opcode result matching %s, got %s",
411                     resultcheck_fn, result)
412       raise errors.OpResultError("Opcode result does not match %s" %
413                                  resultcheck_fn)
414
415     return result
416
417   def Log(self, *args):
418     """Forward call to feedback callback function.
419
420     """
421     if self._cbs:
422       self._cbs.Feedback(*args)
423
424   def LogStep(self, current, total, message):
425     """Log a change in LU execution progress.
426
427     """
428     logging.debug("Step %d/%d %s", current, total, message)
429     self.Log("STEP %d/%d %s" % (current, total, message))
430
431   def LogWarning(self, message, *args, **kwargs):
432     """Log a warning to the logs and the user.
433
434     The optional keyword argument is 'hint' and can be used to show a
435     hint to the user (presumably related to the warning). If the
436     message is empty, it will not be printed at all, allowing one to
437     show only a hint.
438
439     """
440     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
441            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
442     if args:
443       message = message % tuple(args)
444     if message:
445       logging.warning(message)
446       self.Log(" - WARNING: %s" % message)
447     if "hint" in kwargs:
448       self.Log("      Hint: %s" % kwargs["hint"])
449
450   def LogInfo(self, message, *args):
451     """Log an informational message to the logs and the user.
452
453     """
454     if args:
455       message = message % tuple(args)
456     logging.info(message)
457     self.Log(" - INFO: %s" % message)
458
459   def GetECId(self):
460     """Returns the current execution context ID.
461
462     """
463     if not self._ec_id:
464       raise errors.ProgrammerError("Tried to use execution context id when"
465                                    " not set")
466     return self._ec_id
467
468
469 class HooksMaster(object):
470   """Hooks master.
471
472   This class distributes the run commands to the nodes based on the
473   specific LU class.
474
475   In order to remove the direct dependency on the rpc module, the
476   constructor needs a function which actually does the remote
477   call. This will usually be rpc.call_hooks_runner, but any function
478   which behaves the same works.
479
480   """
481   def __init__(self, callfn, lu):
482     self.callfn = callfn
483     self.lu = lu
484     self.op = lu.op
485     self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
486
487     if self.lu.HPATH is None:
488       nodes = (None, None)
489     else:
490       nodes = map(frozenset, self.lu.BuildHooksNodes())
491
492     (self.pre_nodes, self.post_nodes) = nodes
493
494   def _BuildEnv(self, phase):
495     """Compute the environment and the target nodes.
496
497     Based on the opcode and the current node list, this builds the
498     environment for the hooks and the target node list for the run.
499
500     """
501     if phase == constants.HOOKS_PHASE_PRE:
502       prefix = "GANETI_"
503     elif phase == constants.HOOKS_PHASE_POST:
504       prefix = "GANETI_POST_"
505     else:
506       raise AssertionError("Unknown phase '%s'" % phase)
507
508     env = {}
509
510     if self.lu.HPATH is not None:
511       lu_env = self.lu.BuildHooksEnv()
512       if lu_env:
513         assert not compat.any(key.upper().startswith(prefix) for key in lu_env)
514         env.update(("%s%s" % (prefix, key), value)
515                    for (key, value) in lu_env.items())
516
517     if phase == constants.HOOKS_PHASE_PRE:
518       assert compat.all((key.startswith("GANETI_") and
519                          not key.startswith("GANETI_POST_"))
520                         for key in env)
521
522     elif phase == constants.HOOKS_PHASE_POST:
523       assert compat.all(key.startswith("GANETI_POST_") for key in env)
524       assert isinstance(self.pre_env, dict)
525
526       # Merge with pre-phase environment
527       assert not compat.any(key.startswith("GANETI_POST_")
528                             for key in self.pre_env)
529       env.update(self.pre_env)
530     else:
531       raise AssertionError("Unknown phase '%s'" % phase)
532
533     return env
534
535   def _RunWrapper(self, node_list, hpath, phase, phase_env):
536     """Simple wrapper over self.callfn.
537
538     This method fixes the environment before doing the rpc call.
539
540     """
541     cfg = self.lu.cfg
542
543     env = {
544       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
545       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
546       "GANETI_OP_CODE": self.op.OP_ID,
547       "GANETI_DATA_DIR": constants.DATA_DIR,
548       "GANETI_HOOKS_PHASE": phase,
549       "GANETI_HOOKS_PATH": hpath,
550       }
551
552     if self.lu.HTYPE:
553       env["GANETI_OBJECT_TYPE"] = self.lu.HTYPE
554
555     if cfg is not None:
556       env["GANETI_CLUSTER"] = cfg.GetClusterName()
557       env["GANETI_MASTER"] = cfg.GetMasterNode()
558
559     if phase_env:
560       assert not (set(env) & set(phase_env)), "Environment variables conflict"
561       env.update(phase_env)
562
563     # Convert everything to strings
564     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
565
566     assert compat.all(key == "PATH" or key.startswith("GANETI_")
567                       for key in env)
568
569     return self.callfn(node_list, hpath, phase, env)
570
571   def RunPhase(self, phase, nodes=None):
572     """Run all the scripts for a phase.
573
574     This is the main function of the HookMaster.
575
576     @param phase: one of L{constants.HOOKS_PHASE_POST} or
577         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
578     @param nodes: overrides the predefined list of nodes for the given phase
579     @return: the processed results of the hooks multi-node rpc call
580     @raise errors.HooksFailure: on communication failure to the nodes
581     @raise errors.HooksAbort: on failure of one of the hooks
582
583     """
584     if phase == constants.HOOKS_PHASE_PRE:
585       if nodes is None:
586         nodes = self.pre_nodes
587       env = self.pre_env
588     elif phase == constants.HOOKS_PHASE_POST:
589       if nodes is None:
590         nodes = self.post_nodes
591       env = self._BuildEnv(phase)
592     else:
593       raise AssertionError("Unknown phase '%s'" % phase)
594
595     if not nodes:
596       # empty node list, we should not attempt to run this as either
597       # we're in the cluster init phase and the rpc client part can't
598       # even attempt to run, or this LU doesn't do hooks at all
599       return
600
601     results = self._RunWrapper(nodes, self.lu.HPATH, phase, env)
602     if not results:
603       msg = "Communication Failure"
604       if phase == constants.HOOKS_PHASE_PRE:
605         raise errors.HooksFailure(msg)
606       else:
607         self.lu.LogWarning(msg)
608         return results
609
610     errs = []
611     for node_name in results:
612       res = results[node_name]
613       if res.offline:
614         continue
615
616       msg = res.fail_msg
617       if msg:
618         self.lu.LogWarning("Communication failure to node %s: %s",
619                            node_name, msg)
620         continue
621
622       for script, hkr, output in res.payload:
623         if hkr == constants.HKR_FAIL:
624           if phase == constants.HOOKS_PHASE_PRE:
625             errs.append((node_name, script, output))
626           else:
627             if not output:
628               output = "(no output)"
629             self.lu.LogWarning("On %s script %s failed, output: %s" %
630                                (node_name, script, output))
631
632     if errs and phase == constants.HOOKS_PHASE_PRE:
633       raise errors.HooksAbort(errs)
634
635     return results
636
637   def RunConfigUpdate(self):
638     """Run the special configuration update hook
639
640     This is a special hook that runs only on the master after each
641     top-level LI if the configuration has been updated.
642
643     """
644     phase = constants.HOOKS_PHASE_POST
645     hpath = constants.HOOKS_NAME_CFGUPDATE
646     nodes = [self.lu.cfg.GetMasterNode()]
647     self._RunWrapper(nodes, hpath, phase, self.pre_env)