Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 6b95b76d

History | View | Annotate | Download (15.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import logging
32

    
33
from ganeti import opcodes
34
from ganeti import constants
35
from ganeti import errors
36
from ganeti import rpc
37
from ganeti import cmdlib
38
from ganeti import locking
39
from ganeti import utils
40

    
41

    
42
class OpExecCbBase:
  """Base class defining the callback interface used during opcode runs.

  Subclasses may override any of the notification hooks below; the
  default implementations are deliberate no-ops, so a subclass only
  needs to implement the callbacks it actually cares about.

  """
  def NotifyStart(self):
    """Notification that LU execution is about to begin.

    Invoked just before the logical unit's Exec() method runs, i.e.
    once every required lock has been obtained.

    """
    pass

  def Feedback(self, *args):
    """Deliver feedback messages from LU code to the end-user.

    """
    pass

  def ReportLocks(self, msg):
    """Notification about lock operations.

    @param msg: a human-readable description of the lock operation

    """
    pass
65
class Processor(object):
  """Object which runs OpCodes"""
  # Maps each opcode class to the logical unit (LU) class implementing
  # it; ExecOpCode uses this table to dispatch incoming opcodes.
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
    opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
    opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
    opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
    opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
    opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
    opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

  def __init__(self, context):
    """Constructor for Processor

    @param context: object providing the cluster configuration
        (C{context.cfg}) and the lock manager (C{context.glm});
        presumably the masterd GanetiContext — verify against caller

    """
    self.context = context
    # Runtime callbacks (an L{OpExecCbBase} instance); only set for the
    # duration of an ExecOpCode call, None otherwise
    self._cbs = None
    self.rpc = rpc.RpcRunner(context.cfg)
    self.hmclass = HooksMaster

  def _ReportLocks(self, level, names, shared, acquired):
    """Reports lock operations.

    Builds a "state/level/names/mode" message, logs it at debug level
    and forwards it to the runtime callbacks (if any).

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the lock should be acquired in shared mode
    @type acquired: bool
    @param acquired: Whether the lock has already been acquired

    """
    parts = []

    # Build message
    if acquired:
      parts.append("acquired")
    else:
      parts.append("waiting")

    parts.append(locking.LEVEL_NAMES[level])

    # names can be the ALL_SET sentinel, a single string, or a list
    if names == locking.ALL_SET:
      parts.append("ALL")
    elif isinstance(names, basestring):
      parts.append(names)
    else:
      parts.append(",".join(names))

    if shared:
      parts.append("shared")
    else:
      parts.append("exclusive")

    msg = "/".join(parts)

    logging.debug("LU locks %s", msg)

    if self._cbs:
      self._cbs.ReportLocks(msg)

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs the LU's prerequisite check, the pre-phase hooks, the Exec()
    method and the post-phase hooks, and finally triggers the
    config-update hook if the configuration was written out.

    @param lu: the logical unit instance to execute
    @return: the result of lu.Exec() as (possibly) modified by the
        LU's HooksCallBack, or C{lu.dry_run_result} in dry-run mode

    """
    # remember the config serial so we can detect below whether this LU
    # wrote out a new configuration
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self._Feedback, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = lu.Exec(self._Feedback)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self._Feedback, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      # run the config-update hook even if Exec() or the post-hooks
      # raised, as long as a new configuration was actually written
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: the logical unit instance to execute
    @type level: int
    @param level: the lock level to process at this recursion step;
        recursion terminates when level falls outside locking.LEVELS
    @return: the result of executing the LU

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # no more levels: all locks are held, run the LU itself
      if self._cbs:
        self._cbs.NotifyStart()

      result = self._ExecLU(lu)
    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError(
        "Can't declare locks to acquire when adding others")
    elif adding_locks or acquiring_locks:
      # let the LU refine its lock needs for this level before we act
      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      if acquiring_locks:
        needed_locks = lu.needed_locks[level]

        # report "waiting" before and "acquired" after the (possibly
        # blocking) acquire call
        self._ReportLocks(level, needed_locks, share, False)
        lu.acquired_locks[level] = self.context.glm.acquire(level,
                                                            needed_locks,
                                                            shared=share)
        self._ReportLocks(level, needed_locks, share, True)

      else: # adding_locks
        # register new locks; they are also scheduled for removal once
        # this level is done (via lu.remove_locks)
        add_locks = lu.add_locks[level]
        lu.remove_locks[level] = add_locks
        try:
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
        except errors.LockError:
          raise errors.OpPrereqError(
            "Couldn't add locks (%s), probably because of a race condition"
            " with another job, who added them first" % add_locks)
      try:
        try:
          if adding_locks:
            lu.acquired_locks[level] = add_locks
          # recurse into the next lock level
          result = self._LockAndExecLU(lu, level + 1)
        finally:
          # locks added at this level are removed again on the way out
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        # release whatever we still own at this level, even on error
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)
    else:
      # nothing to do at this level, go deeper
      result = self._LockAndExecLU(lu, level + 1)

    return result

  def ExecOpCode(self, op, cbs):
    """Execute an opcode.

    Looks up the LU class for the opcode, acquires the Big Ganeti Lock
    (exclusively or shared depending on the LU's REQ_BGL flag) and runs
    the LU via L{_LockAndExecLU}.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @return: the LU's execution result
    @raise errors.ProgrammerError: if op is not an opcodes.OpCode
    @raise errors.OpCodeUnknown: if the opcode has no dispatch entry

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    self._cbs = cbs
    try:
      lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
      if lu_class is None:
        raise errors.OpCodeUnknown("Unknown opcode")

      # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
      # shared fashion otherwise (to prevent concurrent run with an exclusive
      # LU.
      self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
                        not lu_class.REQ_BGL, False)
      try:
        self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                                 shared=not lu_class.REQ_BGL)
      finally:
        self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
                          not lu_class.REQ_BGL, True)
      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"
        result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
      finally:
        # the BGL is always released, whatever happened below it
        self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      # callbacks are only valid for the duration of this opcode
      self._cbs = None

    return result

  def _Feedback(self, *args):
    """Forward call to feedback callback function.

    No-op when no callbacks are currently registered.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @type current: int
    @param current: the current step number
    @type total: int
    @param total: the total number of steps
    @param message: description of the current step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self._Feedback("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      # %-style interpolation, matching the logging module convention
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self._Feedback(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self._Feedback("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @param message: the message (possibly a %-format string)
    @param args: optional interpolation arguments for the message

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self._Feedback(" - INFO: %s" % message)
349

    
350

    
351
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, lu):
    """Initializes the hooks master.

    @param callfn: function used to run the hooks remotely; must have
        the same signature/behaviour as rpc.call_hooks_runner
    @param lu: the logical unit on whose behalf hooks are run

    """
    self.callfn = callfn
    self.lu = lu
    self.op = lu.op
    # per-phase target node lists, computed once from the LU
    self.env, node_list_pre, node_list_post = self._BuildEnv()
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                      constants.HOOKS_PHASE_POST: node_list_post}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    @return: tuple of (env dict, frozenset of pre-phase nodes,
        frozenset of post-phase nodes)

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    # a None HPATH marks an LU that has no hooks of its own; only the
    # base environment is built and no target nodes are returned
    if self.lu.HPATH is not None:
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        # LU-provided variables are namespaced with the GANETI_ prefix
        for key in lu_env:
          env["GANETI_" + key] = lu_env[key]
    else:
      lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    @param node_list: the nodes on which to run the hooks
    @param hpath: the hooks path (subdirectory) to run
    @param phase: the hooks phase (pre or post)
    @return: whatever self.callfn returns

    """
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    # cfg may be None (e.g. very early, before the config exists);
    # cluster/master variables are only added when it is available
    if self.lu.cfg is not None:
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()

    # the hooks environment must be string-only
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.

    Failures are handled asymmetrically: in the pre phase they abort
    the operation (HooksFailure/HooksAbort), in the post phase they
    are only reported as warnings.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if not self.node_list[phase] and not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return
    hpath = self.lu.HPATH
    if nodes is not None:
      results = self._RunWrapper(nodes, hpath, phase)
    else:
      results = self._RunWrapper(self.node_list[phase], hpath, phase)
    errs = []
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        # post-phase communication failures are non-fatal
        self.lu.LogWarning(msg)
        return results
    for node_name in results:
      res = results[node_name]
      # offline nodes are expected to be unreachable; skip silently
      if res.offline:
        continue
      msg = res.fail_msg
      if msg:
        # per-node rpc failure: warn and keep processing other nodes
        self.lu.LogWarning("Communication failure to node %s: %s",
                           node_name, msg)
        continue
      # each payload entry is a (script, status, output) tuple
      for script, hkr, output in res.payload:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            # pre-phase script failures are collected and abort below
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.lu.LogWarning("On %s script %s failed, output: %s" %
                               (node_name, script, output))
    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)
    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.lu.cfg.GetMasterNode()]
    self._RunWrapper(nodes, hpath, phase)