Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ d385a174

History | View | Annotate | Download (16.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import logging
32
import random
33
import time
34

    
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import rpc
39
from ganeti import cmdlib
40
from ganeti import locking
41
from ganeti import utils
42

    
43

    
44
_OP_PREFIX = "Op"
45
_LU_PREFIX = "LU"
46

    
47

    
48
class LockAcquireTimeout(Exception):
49
  """Exception to report timeouts on acquiring locks.
50

51
  """
52

    
53

    
54
def _CalculateLockAttemptTimeouts():
55
  """Calculate timeouts for lock attempts.
56

57
  """
58
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
59
  running_sum = result[0]
60

    
61
  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
62
  # blocking acquire
63
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
64
    timeout = (result[-1] * 1.05) ** 1.25
65

    
66
    # Cap max timeout. This gives other jobs a chance to run even if
67
    # we're still trying to get our locks, before finally moving to a
68
    # blocking acquire.
69
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
70
    # And also cap the lower boundary for safety
71
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
72

    
73
    result.append(timeout)
74
    running_sum += timeout
75

    
76
  return result
77

    
78

    
79
class LockAttemptTimeoutStrategy(object):
80
  """Class with lock acquire timeout strategy.
81

82
  """
83
  __slots__ = [
84
    "_timeouts",
85
    "_random_fn",
86
    "_time_fn",
87
    ]
88

    
89
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
90

    
91
  def __init__(self, _time_fn=time.time, _random_fn=random.random):
92
    """Initializes this class.
93

94
    @param _time_fn: Time function for unittests
95
    @param _random_fn: Random number generator for unittests
96

97
    """
98
    object.__init__(self)
99

    
100
    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
101
    self._time_fn = _time_fn
102
    self._random_fn = _random_fn
103

    
104
  def NextAttempt(self):
105
    """Returns the timeout for the next attempt.
106

107
    """
108
    try:
109
      timeout = self._timeouts.next()
110
    except StopIteration:
111
      # No more timeouts, do blocking acquire
112
      timeout = None
113

    
114
    if timeout is not None:
115
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
116
      # where two or more jobs are fighting for the same lock(s).
117
      variation_range = timeout * 0.1
118
      timeout += ((self._random_fn() * variation_range) -
119
                  (variation_range * 0.5))
120

    
121
    return timeout
122

    
123

    
124
class OpExecCbBase: # pylint: disable-msg=W0232
125
  """Base class for OpCode execution callbacks.
126

127
  """
128
  def NotifyStart(self):
129
    """Called when we are about to execute the LU.
130

131
    This function is called when we're about to start the lu's Exec() method,
132
    that is, after we have acquired all locks.
133

134
    """
135

    
136
  def Feedback(self, *args):
137
    """Sends feedback from the LU code to the end-user.
138

139
    """
140

    
141
  def CheckCancel(self):
142
    """Check whether job has been cancelled.
143

144
    """
145

    
146

    
147
def _LUNameForOpName(opname):
148
  """Computes the LU name for a given OpCode name.
149

150
  """
151
  assert opname.startswith(_OP_PREFIX), \
152
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
153

    
154
  return _LU_PREFIX + opname[len(_OP_PREFIX):]
155

    
156

    
157
def _ComputeDispatchTable():
158
  """Computes the opcode-to-lu dispatch table.
159

160
  """
161
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
162
              for op in opcodes.OP_MAPPING.values()
163
              if op.WITH_LU)
164

    
165

    
166
class Processor(object):
167
  """Object which runs OpCodes"""
168
  DISPATCH_TABLE = _ComputeDispatchTable()
169

    
170
  def __init__(self, context, ec_id):
171
    """Constructor for Processor
172

173
    @type context: GanetiContext
174
    @param context: global Ganeti context
175
    @type ec_id: string
176
    @param ec_id: execution context identifier
177

178
    """
179
    self.context = context
180
    self._ec_id = ec_id
