Include PycURL error code in GanetiApiError.
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34 import itertools
35
36 from ganeti import opcodes
37 from ganeti import constants
38 from ganeti import errors
39 from ganeti import cmdlib
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import compat
43
44
45 _OP_PREFIX = "Op"
46 _LU_PREFIX = "LU"
47
48
49 class LockAcquireTimeout(Exception):
50   """Exception to report timeouts on acquiring locks.
51
52   """
53
54
55 def _CalculateLockAttemptTimeouts():
56   """Calculate timeouts for lock attempts.
57
58   """
59   result = [constants.LOCK_ATTEMPTS_MINWAIT]
60   running_sum = result[0]
61
62   # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
63   # blocking acquire
64   while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
65     timeout = (result[-1] * 1.05) ** 1.25
66
67     # Cap max timeout. This gives other jobs a chance to run even if
68     # we're still trying to get our locks, before finally moving to a
69     # blocking acquire.
70     timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
71     # And also cap the lower boundary for safety
72     timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
73
74     result.append(timeout)
75     running_sum += timeout
76
77   return result
78
79
80 class LockAttemptTimeoutStrategy(object):
81   """Class with lock acquire timeout strategy.
82
83   """
84   __slots__ = [
85     "_timeouts",
86     "_random_fn",
87     "_time_fn",
88     ]
89
90   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
91
92   def __init__(self, _time_fn=time.time, _random_fn=random.random):
93     """Initializes this class.
94
95     @param _time_fn: Time function for unittests
96     @param _random_fn: Random number generator for unittests
97
98     """
99     object.__init__(self)
100
101     self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
102     self._time_fn = _time_fn
103     self._random_fn = _random_fn
104
105   def NextAttempt(self):
106     """Returns the timeout for the next attempt.
107
108     """
109     try:
110       timeout = self._timeouts.next()
111     except StopIteration:
112       # No more timeouts, do blocking acquire
113       timeout = None
114
115     if timeout is not None:
116       # Add a small variation (-/+ 5%) to timeout. This helps in situations
117       # where two or more jobs are fighting for the same lock(s).
118       variation_range = timeout * 0.1
119       timeout += ((self._random_fn() * variation_range) -
120                   (variation_range * 0.5))
121
122     return timeout
123
124
125 class OpExecCbBase: # pylint: disable=W0232
126   """Base class for OpCode execution callbacks.
127
128   """
129   def NotifyStart(self):
130     """Called when we are about to execute the LU.
131
132     This function is called when we're about to start the lu's Exec() method,
133     that is, after we have acquired all locks.
134
135     """
136
137   def Feedback(self, *args):
138     """Sends feedback from the LU code to the end-user.
139
140     """
141
142   def CheckCancel(self):
143     """Check whether job has been cancelled.
144
145     """
146
147   def SubmitManyJobs(self, jobs):
148     """Submits jobs for processing.
149
150     See L{jqueue.JobQueue.SubmitManyJobs}.
151
152     """
153     raise NotImplementedError
154
155
156 def _LUNameForOpName(opname):
157   """Computes the LU name for a given OpCode name.
158
159   """
160   assert opname.startswith(_OP_PREFIX), \
161       "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
162
163   return _LU_PREFIX + opname[len(_OP_PREFIX):]
164
165
166 def _ComputeDispatchTable():
167   """Computes the opcode-to-lu dispatch table.
168
169   """
170   return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
171               for op in opcodes.OP_MAPPING.values()
172               if op.WITH_LU)
173
174
175 def _SetBaseOpParams(src, defcomment, dst):
176   """Copies basic opcode parameters.
177
178   @type src: L{opcodes.OpCode}
179   @param src: Source opcode
180   @type defcomment: string
181   @param defcomment: Comment to specify if not already given
182   @type dst: L{opcodes.OpCode}
183   @param dst: Destination opcode
184
185   """
186   if hasattr(src, "debug_level"):
187     dst.debug_level = src.debug_level
188
189   if (getattr(dst, "priority", None) is None and
190       hasattr(src, "priority")):
191     dst.priority = src.priority
192
193   if not getattr(dst, opcodes.COMMENT_ATTR, None):
194     dst.comment = defcomment
195
196
197 def _ProcessResult(submit_fn, op, result):
198   """Examines opcode result.
199
200   If necessary, additional processing on the result is done.
201
202   """
203   if isinstance(result, cmdlib.ResultWithJobs):
204     # Copy basic parameters (e.g. priority)
205     map(compat.partial(_SetBaseOpParams, op,
206                        "Submitted by %s" % op.OP_ID),
207         itertools.chain(*result.jobs))
208
209     # Submit jobs
210     job_submission = submit_fn(result.jobs)
211
212     # Build dictionary
213     result = result.other
214
215     assert constants.JOB_IDS_KEY not in result, \
216       "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
217
218     result[constants.JOB_IDS_KEY] = job_submission
219
220   return result
221
222
223 def _FailingSubmitManyJobs(_):
224   """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
225
226   """
227   raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
228                                " queries) can not submit jobs")
229
230
231 def _RpcResultsToHooksResults(rpc_results):
232   """Function to convert RPC results to the format expected by HooksMaster.
233
234   @type rpc_results: dict(node: L{rpc.RpcResult})
235   @param rpc_results: RPC results
236   @rtype: dict(node: (fail_msg, offline, hooks_results))
237   @return: RPC results unpacked according to the format expected by
238     L({mcpu.HooksMaster}
239
240   """
241   return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
242               for (node, rpc_res) in rpc_results.items())
243
244
245 class Processor(object):
246   """Object which runs OpCodes"""
247   DISPATCH_TABLE = _ComputeDispatchTable()
248
249   def __init__(self, context, ec_id):
250     """Constructor for Processor
251
252     @type context: GanetiContext
253     @param context: global Ganeti context
254     @type ec_id: string
255     @param ec_id: execution context identifier
256
257     """
258     self.context = context
259     self._ec_id = ec_id
260     self._cbs = None
261     self.rpc = context.rpc
262     self.hmclass = HooksMaster
263
264   def _AcquireLocks(self, level, names, shared, timeout, priority):
265     """Acquires locks via the Ganeti lock manager.
266
267     @type level: int
268     @param level: Lock level
269     @type names: list or string
270     @param names: Lock names
271     @type shared: bool
272     @param shared: Whether the locks should be acquired in shared mode
273     @type timeout: None or float
274     @param timeout: Timeout for acquiring the locks
275     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
276         amount of time
277
278     """
279     if self._cbs:
280       self._cbs.CheckCancel()
281
282     acquired = self.context.glm.acquire(level, names, shared=shared,
283                                         timeout=timeout, priority=priority)
284
285     if acquired is None:
286       raise LockAcquireTimeout()
287
288     return acquired
289
290   def _ExecLU(self, lu):
291     """Logical Unit execution sequence.
292
293     """
294     write_count = self.context.cfg.write_count
295     lu.CheckPrereq()
296
297     hm = self.BuildHooksManager(lu)
298     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
299     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
300                      self.Log, None)
301
302     if getattr(lu.op, "dry_run", False):
303       # in this mode, no post-hooks are run, and the config is not
304       # written (as it might have been modified by another LU, and we
305       # shouldn't do writeout on behalf of other threads
306       self.LogInfo("dry-run mode requested, not actually executing"
307                    " the operation")
308       return lu.dry_run_result
309
310     if self._cbs:
311       submit_mj_fn = self._cbs.SubmitManyJobs
312     else:
313       submit_mj_fn = _FailingSubmitManyJobs
314
315     try:
316       result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
317       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
318       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
319                                 self.Log, result)
320     finally:
321       # FIXME: This needs locks if not lu_class.REQ_BGL
322       if write_count != self.context.cfg.write_count:
323         hm.RunConfigUpdate()
324
325     return result
326
327   def BuildHooksManager(self, lu):
328     return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
329
330   def _LockAndExecLU(self, lu, level, calc_timeout, priority):
331     """Execute a Logical Unit, with the needed locks.
332
333     This is a recursive function that starts locking the given level, and
334     proceeds up, till there are no more locks to acquire. Then it executes the
335     given LU and its opcodes.
336
337     """
338     adding_locks = level in lu.add_locks
339     acquiring_locks = level in lu.needed_locks
340     if level not in locking.LEVELS:
341       if self._cbs:
342         self._cbs.NotifyStart()
343
344       result = self._ExecLU(lu)
345
346     elif adding_locks and acquiring_locks:
347       # We could both acquire and add locks at the same level, but for now we
348       # don't need this, so we'll avoid the complicated code needed.
349       raise NotImplementedError("Can't declare locks to acquire when adding"
350                                 " others")
351
352     elif adding_locks or acquiring_locks:
353       lu.DeclareLocks(level)
354       share = lu.share_locks[level]
355
356       try:
357         assert adding_locks ^ acquiring_locks, \
358           "Locks must be either added or acquired"
359
360         if acquiring_locks:
361           # Acquiring locks
362           needed_locks = lu.needed_locks[level]
363
364           self._AcquireLocks(level, needed_locks, share,
365                              calc_timeout(), priority)
366         else:
367           # Adding locks
368           add_locks = lu.add_locks[level]
369           lu.remove_locks[level] = add_locks
370
371           try:
372             self.context.glm.add(level, add_locks, acquired=1, shared=share)
373           except errors.LockError:
374             raise errors.OpPrereqError(
375               "Couldn't add locks (%s), probably because of a race condition"
376               " with another job, who added them first" % add_locks,
377               errors.ECODE_FAULT)
378
379         try:
380           result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
381         finally:
382           if level in lu.remove_locks:
383             self.context.glm.remove(level, lu.remove_locks[level])
384       finally:
385         if self.context.glm.is_owned(level):
386           self.context.glm.release(level)
387
388     else:
389       result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
390
391     return result
392
393   def ExecOpCode(self, op, cbs, timeout=None, priority=None):
394     """Execute an opcode.
395
396     @type op: an OpCode instance
397     @param op: the opcode to be executed
398     @type cbs: L{OpExecCbBase}
399     @param cbs: Runtime callbacks
400     @type timeout: float or None
401     @param timeout: Maximum time to acquire all locks, None for no timeout
402     @type priority: number or None
403     @param priority: Priority for acquiring lock(s)
404     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
405         amount of time
406
407     """
408     if not isinstance(op, opcodes.OpCode):
409       raise errors.ProgrammerError("Non-opcode instance passed"
410                                    " to ExecOpcode (%s)" % type(op))
411
412     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
413     if lu_class is None:
414       raise errors.OpCodeUnknown("Unknown opcode")
415
416     if timeout is None:
417       calc_timeout = lambda: None
418     else:
419       calc_timeout = utils.RunningTimeout(timeout, False).Remaining
420
421     self._cbs = cbs
422     try:
423       # Acquire the Big Ganeti Lock exclusively if this LU requires it,
424       # and in a shared fashion otherwise (to prevent concurrent run with
425       # an exclusive LU.
426       self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
427                           not lu_class.REQ_BGL, calc_timeout(),
428                           priority)
429       try:
430         lu = lu_class(self, op, self.context, self.rpc)
431         lu.ExpandNames()
432         assert lu.needed_locks is not None, "needed_locks not set by LU"
433
434         try:
435           result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
436                                        priority)
437         finally:
438           if self._ec_id:
439             self.context.cfg.DropECReservations(self._ec_id)
440       finally:
441         self.context.glm.release(locking.LEVEL_CLUSTER)
442     finally:
443       self._cbs = None
444
445     resultcheck_fn = op.OP_RESULT
446     if not (resultcheck_fn is None or resultcheck_fn(result)):
447       logging.error("Expected opcode result matching %s, got %s",
448                     resultcheck_fn, result)
449       raise errors.OpResultError("Opcode result does not match %s: %s" %
450                                  (resultcheck_fn, utils.Truncate(result, 80)))
451
452     return result
453
454   def Log(self, *args):
455     """Forward call to feedback callback function.
456
457     """
458     if self._cbs:
459       self._cbs.Feedback(*args)
460
461   def LogStep(self, current, total, message):
462     """Log a change in LU execution progress.
463
464     """
465     logging.debug("Step %d/%d %s", current, total, message)
466     self.Log("STEP %d/%d %s" % (current, total, message))
467
468   def LogWarning(self, message, *args, **kwargs):
469     """Log a warning to the logs and the user.
470
471     The optional keyword argument is 'hint' and can be used to show a
472     hint to the user (presumably related to the warning). If the
473     message is empty, it will not be printed at all, allowing one to
474     show only a hint.
475
476     """
477     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
478            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
479     if args:
480       message = message % tuple(args)
481     if message:
482       logging.warning(message)
483       self.Log(" - WARNING: %s" % message)
484     if "hint" in kwargs:
485       self.Log("      Hint: %s" % kwargs["hint"])
486
487   def LogInfo(self, message, *args):
488     """Log an informational message to the logs and the user.
489
490     """
491     if args:
492       message = message % tuple(args)
493     logging.info(message)
494     self.Log(" - INFO: %s" % message)
495
496   def GetECId(self):
497     """Returns the current execution context ID.
498
499     """
500     if not self._ec_id:
501       raise errors.ProgrammerError("Tried to use execution context id when"
502                                    " not set")
503     return self._ec_id
504
505
506 class HooksMaster(object):
507   def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
508     hooks_results_adapt_fn, build_env_fn, log_fn, htype=None, cluster_name=None,
509     master_name=None):
510     """Base class for hooks masters.
511
512     This class invokes the execution of hooks according to the behaviour
513     specified by its parameters.
514
515     @type opcode: string
516     @param opcode: opcode of the operation to which the hooks are tied
517     @type hooks_path: string
518     @param hooks_path: prefix of the hooks directories
519     @type nodes: 2-tuple of lists
520     @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
521       run and nodes on which post-hooks must be run
522     @type hooks_execution_fn: function that accepts the following parameters:
523       (node_list, hooks_path, phase, environment)
524     @param hooks_execution_fn: function that will execute the hooks; can be
525       None, indicating that no conversion is necessary.
526     @type hooks_results_adapt_fn: function
527     @param hooks_results_adapt_fn: function that will adapt the return value of
528       hooks_execution_fn to the format expected by RunPhase
529     @type build_env_fn: function that returns a dictionary having strings as
530       keys
531     @param build_env_fn: function that builds the environment for the hooks
532     @type log_fn: function that accepts a string
533     @param log_fn: logging function
534     @type htype: string or None
535     @param htype: None or one of L{constants.HTYPE_CLUSTER},
536      L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
537     @type cluster_name: string
538     @param cluster_name: name of the cluster
539     @type master_name: string
540     @param master_name: name of the master
541
542     """
543     self.opcode = opcode
544     self.hooks_path = hooks_path
545     self.hooks_execution_fn = hooks_execution_fn
546     self.hooks_results_adapt_fn = hooks_results_adapt_fn
547     self.build_env_fn = build_env_fn
548     self.log_fn = log_fn
549     self.htype = htype
550     self.cluster_name = cluster_name
551     self.master_name = master_name
552
553     self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
554     (self.pre_nodes, self.post_nodes) = nodes
555
556   def _BuildEnv(self, phase):
557     """Compute the environment and the target nodes.
558
559     Based on the opcode and the current node list, this builds the
560     environment for the hooks and the target node list for the run.
561
562     """
563     if phase == constants.HOOKS_PHASE_PRE:
564       prefix = "GANETI_"
565     elif phase == constants.HOOKS_PHASE_POST:
566       prefix = "GANETI_POST_"
567     else:
568       raise AssertionError("Unknown phase '%s'" % phase)
569
570     env = {}
571
572     if self.hooks_path is not None:
573       phase_env = self.build_env_fn()
574       if phase_env:
575         assert not compat.any(key.upper().startswith(prefix)
576                               for key in phase_env)
577         env.update(("%s%s" % (prefix, key), value)
578                    for (key, value) in phase_env.items())
579
580     if phase == constants.HOOKS_PHASE_PRE:
581       assert compat.all((key.startswith("GANETI_") and
582                          not key.startswith("GANETI_POST_"))
583                         for key in env)
584
585     elif phase == constants.HOOKS_PHASE_POST:
586       assert compat.all(key.startswith("GANETI_POST_") for key in env)
587       assert isinstance(self.pre_env, dict)
588
589       # Merge with pre-phase environment
590       assert not compat.any(key.startswith("GANETI_POST_")
591                             for key in self.pre_env)
592       env.update(self.pre_env)
593     else:
594       raise AssertionError("Unknown phase '%s'" % phase)
595
596     return env
597
598   def _RunWrapper(self, node_list, hpath, phase, phase_env):
599     """Simple wrapper over self.callfn.
600
601     This method fixes the environment before executing the hooks.
602
603     """
604     env = {
605       "PATH": constants.HOOKS_PATH,
606       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
607       "GANETI_OP_CODE": self.opcode,
608       "GANETI_DATA_DIR": constants.DATA_DIR,
609       "GANETI_HOOKS_PHASE": phase,
610       "GANETI_HOOKS_PATH": hpath,
611       }
612
613     if self.htype:
614       env["GANETI_OBJECT_TYPE"] = self.htype
615
616     if self.cluster_name is not None:
617       env["GANETI_CLUSTER"] = self.cluster_name
618
619     if self.master_name is not None:
620       env["GANETI_MASTER"] = self.master_name
621
622     if phase_env:
623       env = utils.algo.JoinDisjointDicts(env, phase_env)
624
625     # Convert everything to strings
626     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
627
628     assert compat.all(key == "PATH" or key.startswith("GANETI_")
629                       for key in env)
630
631     return self.hooks_execution_fn(node_list, hpath, phase, env)
632
633   def RunPhase(self, phase, nodes=None):
634     """Run all the scripts for a phase.
635
636     This is the main function of the HookMaster.
637     It executes self.hooks_execution_fn, and after running
638     self.hooks_results_adapt_fn on its results it expects them to be in the form
639     {node_name: (fail_msg, [(script, result, output), ...]}).
640
641     @param phase: one of L{constants.HOOKS_PHASE_POST} or
642         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
643     @param nodes: overrides the predefined list of nodes for the given phase
644     @return: the processed results of the hooks multi-node rpc call
645     @raise errors.HooksFailure: on communication failure to the nodes
646     @raise errors.HooksAbort: on failure of one of the hooks
647
648     """
649     if phase == constants.HOOKS_PHASE_PRE:
650       if nodes is None:
651         nodes = self.pre_nodes
652       env = self.pre_env
653     elif phase == constants.HOOKS_PHASE_POST:
654       if nodes is None:
655         nodes = self.post_nodes
656       env = self._BuildEnv(phase)
657     else:
658       raise AssertionError("Unknown phase '%s'" % phase)
659
660     if not nodes:
661       # empty node list, we should not attempt to run this as either
662       # we're in the cluster init phase and the rpc client part can't
663       # even attempt to run, or this LU doesn't do hooks at all
664       return
665
666     results = self._RunWrapper(nodes, self.hooks_path, phase, env)
667     if not results:
668       msg = "Communication Failure"
669       if phase == constants.HOOKS_PHASE_PRE:
670         raise errors.HooksFailure(msg)
671       else:
672         self.log_fn(msg)
673         return results
674
675     converted_res = results
676     if self.hooks_results_adapt_fn:
677       converted_res = self.hooks_results_adapt_fn(results)
678
679     errs = []
680     for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
681       if offline:
682         continue
683
684       if fail_msg:
685         self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)
686         continue
687
688       for script, hkr, output in hooks_results:
689         if hkr == constants.HKR_FAIL:
690           if phase == constants.HOOKS_PHASE_PRE:
691             errs.append((node_name, script, output))
692           else:
693             if not output:
694               output = "(no output)"
695             self.log_fn("On %s script %s failed, output: %s" %
696                         (node_name, script, output))
697
698     if errs and phase == constants.HOOKS_PHASE_PRE:
699       raise errors.HooksAbort(errs)
700
701     return results
702
703   def RunConfigUpdate(self):
704     """Run the special configuration update hook
705
706     This is a special hook that runs only on the master after each
707     top-level LI if the configuration has been updated.
708
709     """
710     phase = constants.HOOKS_PHASE_POST
711     hpath = constants.HOOKS_NAME_CFGUPDATE
712     nodes = [self.master_name]
713     self._RunWrapper(nodes, hpath, phase, self.pre_env)
714
715   @staticmethod
716   def BuildFromLu(hooks_execution_fn, lu):
717     if lu.HPATH is None:
718       nodes = (None, None)
719     else:
720       nodes = map(frozenset, lu.BuildHooksNodes())
721
722     master_name = cluster_name = None
723     if lu.cfg:
724       master_name = lu.cfg.GetMasterNode()
725       cluster_name = lu.cfg.GetClusterName()
726
727     return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
728                        _RpcResultsToHooksResults, lu.BuildHooksEnv,
729                        lu.LogWarning, lu.HTYPE, cluster_name, master_name)