Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ b2f29800

History | View | Annotate | Download (18.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import logging
32
import random
33
import time
34

    
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import rpc
39
from ganeti import cmdlib
40
from ganeti import locking
41
from ganeti import utils
42

    
43

    
44
class LockAcquireTimeout(Exception):
  """Signals that a set of locks could not be acquired in time.

  """
48

    
49

    
50
def _CalculateLockAttemptTimeouts():
51
  """Calculate timeouts for lock attempts.
52

53
  """
54
  result = [1.0]
55

    
56
  # Wait for a total of at least 150s before doing a blocking acquire
57
  while sum(result) < 150.0:
58
    timeout = (result[-1] * 1.05) ** 1.25
59

    
60
    # Cap timeout at 10 seconds. This gives other jobs a chance to run
61
    # even if we're still trying to get our locks, before finally moving
62
    # to a blocking acquire.
63
    if timeout > 10.0:
64
      timeout = 10.0
65

    
66
    elif timeout < 0.1:
67
      # Lower boundary for safety
68
      timeout = 0.1
69

    
70
    result.append(timeout)
71

    
72
  return result
73

    
74

    
75
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  # Shared, precomputed schedule of per-attempt timeouts
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    """
    try:
      timeout = next(self._timeouts)
    except StopIteration:
      # Schedule exhausted; None tells the caller to do a blocking acquire
      return None

    # Spread the timeout by +/- 5% so that several jobs fighting over the
    # same lock(s) don't retry in lockstep.
    variation_range = timeout * 0.1
    timeout += ((self._random_fn() * variation_range) -
                (variation_range * 0.5))

    return timeout
118

    
119

    
120
class OpExecCbBase: # pylint: disable-msg=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Called right before the LU's Exec() method runs.

    Invoked once all locks have been acquired and execution of the
    logical unit is about to begin.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
141

    
142

    
143
class Processor(object):
  """Object which runs OpCodes"""
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
    opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
    opcodes.OpQuery: cmdlib.LUQuery,
    opcodes.OpQueryFields: cmdlib.LUQueryFields,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
    opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
    opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
    opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
    opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
    opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpPrepareExport: cmdlib.LUPrepareExport,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    opcodes.OpTestJobqueue: cmdlib.LUTestJobqueue,
    }

  def __init__(self, context, ec_id):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.rpc = rpc.RpcRunner(context.cfg)
    self.hmclass = HooksMaster

  def _AcquireLocks(self, level, names, shared, timeout, priority):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    # Give the job a chance to bail out before (possibly) blocking on locks
    if self._cbs:
      self._cbs.CheckCancel()

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority)

    # The lock manager returns None on timeout, a set of names on success
    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = lu.Exec(self.Log)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      # Run the config-update hook only if the LU actually wrote the config
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # Past the last lock level: all locks are held, run the LU
      if self._cbs:
        self._cbs.NotifyStart()

      result = self._ExecLU(lu)

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          acquired = self._AcquireLocks(level, needed_locks, share,
                                        calc_timeout(), priority)
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          lu.remove_locks[level] = add_locks

          try:
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), probably because of a race condition"
              " with another job, who added them first" % add_locks,
              errors.ECODE_FAULT)

          acquired = add_locks

        try:
          lu.acquired_locks[level] = acquired

          # Recurse into the next lock level
          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)

    else:
      # Nothing to lock at this level, continue with the next one
      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)

    return result

  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      # Acquire the Big Ganeti Lock exclusively if this LU requires it,
      # and in a shared fashion otherwise (to prevent concurrent run with
      # an exclusive LU)
      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                         not lu_class.REQ_BGL, calc_timeout(),
                         priority)
      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
                                     priority)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: if no execution context ID was set

    """
    if not self._ec_id:
      # Bug fix: the exception was previously constructed but never raised,
      # which made this method silently return a false-ish ID instead of
      # failing loudly
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id
446

    
447

    
448
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, lu):
    self.callfn = callfn
    self.lu = lu
    self.op = lu.op
    (self.env, pre_nodes, post_nodes) = self._BuildEnv()
    self.node_list = {
      constants.HOOKS_PHASE_PRE: pre_nodes,
      constants.HOOKS_PHASE_POST: post_nodes,
      }

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is None:
      # LU without a hooks path runs no hooks at all
      lu_nodes_pre = lu_nodes_post = []
    else:
      (lu_env, lu_nodes_pre, lu_nodes_post) = self.lu.BuildHooksEnv()
      if lu_env:
        for (key, value) in lu_env.items():
          env["GANETI_" + key] = value

    return (env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post))

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    """
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    cfg = self.lu.cfg
    if cfg is not None:
      env["GANETI_CLUSTER"] = cfg.GetClusterName()
      env["GANETI_MASTER"] = cfg.GetMasterNode()

    # Hooks only see string keys and values
    env = dict((str(key), str(val)) for (key, val) in env.items())

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if not nodes and not self.node_list[phase]:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return

    hpath = self.lu.HPATH
    if nodes is None:
      nodes = self.node_list[phase]
    results = self._RunWrapper(nodes, hpath, phase)

    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      # Post-phase failures are only warnings, never fatal
      self.lu.LogWarning(msg)
      return results

    errs = []
    for node_name in results:
      res = results[node_name]
      if res.offline:
        continue

      msg = res.fail_msg
      if msg:
        self.lu.LogWarning("Communication failure to node %s: %s",
                           node_name, msg)
        continue

      for (script, hkr, output) in res.payload:
        if hkr != constants.HKR_FAIL:
          continue
        if phase == constants.HOOKS_PHASE_PRE:
          errs.append((node_name, script, output))
        else:
          if not output:
            output = "(no output)"
          self.lu.LogWarning("On %s script %s failed, output: %s" %
                             (node_name, script, output))

    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)

    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    self._RunWrapper([self.lu.cfg.GetMasterNode()],
                     constants.HOOKS_NAME_CFGUPDATE,
                     constants.HOOKS_PHASE_POST)