181
    self._cbs = None
182
    self.rpc = rpc.RpcRunner(context.cfg)
183
    self.hmclass = HooksMaster
184

    
185
  def _AcquireLocks(self, level, names, shared, timeout, priority):
186
    """Acquires locks via the Ganeti lock manager.
187

188
    @type level: int
189
    @param level: Lock level
190
    @type names: list or string
191
    @param names: Lock names
192
    @type shared: bool
193
    @param shared: Whether the locks should be acquired in shared mode
194
    @type timeout: None or float
195
    @param timeout: Timeout for acquiring the locks
196
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
197
        amount of time
198

199
    """
200
    if self._cbs:
201
      self._cbs.CheckCancel()
202

    
203
    acquired = self.context.glm.acquire(level, names, shared=shared,
204
                                        timeout=timeout, priority=priority)
205

    
206
    if acquired is None:
207
      raise LockAcquireTimeout()
208

    
209
    return acquired
210

    
211
  def _ExecLU(self, lu):
212
    """Logical Unit execution sequence.
213

214
    """
215
    write_count = self.context.cfg.write_count
216
    lu.CheckPrereq()
217
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
218
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
219
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
220
                     self.Log, None)
221

    
222
    if getattr(lu.op, "dry_run", False):
223
      # in this mode, no post-hooks are run, and the config is not
224
      # written (as it might have been modified by another LU, and we
225
      # shouldn't do writeout on behalf of other threads
226
      self.LogInfo("dry-run mode requested, not actually executing"
227
                   " the operation")
228
      return lu.dry_run_result
229

    
230
    try:
231
      result = lu.Exec(self.Log)
232
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
233
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
234
                                self.Log, result)
235
    finally:
236
      # FIXME: This needs locks if not lu_class.REQ_BGL
237
      if write_count != self.context.cfg.write_count:
238
        hm.RunConfigUpdate()
239

    
240
    return result
241

    
242
  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
243
    """Execute a Logical Unit, with the needed locks.
244

245
    This is a recursive function that starts locking the given level, and
246
    proceeds up, till there are no more locks to acquire. Then it executes the
247
    given LU and its opcodes.
248

249
    """
250
    adding_locks = level in lu.add_locks
251
    acquiring_locks = level in lu.needed_locks
252
    if level not in locking.LEVELS:
253
      if self._cbs:
254
        self._cbs.NotifyStart()
255

    
256
      result = self._ExecLU(lu)
257

    
258
    elif adding_locks and acquiring_locks:
259
      # We could both acquire and add locks at the same level, but for now we
260
      # don't need this, so we'll avoid the complicated code needed.
261
      raise NotImplementedError("Can't declare locks to acquire when adding"
262
                                " others")
263

    
264
    elif adding_locks or acquiring_locks:
265
      lu.DeclareLocks(level)
266
      share = lu.share_locks[level]
267

    
268
      try:
269
        assert adding_locks ^ acquiring_locks, \
270
          "Locks must be either added or acquired"
271

    
272
        if acquiring_locks:
273
          # Acquiring locks
274
          needed_locks = lu.needed_locks[level]
275

    
276
          acquired = self._AcquireLocks(level, needed_locks, share,
277
                                        calc_timeout(), priority)
278
        else:
279
          # Adding locks
280
          add_locks = lu.add_locks[level]
281
          lu.remove_locks[level] = add_locks
282

    
283
          try:
284
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
285
          except errors.LockError:
286
            raise errors.OpPrereqError(
287
              "Couldn't add locks (%s), probably because of a race condition"
288
              " with another job, who added them first" % add_locks,
289
              errors.ECODE_FAULT)
290

    
291
          acquired = add_locks
292

    
293
        try:
294
          lu.acquired_locks[level] = acquired
295

    
296
          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
297
        finally:
298
          if level in lu.remove_locks:
299
            self.context.glm.remove(level, lu.remove_locks[level])
