qlang: Add parser for query filter language
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import compat
43
44
45 _OP_PREFIX = "Op"
46 _LU_PREFIX = "LU"
47
48
49 class LockAcquireTimeout(Exception):
50   """Exception to report timeouts on acquiring locks.
51
52   """
53
54
55 def _CalculateLockAttemptTimeouts():
56   """Calculate timeouts for lock attempts.
57
58   """
59   result = [constants.LOCK_ATTEMPTS_MINWAIT]
60   running_sum = result[0]
61
62   # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
63   # blocking acquire
64   while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
65     timeout = (result[-1] * 1.05) ** 1.25
66
67     # Cap max timeout. This gives other jobs a chance to run even if
68     # we're still trying to get our locks, before finally moving to a
69     # blocking acquire.
70     timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
71     # And also cap the lower boundary for safety
72     timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
73
74     result.append(timeout)
75     running_sum += timeout
76
77   return result
78
79
80 class LockAttemptTimeoutStrategy(object):
81   """Class with lock acquire timeout strategy.
82
83   """
84   __slots__ = [
85     "_timeouts",
86     "_random_fn",
87     "_time_fn",
88     ]
89
90   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
91
92   def __init__(self, _time_fn=time.time, _random_fn=random.random):
93     """Initializes this class.
94
95     @param _time_fn: Time function for unittests
96     @param _random_fn: Random number generator for unittests
97
98     """
99     object.__init__(self)
100
101     self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
102     self._time_fn = _time_fn
103     self._random_fn = _random_fn
104
105   def NextAttempt(self):
106     """Returns the timeout for the next attempt.
107
108     """
109     try:
110       timeout = self._timeouts.next()
111     except StopIteration:
112       # No more timeouts, do blocking acquire
113       timeout = None
114
115     if timeout is not None:
116       # Add a small variation (-/+ 5%) to timeout. This helps in situations
117       # where two or more jobs are fighting for the same lock(s).
118       variation_range = timeout * 0.1
119       timeout += ((self._random_fn() * variation_range) -
120                   (variation_range * 0.5))
121
122     return timeout
123
124
125 class OpExecCbBase: # pylint: disable-msg=W0232
126   """Base class for OpCode execution callbacks.
127
128   """
129   def NotifyStart(self):
130     """Called when we are about to execute the LU.
131
132     This function is called when we're about to start the lu's Exec() method,
133     that is, after we have acquired all locks.
134
135     """
136
137   def Feedback(self, *args):
138     """Sends feedback from the LU code to the end-user.
139
140     """
141
142   def CheckCancel(self):
143     """Check whether job has been cancelled.
144
145     """
146
147   def SubmitManyJobs(self, jobs):
148     """Submits jobs for processing.
149
150     See L{jqueue.JobQueue.SubmitManyJobs}.
151
152     """
153     raise NotImplementedError
154
155
156 def _LUNameForOpName(opname):
157   """Computes the LU name for a given OpCode name.
158
159   """
160   assert opname.startswith(_OP_PREFIX), \
161       "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
162
163   return _LU_PREFIX + opname[len(_OP_PREFIX):]
164
165
166 def _ComputeDispatchTable():
167   """Computes the opcode-to-lu dispatch table.
168
169   """
170   return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
171               for op in opcodes.OP_MAPPING.values()
172               if op.WITH_LU)
173
174
175 class Processor(object):
176   """Object which runs OpCodes"""
177   DISPATCH_TABLE = _ComputeDispatchTable()
178
179   def __init__(self, context, ec_id):
180     """Constructor for Processor
181
182     @type context: GanetiContext
183     @param context: global Ganeti context
184     @type ec_id: string
185     @param ec_id: execution context identifier
186
187     """
188     self.context = context
189     self._ec_id = ec_id
190     self._cbs = None
191     self.rpc = rpc.RpcRunner(context.cfg)
192     self.hmclass = HooksMaster
193
194   def _AcquireLocks(self, level, names, shared, timeout, priority):
195     """Acquires locks via the Ganeti lock manager.
196
197     @type level: int
198     @param level: Lock level
199     @type names: list or string
200     @param names: Lock names
201     @type shared: bool
202     @param shared: Whether the locks should be acquired in shared mode
203     @type timeout: None or float
204     @param timeout: Timeout for acquiring the locks
205     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
206         amount of time
207
208     """
209     if self._cbs:
210       self._cbs.CheckCancel()
211
212     acquired = self.context.glm.acquire(level, names, shared=shared,
213                                         timeout=timeout, priority=priority)
214
215     if acquired is None:
216       raise LockAcquireTimeout()
217
218     return acquired
219
220   def _ProcessResult(self, result):
221     """
222
223     """
224     if isinstance(result, cmdlib.ResultWithJobs):
225       # Submit jobs
226       job_submission = self._cbs.SubmitManyJobs(result.jobs)
227
228       # Build dictionary
229       result = result.other
230
231       assert constants.JOB_IDS_KEY not in result, \
232         "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
233
234       result[constants.JOB_IDS_KEY] = job_submission
235
236     return result
237
238   def _ExecLU(self, lu):
239     """Logical Unit execution sequence.
240
241     """
242     write_count = self.context.cfg.write_count
243     lu.CheckPrereq()
244     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
245     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
246     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
247                      self.Log, None)
248
249     if getattr(lu.op, "dry_run", False):
250       # in this mode, no post-hooks are run, and the config is not
251       # written (as it might have been modified by another LU, and we
252       # shouldn't do writeout on behalf of other threads
253       self.LogInfo("dry-run mode requested, not actually executing"
254                    " the operation")
255       return lu.dry_run_result
256
257     try:
258       result = self._ProcessResult(lu.Exec(self.Log))
259       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
260       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
261                                 self.Log, result)
262     finally:
263       # FIXME: This needs locks if not lu_class.REQ_BGL
264       if write_count != self.context.cfg.write_count:
265         hm.RunConfigUpdate()
266
267     return result
268
269   def _LockAndExecLU(self, lu, level, calc_timeout, priority):
270     """Execute a Logical Unit, with the needed locks.
271
272     This is a recursive function that starts locking the given level, and
273     proceeds up, till there are no more locks to acquire. Then it executes the
274     given LU and its opcodes.
275
276     """
277     adding_locks = level in lu.add_locks
278     acquiring_locks = level in lu.needed_locks
279     if level not in locking.LEVELS:
280       if self._cbs:
281         self._cbs.NotifyStart()
282
283       result = self._ExecLU(lu)
284
285     elif adding_locks and acquiring_locks:
286       # We could both acquire and add locks at the same level, but for now we
287       # don't need this, so we'll avoid the complicated code needed.
288       raise NotImplementedError("Can't declare locks to acquire when adding"
289                                 " others")
290
291     elif adding_locks or acquiring_locks:
292       lu.DeclareLocks(level)
293       share = lu.share_locks[level]
294
295       try:
296         assert adding_locks ^ acquiring_locks, \
297           "Locks must be either added or acquired"
298
299         if acquiring_locks:
300           # Acquiring locks
301           needed_locks = lu.needed_locks[level]
302
303           acquired = self._AcquireLocks(level, needed_locks, share,
304                                         calc_timeout(), priority)
305         else:
306           # Adding locks
307           add_locks = lu.add_locks[level]
308           lu.remove_locks[level] = add_locks
309
310           try:
311             self.context.glm.add(level, add_locks, acquired=1, shared=share)
312           except errors.LockError:
313             raise errors.OpPrereqError(
314               "Couldn't add locks (%s), probably because of a race condition"
315               " with another job, who added them first" % add_locks,
316               errors.ECODE_FAULT)
317
318           acquired = add_locks
319
320         try:
321           lu.acquired_locks[level] = acquired
322
323           result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
324         finally:
325           if level in lu.remove_locks:
326             self.context.glm.remove(level, lu.remove_locks[level])
327       finally:
328         if self.context.glm.is_owned(level):
329           self.context.glm.release(level)
330
331     else:
332       result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
333
334     return result
335
336   def ExecOpCode(self, op, cbs, timeout=None, priority=None):
337     """Execute an opcode.
338
339     @type op: an OpCode instance
340     @param op: the opcode to be executed
341     @type cbs: L{OpExecCbBase}
342     @param cbs: Runtime callbacks
343     @type timeout: float or None
344     @param timeout: Maximum time to acquire all locks, None for no timeout
345     @type priority: number or None
346     @param priority: Priority for acquiring lock(s)
347     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
348         amount of time
349
350     """
351     if not isinstance(op, opcodes.OpCode):
352       raise errors.ProgrammerError("Non-opcode instance passed"
353                                    " to ExecOpcode")
354
355     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
356     if lu_class is None:
357       raise errors.OpCodeUnknown("Unknown opcode")
358
359     if timeout is None:
360       calc_timeout = lambda: None
361     else:
362       calc_timeout = utils.RunningTimeout(timeout, False).Remaining
363
364     self._cbs = cbs
365     try:
366       # Acquire the Big Ganeti Lock exclusively if this LU requires it,
367       # and in a shared fashion otherwise (to prevent concurrent run with
368       # an exclusive LU.
369       self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
370                           not lu_class.REQ_BGL, calc_timeout(),
371                           priority)
372       try:
373         lu = lu_class(self, op, self.context, self.rpc)
374         lu.ExpandNames()
375         assert lu.needed_locks is not None, "needed_locks not set by LU"
376
377         try:
378           return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
379                                      priority)
380         finally:
381           if self._ec_id:
382             self.context.cfg.DropECReservations(self._ec_id)
383       finally:
384         self.context.glm.release(locking.LEVEL_CLUSTER)
385     finally:
386       self._cbs = None
387
388   def Log(self, *args):
389     """Forward call to feedback callback function.
390
391     """
392     if self._cbs:
393       self._cbs.Feedback(*args)
394
395   def LogStep(self, current, total, message):
396     """Log a change in LU execution progress.
397
398     """
399     logging.debug("Step %d/%d %s", current, total, message)
400     self.Log("STEP %d/%d %s" % (current, total, message))
401
402   def LogWarning(self, message, *args, **kwargs):
403     """Log a warning to the logs and the user.
404
405     The optional keyword argument is 'hint' and can be used to show a
406     hint to the user (presumably related to the warning). If the
407     message is empty, it will not be printed at all, allowing one to
408     show only a hint.
409
410     """
411     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
412            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
413     if args:
414       message = message % tuple(args)
415     if message:
416       logging.warning(message)
417       self.Log(" - WARNING: %s" % message)
418     if "hint" in kwargs:
419       self.Log("      Hint: %s" % kwargs["hint"])
420
421   def LogInfo(self, message, *args):
422     """Log an informational message to the logs and the user.
423
424     """
425     if args:
426       message = message % tuple(args)
427     logging.info(message)
428     self.Log(" - INFO: %s" % message)
429
430   def GetECId(self):
431     """Returns the current execution context ID.
432
433     """
434     if not self._ec_id:
435       raise errors.ProgrammerError("Tried to use execution context id when"
436                                    " not set")
437     return self._ec_id
438
439
440 class HooksMaster(object):
441   """Hooks master.
442
443   This class distributes the run commands to the nodes based on the
444   specific LU class.
445
446   In order to remove the direct dependency on the rpc module, the
447   constructor needs a function which actually does the remote
448   call. This will usually be rpc.call_hooks_runner, but any function
449   which behaves the same works.
450
451   """
452   def __init__(self, callfn, lu):
453     self.callfn = callfn
454     self.lu = lu
455     self.op = lu.op
456     self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
457
458     if self.lu.HPATH is None:
459       nodes = (None, None)
460     else:
461       nodes = map(frozenset, self.lu.BuildHooksNodes())
462
463     (self.pre_nodes, self.post_nodes) = nodes
464
465   def _BuildEnv(self, phase):
466     """Compute the environment and the target nodes.
467
468     Based on the opcode and the current node list, this builds the
469     environment for the hooks and the target node list for the run.
470
471     """
472     if phase == constants.HOOKS_PHASE_PRE:
473       prefix = "GANETI_"
474     elif phase == constants.HOOKS_PHASE_POST:
475       prefix = "GANETI_POST_"
476     else:
477       raise AssertionError("Unknown phase '%s'" % phase)
478
479     env = {}
480
481     if self.lu.HPATH is not None:
482       lu_env = self.lu.BuildHooksEnv()
483       if lu_env:
484         assert not compat.any(key.upper().startswith(prefix) for key in lu_env)
485         env.update(("%s%s" % (prefix, key), value)
486                    for (key, value) in lu_env.items())
487
488     if phase == constants.HOOKS_PHASE_PRE:
489       assert compat.all((key.startswith("GANETI_") and
490                          not key.startswith("GANETI_POST_"))
491                         for key in env)
492
493     elif phase == constants.HOOKS_PHASE_POST:
494       assert compat.all(key.startswith("GANETI_POST_") for key in env)
495       assert isinstance(self.pre_env, dict)
496
497       # Merge with pre-phase environment
498       assert not compat.any(key.startswith("GANETI_POST_")
499                             for key in self.pre_env)
500       env.update(self.pre_env)
501     else:
502       raise AssertionError("Unknown phase '%s'" % phase)
503
504     return env
505
506   def _RunWrapper(self, node_list, hpath, phase, phase_env):
507     """Simple wrapper over self.callfn.
508
509     This method fixes the environment before doing the rpc call.
510
511     """
512     cfg = self.lu.cfg
513
514     env = {
515       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
516       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
517       "GANETI_OP_CODE": self.op.OP_ID,
518       "GANETI_DATA_DIR": constants.DATA_DIR,
519       "GANETI_HOOKS_PHASE": phase,
520       "GANETI_HOOKS_PATH": hpath,
521       }
522
523     if self.lu.HTYPE:
524       env["GANETI_OBJECT_TYPE"] = self.lu.HTYPE
525
526     if cfg is not None:
527       env["GANETI_CLUSTER"] = cfg.GetClusterName()
528       env["GANETI_MASTER"] = cfg.GetMasterNode()
529
530     if phase_env:
531       assert not (set(env) & set(phase_env)), "Environment variables conflict"
532       env.update(phase_env)
533
534     # Convert everything to strings
535     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
536
537     assert compat.all(key == "PATH" or key.startswith("GANETI_")
538                       for key in env)
539
540     return self.callfn(node_list, hpath, phase, env)
541
542   def RunPhase(self, phase, nodes=None):
543     """Run all the scripts for a phase.
544
545     This is the main function of the HookMaster.
546
547     @param phase: one of L{constants.HOOKS_PHASE_POST} or
548         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
549     @param nodes: overrides the predefined list of nodes for the given phase
550     @return: the processed results of the hooks multi-node rpc call
551     @raise errors.HooksFailure: on communication failure to the nodes
552     @raise errors.HooksAbort: on failure of one of the hooks
553
554     """
555     if phase == constants.HOOKS_PHASE_PRE:
556       if nodes is None:
557         nodes = self.pre_nodes
558       env = self.pre_env
559     elif phase == constants.HOOKS_PHASE_POST:
560       if nodes is None:
561         nodes = self.post_nodes
562       env = self._BuildEnv(phase)
563     else:
564       raise AssertionError("Unknown phase '%s'" % phase)
565
566     if not nodes:
567       # empty node list, we should not attempt to run this as either
568       # we're in the cluster init phase and the rpc client part can't
569       # even attempt to run, or this LU doesn't do hooks at all
570       return
571
572     results = self._RunWrapper(nodes, self.lu.HPATH, phase, env)
573     if not results:
574       msg = "Communication Failure"
575       if phase == constants.HOOKS_PHASE_PRE:
576         raise errors.HooksFailure(msg)
577       else:
578         self.lu.LogWarning(msg)
579         return results
580
581     errs = []
582     for node_name in results:
583       res = results[node_name]
584       if res.offline:
585         continue
586
587       msg = res.fail_msg
588       if msg:
589         self.lu.LogWarning("Communication failure to node %s: %s",
590                            node_name, msg)
591         continue
592
593       for script, hkr, output in res.payload:
594         if hkr == constants.HKR_FAIL:
595           if phase == constants.HOOKS_PHASE_PRE:
596             errs.append((node_name, script, output))
597           else:
598             if not output:
599               output = "(no output)"
600             self.lu.LogWarning("On %s script %s failed, output: %s" %
601                                (node_name, script, output))
602
603     if errs and phase == constants.HOOKS_PHASE_PRE:
604       raise errors.HooksAbort(errs)
605
606     return results
607
608   def RunConfigUpdate(self):
609     """Run the special configuration update hook
610
611     This is a special hook that runs only on the master after each
612     top-level LI if the configuration has been updated.
613
614     """
615     phase = constants.HOOKS_PHASE_POST
616     hpath = constants.HOOKS_NAME_CFGUPDATE
617     nodes = [self.lu.cfg.GetMasterNode()]
618     self._RunWrapper(nodes, hpath, phase, self.pre_env)