Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 70a6a926

History | View | Annotate | Download (18.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import logging
32
import random
33
import time
34

    
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import rpc
39
from ganeti import cmdlib
40
from ganeti import locking
41
from ganeti import utils
42

    
43

    
44
class LockAcquireTimeout(Exception):
  """Raised when locks could not be acquired within the allotted time.

  """
48

    
49

    
50
def _CalculateLockAttemptTimeouts():
51
  """Calculate timeouts for lock attempts.
52

53
  """
54
  result = [1.0]
55

    
56
  # Wait for a total of at least 150s before doing a blocking acquire
57
  while sum(result) < 150.0:
58
    timeout = (result[-1] * 1.05) ** 1.25
59

    
60
    # Cap timeout at 10 seconds. This gives other jobs a chance to run
61
    # even if we're still trying to get our locks, before finally moving
62
    # to a blocking acquire.
63
    if timeout > 10.0:
64
      timeout = 10.0
65

    
66
    elif timeout < 0.1:
67
      # Lower boundary for safety
68
      timeout = 0.1
69

    
70
    result.append(timeout)
71

    
72
  return result
73

    
74

    
75
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Hands out one timeout per acquire attempt; once all per-attempt
  timeouts are used up, C{None} is returned to signal a blocking
  acquire.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  # Shared, pre-computed list of per-attempt timeouts
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    # Each instance walks the shared timeout list with its own iterator
    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @return: timeout in seconds, or C{None} once all attempts have been
        used up (meaning a blocking acquire should be done)

    """
    # None once the per-attempt timeouts are exhausted
    timeout = next(self._timeouts, None)

    if timeout is not None:
      # Randomize the timeout by -/+ 5%, which helps when two or more
      # jobs are fighting for the same lock(s)
      jitter_span = timeout * 0.1
      timeout += jitter_span * (self._random_fn() - 0.5)

    return timeout
118

    
119

    
120
class OpExecCbBase: # pylint: disable-msg=W0232
  """Base class for OpCode execution callbacks.

  All methods are no-ops here; subclasses override the ones they need.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    Invoked right before the LU's Exec() method runs, that is, after
    all locks have been acquired.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
141

    
142

    
143
class Processor(object):
  """Object which runs OpCodes.

  Dispatches each opcode to its matching logical unit (LU) via
  L{DISPATCH_TABLE}, acquires the necessary locks at each locking level
  and runs the hooks around the LU's execution.

  """
  # Maps each opcode class to the logical unit class implementing it
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
    opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
    opcodes.OpQuery: cmdlib.LUQuery,
    opcodes.OpQueryFields: cmdlib.LUQueryFields,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
    opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
    opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
    opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
    opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
    opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # node group lu
    opcodes.OpQueryGroups: cmdlib.LUQueryGroups,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpPrepareExport: cmdlib.LUPrepareExport,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    opcodes.OpTestJobqueue: cmdlib.LUTestJobqueue,
    }

  def __init__(self, context, ec_id):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier

    """
    self.context = context
    self._ec_id = ec_id
    # Runtime callbacks (L{OpExecCbBase}); only set while ExecOpCode runs
    self._cbs = None
    self.rpc = rpc.RpcRunner(context.cfg)
    self.hmclass = HooksMaster

  def _AcquireLocks(self, level, names, shared, timeout, priority):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    # Give the job a chance to bail out before (potentially) blocking
    if self._cbs:
      self._cbs.CheckCancel()

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority)

    # The lock manager signals a timeout by returning None
    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs CheckPrereq, the pre-phase hooks, the LU's Exec() and the
    post-phase hooks; runs the config-update hook whenever the
    configuration was written during execution.

    """
    # Remember the write count so we can tell afterwards whether this LU
    # modified (and wrote out) the configuration
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = lu.Exec(self.Log)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # Past the last level: all locks are held, run the LU itself
      if self._cbs:
        self._cbs.NotifyStart()

      result = self._ExecLU(lu)

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          acquired = self._AcquireLocks(level, needed_locks, share,
                                        calc_timeout(), priority)
        else:
          # Adding locks; remember them so they're removed again afterwards
          add_locks = lu.add_locks[level]
          lu.remove_locks[level] = add_locks

          try:
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), probably because of a race condition"
              " with another job, who added them first" % add_locks,
              errors.ECODE_FAULT)

          acquired = add_locks

        try:
          lu.acquired_locks[level] = acquired

          # Recurse into the next locking level
          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        # Release whatever we still own at this level
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)

    else:
      # Nothing to do at this level, move on to the next one
      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)

    return result

  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    # calc_timeout returns the remaining time budget on each call (or
    # None for "wait forever")
    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      # Acquire the Big Ganeti Lock exclusively if this LU requires it,
      # and in a shared fashion otherwise (to prevent concurrent run with
      # an exclusive LU)
      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                         not lu_class.REQ_BGL, calc_timeout(),
                         priority)
      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
                                     priority)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: if no execution context ID was set

    """
    if not self._ec_id:
      # BUGFIX: the exception used to be instantiated without being
      # raised, silently returning the unset value to the caller
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id
448

    
449

    
450
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, lu):
    """Initializes this class.

    @param callfn: function doing the actual (remote) hooks call,
        usually rpc.call_hooks_runner
    @param lu: the logical unit whose hooks are to be run

    """
    self.callfn = callfn
    self.lu = lu
    self.op = lu.op
    # Pre-compute the hooks environment and the per-phase node lists once
    self.env, node_list_pre, node_list_post = self._BuildEnv()
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                      constants.HOOKS_PHASE_POST: node_list_post}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    @return: tuple of (env dict, frozenset of pre-phase nodes,
        frozenset of post-phase nodes)

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is not None:
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        # Prefix the LU-provided variables to keep them in the GANETI_
        # namespace
        for key in lu_env:
          env["GANETI_" + key] = lu_env[key]
    else:
      # An LU without a hooks path contributes no nodes, i.e. no hooks run
      lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    """
    env = self.env.copy()
    # Phase- and path-specific variables are added on top of the shared
    # environment built in _BuildEnv
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    if self.lu.cfg is not None:
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()

    # Hooks runners expect string keys and values
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if not self.node_list[phase] and not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return
    hpath = self.lu.HPATH
    if nodes is not None:
      results = self._RunWrapper(nodes, hpath, phase)
    else:
      results = self._RunWrapper(self.node_list[phase], hpath, phase)
    errs = []
    if not results:
      # Nothing came back at all: fatal for pre-hooks, only a warning
      # for post-hooks (the LU has already run by then)
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        self.lu.LogWarning(msg)
        return results
    for node_name in results:
      res = results[node_name]
      if res.offline:
        # Offline nodes are skipped silently
        continue
      msg = res.fail_msg
      if msg:
        # Per-node rpc failure: warn and move on to the other nodes
        self.lu.LogWarning("Communication failure to node %s: %s",
                           node_name, msg)
        continue
      for script, hkr, output in res.payload:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            # Failed pre-hooks are collected and abort the operation below
            errs.append((node_name, script, output))
          else:
            # Failed post-hooks only produce a warning
            if not output:
              output = "(no output)"
            self.lu.LogWarning("On %s script %s failed, output: %s" %
                               (node_name, script, output))
    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)
    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.lu.cfg.GetMasterNode()]
    self._RunWrapper(nodes, hpath, phase)