300
      finally:
301
        if self.context.glm.is_owned(level):
302
          self.context.glm.release(level)
303

    
304
    else:
305
      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
306

    
307
    return result
308

    
309
  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
310
    """Execute an opcode.
311

312
    @type op: an OpCode instance
313
    @param op: the opcode to be executed
314
    @type cbs: L{OpExecCbBase}
315
    @param cbs: Runtime callbacks
316
    @type timeout: float or None
317
    @param timeout: Maximum time to acquire all locks, None for no timeout
318
    @type priority: number or None
319
    @param priority: Priority for acquiring lock(s)
320
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
321
        amount of time
322

323
    """
324
    if not isinstance(op, opcodes.OpCode):
325
      raise errors.ProgrammerError("Non-opcode instance passed"
326
                                   " to ExecOpcode")
327

    
328
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
329
    if lu_class is None:
330
      raise errors.OpCodeUnknown("Unknown opcode")
331

    
332
    if timeout is None:
333
      calc_timeout = lambda: None
334
    else:
335
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining
336

    
337
    self._cbs = cbs
338
    try:
339
      # Acquire the Big Ganeti Lock exclusively if this LU requires it,
340
      # and in a shared fashion otherwise (to prevent concurrent run with
341
      # an exclusive LU.
342
      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
343
                          not lu_class.REQ_BGL, calc_timeout(),
344
                          priority)
345
      try:
346
        lu = lu_class(self, op, self.context, self.rpc)
347
        lu.ExpandNames()
348
        assert lu.needed_locks is not None, "needed_locks not set by LU"
349

    
350
        try:
351
          return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
352
                                     priority)
353
        finally:
354
          if self._ec_id:
355
            self.context.cfg.DropECReservations(self._ec_id)
356
      finally:
357
        self.context.glm.release(locking.LEVEL_CLUSTER)
358
    finally:
359
      self._cbs = None
360

    
361
  def Log(self, *args):
362
    """Forward call to feedback callback function.
363

364
    """
365
    if self._cbs:
366
      self._cbs.Feedback(*args)
367

    
368
  def LogStep(self, current, total, message):
369
    """Log a change in LU execution progress.
370

371
    """
372
    logging.debug("Step %d/%d %s", current, total, message)
373
    self.Log("STEP %d/%d %s" % (current, total, message))
374

    
375
  def LogWarning(self, message, *args, **kwargs):
376
    """Log a warning to the logs and the user.
377

378
    The optional keyword argument is 'hint' and can be used to show a
379
    hint to the user (presumably related to the warning). If the
380
    message is empty, it will not be printed at all, allowing one to
381
    show only a hint.
382

383
    """
384
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
385
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
386
    if args:
387
      message = message % tuple(args)
388
    if message:
389
      logging.warning(message)
390
      self.Log(" - WARNING: %s" % message)
391
    if "hint" in kwargs:
392
      self.Log("      Hint: %s" % kwargs["hint"])
393

    
394
  def LogInfo(self, message, *args):
395
    """Log an informational message to the logs and the user.
396

397
    """
398
    if args:
399
      message = message % tuple(args)
400
    logging.info(message)
401
    self.Log(" - INFO: %s" % message)
402

    
403
  def GetECId(self):
404
    """Returns the current execution context ID.
405

406
    """
407
    if not self._ec_id:
408
      raise errors.ProgrammerError("Tried to use execution context id when"
409
                                   " not set")
410
    return self._ec_id
411

    
412

    
413
class HooksMaster(object):
414
  """Hooks master.
415

416
  This class distributes the run commands to the nodes based on the
417
  specific LU class.
418

419
  In order to remove the direct dependency on the rpc module, the
420
  constructor needs a function which actually does the remote
421
  call. This will usually be rpc.call_hooks_runner, but any function
422
  which behaves the same works.
423

424
  """
425
  def __init__(self, callfn, lu):
426
    self.callfn = callfn
