# lib/mcpu.py @ revision 4fe5cf90

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""

import logging
import random
import time

from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import rpc
from ganeti import cmdlib
from ganeti import locking
from ganeti import utils


class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  """


def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  """
  result = [1.0]

  # Wait for a total of at least 150s before doing a blocking acquire
  while sum(result) < 150.0:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap timeout at 10 seconds. This gives other jobs a chance to run
    # even if we're still trying to get our locks, before finally moving
    # to a blocking acquire.
    if timeout > 10.0:
      timeout = 10.0

    elif timeout < 0.1:
      # Lower boundary for safety
      timeout = 0.1

    result.append(timeout)

  return result
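
# Illustrative sketch of the resulting schedule (hypothetical doctest; the
# exact values follow from the formula above): the first timeout is 1.0s,
# later ones are capped at 10.0s, and the total waiting time is >= 150s:
#   >>> timeouts = _CalculateLockAttemptTimeouts()
#   >>> timeouts[0], timeouts[-1]
#   (1.0, 10.0)
#   >>> sum(timeouts) >= 150.0
#   True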


class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    """
    try:
      timeout = self._timeouts.next()
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
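
  # Minimal usage sketch (hypothetical caller; try_acquire stands in for an
  # actual lock acquisition that accepts a timeout and reports success):
  #   strategy = LockAttemptTimeoutStrategy()
  #   while True:
  #     timeout = strategy.NextAttempt()  # None means "block until acquired"
  #     if try_acquire(timeout=timeout):
  #       break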


class OpExecCbBase: # pylint: disable-msg=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """


class Processor(object):
  """Object which runs OpCodes"""
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
    opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
    opcodes.OpQuery: cmdlib.LUQuery,
    opcodes.OpQueryFields: cmdlib.LUQueryFields,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
    opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
    opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
    opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
    opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
    opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # node group lu
    opcodes.OpAddGroup: cmdlib.LUAddGroup,
    opcodes.OpQueryGroups: cmdlib.LUQueryGroups,
    opcodes.OpRemoveGroup: cmdlib.LURemoveGroup,
    opcodes.OpRenameGroup: cmdlib.LURenameGroup,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpPrepareExport: cmdlib.LUPrepareExport,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    opcodes.OpTestJobqueue: cmdlib.LUTestJobqueue,
    # OOB lu
    opcodes.OpOutOfBand: cmdlib.LUOutOfBand,
    }
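
  # The table is a plain mapping from opcode class to logical unit class, so
  # dispatch is a single dict lookup, e.g. (sketch):
  #   DISPATCH_TABLE[opcodes.OpStartupInstance] is cmdlib.LUStartupInstance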

  def __init__(self, context, ec_id):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.rpc = rpc.RpcRunner(context.cfg)
    self.hmclass = HooksMaster

  def _AcquireLocks(self, level, names, shared, timeout, priority):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @type priority: number or None
    @param priority: Priority for acquiring the locks
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if self._cbs:
      self._cbs.CheckCancel()

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority)

    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = lu.Exec(self.Log)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result
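
  # The per-opcode call sequence implemented above is, in short:
  #   CheckPrereq -> pre hooks -> Exec -> post hooks -> config-update hook
  # (the last step only if the configuration was written during Exec).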

  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking at the given level and
    proceeds up the lock levels until there are no more locks to acquire.
    Then it executes the given LU and its opcodes.

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      if self._cbs:
        self._cbs.NotifyStart()

      result = self._ExecLU(lu)

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          acquired = self._AcquireLocks(level, needed_locks, share,
                                        calc_timeout(), priority)
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          lu.remove_locks[level] = add_locks

          try:
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), probably because of a race condition"
              " with another job that added them first" % add_locks,
              errors.ECODE_FAULT)

          acquired = add_locks

        try:
          lu.acquired_locks[level] = acquired

          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)

    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)

    return result
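
  # In other words (sketch of the recursion): each call acquires (or adds)
  # the locks for one level and recurses into level + 1; once past the last
  # level in locking.LEVELS, _ExecLU() runs, and the locks are released
  # again as the recursion unwinds.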

  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpCode")

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      # Acquire the Big Ganeti Lock exclusively if this LU requires it,
      # and in a shared fashion otherwise (to prevent concurrent run with
      # an exclusive LU)
      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                         not lu_class.REQ_BGL, calc_timeout(),
                         priority)
      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
                                     priority)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None
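
  # Illustrative call sequence (a sketch; in the real code the job queue
  # constructs the processor and supplies the callbacks and opcode):
  #   proc = Processor(context, ec_id)
  #   result = proc.ExecOpCode(op, cbs, timeout=None, priority=None)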

  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)
  def GetECId(self):
    """Returns the current execution context ID.

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id


class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, lu):
    self.callfn = callfn
    self.lu = lu
    self.op = lu.op
    self.env, node_list_pre, node_list_post = self._BuildEnv()
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                      constants.HOOKS_PHASE_POST: node_list_post}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is not None:
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        for key in lu_env:
          env["GANETI_" + key] = lu_env[key]
    else:
      lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
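
  # The base environment thus always contains PATH, GANETI_HOOKS_VERSION,
  # GANETI_OP_CODE, GANETI_OBJECT_TYPE and GANETI_DATA_DIR; every key the
  # LU returns from BuildHooksEnv() is added with a "GANETI_" prefix.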

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    """
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    if self.lu.cfg is not None:
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()

    env = dict([(str(key), str(val)) for key, val in env.iteritems()])

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HooksMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if not self.node_list[phase] and not nodes:
      # Empty node list; don't attempt to run anything, as either we're in
      # the cluster init phase and the rpc client part can't even attempt to
      # run, or this LU doesn't do hooks at all
      return
    hpath = self.lu.HPATH
    if nodes is not None:
      results = self._RunWrapper(nodes, hpath, phase)
    else:
      results = self._RunWrapper(self.node_list[phase], hpath, phase)
    errs = []
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        self.lu.LogWarning(msg)
        return results
    for node_name in results:
      res = results[node_name]
      if res.offline:
        continue
      msg = res.fail_msg
      if msg:
        self.lu.LogWarning("Communication failure to node %s: %s",
                           node_name, msg)
        continue
      for script, hkr, output in res.payload:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.lu.LogWarning("On %s script %s failed, output: %s" %
                               (node_name, script, output))
    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)
    return results
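
  # Error-handling summary for the loop above: failures of pre-phase hooks
  # are collected and abort the operation via HooksAbort, while post-phase
  # hook failures (and per-node communication errors) only produce warnings.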

  def RunConfigUpdate(self):
    """Run the special configuration update hook.

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.lu.cfg.GetMasterNode()]
    self._RunWrapper(nodes, hpath, phase)