427
    self.lu = lu
428
    self.op = lu.op
429
    self.env, node_list_pre, node_list_post = self._BuildEnv()
430
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
431
                      constants.HOOKS_PHASE_POST: node_list_post}
432

    
433
  def _BuildEnv(self):
434
    """Compute the environment and the target nodes.
435

436
    Based on the opcode and the current node list, this builds the
437
    environment for the hooks and the target node list for the run.
438

439
    """
440
    env = {
441
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
442
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
443
      "GANETI_OP_CODE": self.op.OP_ID,
444
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
445
      "GANETI_DATA_DIR": constants.DATA_DIR,
446
      }
447

    
448
    if self.lu.HPATH is not None:
449
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
450
      if lu_env:
451
        for key in lu_env:
452
          env["GANETI_" + key] = lu_env[key]
453
    else:
454
      lu_nodes_pre = lu_nodes_post = []
455

    
456
    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
457

    
458
  def _RunWrapper(self, node_list, hpath, phase):
459
    """Simple wrapper over self.callfn.
460

461
    This method fixes the environment before doing the rpc call.
462

463
    """
464
    env = self.env.copy()
465
    env["GANETI_HOOKS_PHASE"] = phase
466
    env["GANETI_HOOKS_PATH"] = hpath
467
    if self.lu.cfg is not None:
468
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
469
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
470

    
471
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
472

    
473
    return self.callfn(node_list, hpath, phase, env)
474

    
475
  def RunPhase(self, phase, nodes=None):
476
    """Run all the scripts for a phase.
477

478
    This is the main function of the HookMaster.
479

480
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
481
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
482
    @param nodes: overrides the predefined list of nodes for the given phase
483
    @return: the processed results of the hooks multi-node rpc call
484
    @raise errors.HooksFailure: on communication failure to the nodes
485
    @raise errors.HooksAbort: on failure of one of the hooks
486

487
    """
488
    if not self.node_list[phase] and not nodes:
489
      # empty node list, we should not attempt to run this as either
490
      # we're in the cluster init phase and the rpc client part can't
491
      # even attempt to run, or this LU doesn't do hooks at all
492
      return
493
    hpath = self.lu.HPATH
494
    if nodes is not None:
495
      results = self._RunWrapper(nodes, hpath, phase)
496
    else:
497
      results = self._RunWrapper(self.node_list[phase], hpath, phase)
498
    errs = []
499
    if not results:
500
      msg = "Communication Failure"
501
      if phase == constants.HOOKS_PHASE_PRE:
502
        raise errors.HooksFailure(msg)
503
      else:
504
        self.lu.LogWarning(msg)
505
        return results
506
    for node_name in results:
507
      res = results[node_name]
508
      if res.offline:
509
        continue
510
      msg = res.fail_msg
511
      if msg:
512
        self.lu.LogWarning("Communication failure to node %s: %s",
513
                           node_name, msg)
514
        continue
515
      for script, hkr, output in res.payload:
516
        if hkr == constants.HKR_FAIL:
517
          if phase == constants.HOOKS_PHASE_PRE:
518
            errs.append((node_name, script, output))
519
          else:
520
            if not output:
521
              output = "(no output)"
522
            self.lu.LogWarning("On %s script %s failed, output: %s" %
523
                               (node_name, script, output))
524
    if errs and phase == constants.HOOKS_PHASE_PRE:
525
      raise errors.HooksAbort(errs)
526
    return results
527

    
528
  def RunConfigUpdate(self):
529
    """Run the special configuration update hook
530

531
    This is a special hook that runs only on the master after each
532
    top-level LI if the configuration has been updated.
533

534
    """
535
    phase = constants.HOOKS_PHASE_POST
536
    hpath = constants.HOOKS_NAME_CFGUPDATE
537
    nodes = [self.lu.cfg.GetMasterNode()]
538
    self._RunWrapper(nodes, hpath, phase)