Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ ef2df7d3

History | View | Annotate | Download (15.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import logging
32

    
33
from ganeti import opcodes
34
from ganeti import constants
35
from ganeti import errors
36
from ganeti import rpc
37
from ganeti import cmdlib
38
from ganeti import locking
39
from ganeti import utils
40

    
41

    
42
class OpExecCbBase:
  """Base class for OpCode execution callbacks.

  Subclasses may override any of these hooks; every default
  implementation is a no-op, so overriding is strictly optional.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    Invoked right before the LU's Exec() method runs, i.e. once every
    needed lock has been acquired.

    """
    pass

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """
    pass

  def ReportLocks(self, msg):
    """Report lock operations.

    @param msg: a human-readable description of the lock operation

    """
    pass
63

    
64

    
65
class Processor(object):
  """Object which runs OpCodes.

  The processor maps each opcode class to the logical unit (LU) class
  implementing it, acquires the locks the LU declares, runs the pre/post
  hooks around the LU's Exec() method and returns its result.

  """
  # Maps each opcode class to its logical unit class; ExecOpCode rejects
  # any opcode without an entry here.
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
    opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
    opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
    opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
    opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
    opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
    opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

  def __init__(self, context):
    """Constructor for Processor

    @param context: context object holding the cluster configuration
        (C{cfg}) and the global lock manager (C{glm}) — only those two
        attributes are used by this class

    """
    self.context = context
    # Runtime callbacks (an L{OpExecCbBase} instance); set only for the
    # duration of ExecOpCode, None otherwise
    self._cbs = None
    # True while an LU with REQ_BGL holds the Big Ganeti Lock exclusively
    self.exclusive_BGL = False
    self.rpc = rpc.RpcRunner(context.cfg)
    # NOTE(review): hmclass is not referenced elsewhere in this module
    # (_ExecLU instantiates HooksMaster directly) — presumably kept as an
    # override point; confirm before removing
    self.hmclass = HooksMaster

  def _ReportLocks(self, level, names, shared, acquired):
    """Reports lock operations.

    Builds a compact "state/level/names/mode" message, logs it at debug
    level and forwards it to the runtime callbacks, if any.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the lock should be acquired in shared mode
    @type acquired: bool
    @param acquired: Whether the lock has already been acquired

    """
    parts = []

    # Build message
    if acquired:
      parts.append("acquired")
    else:
      parts.append("waiting")

    parts.append(locking.LEVEL_NAMES[level])

    if names == locking.ALL_SET:
      parts.append("ALL")
    elif isinstance(names, basestring):
      # a single lock name was passed as a plain string
      parts.append(names)
    else:
      parts.append(",".join(names))

    if shared:
      parts.append("shared")
    else:
      parts.append("exclusive")

    msg = "/".join(parts)

    logging.debug("LU locks %s", msg)

    if self._cbs:
      self._cbs.ReportLocks(msg)

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs CheckPrereq, the pre-phase hooks, Exec() and the post-phase
    hooks, and finally the special config-update hook if the
    configuration was written during execution.

    @param lu: an initialized logical unit instance
    @return: the LU's result (or its dry-run result in dry-run mode)

    """
    # remember the config serial so we can detect below whether the LU
    # wrote out the configuration
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    # give the LU a chance to inspect the pre-hook results before Exec
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self._Feedback, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = lu.Exec(self._Feedback)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      # the LU may post-process (or replace) the result based on the
      # post-hook outcome
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self._Feedback, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      # a changed write_count means the configuration was written out
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: the logical unit to execute
    @type level: int
    @param level: the current lock level; recursion stops once it runs
        past the known levels in C{locking.LEVELS}

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # base case: all lock levels processed, run the LU itself
      if self._cbs:
        self._cbs.NotifyStart()

      result = self._ExecLU(lu)
    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError(
        "Can't declare locks to acquire when adding others")
    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      if acquiring_locks:
        needed_locks = lu.needed_locks[level]

        # report both before (waiting) and after (acquired) the blocking
        # acquire call
        self._ReportLocks(level, needed_locks, share, False)
        lu.acquired_locks[level] = self.context.glm.acquire(level,
                                                            needed_locks,
                                                            shared=share)
        self._ReportLocks(level, needed_locks, share, True)

      else: # adding_locks
        add_locks = lu.add_locks[level]
        # locks added here are also removed again when this level unwinds
        lu.remove_locks[level] = add_locks
        try:
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
        except errors.LockError:
          raise errors.OpPrereqError(
            "Couldn't add locks (%s), probably because of a race condition"
            " with another job, who added them first" % add_locks)
      try:
        try:
          if adding_locks:
            lu.acquired_locks[level] = add_locks
          # recurse into the next lock level
          result = self._LockAndExecLU(lu, level + 1)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        # release whatever is still owned at this level
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)
    else:
      # nothing to do at this level, move on to the next one
      result = self._LockAndExecLU(lu, level + 1)

    return result

  def ExecOpCode(self, op, cbs):
    """Execute an opcode.

    Looks up the LU class for the opcode, takes the Big Ganeti Lock in
    the appropriate mode, and delegates to L{_LockAndExecLU}.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @raise errors.ProgrammerError: if C{op} is not an opcode instance
    @raise errors.OpCodeUnknown: if the opcode has no dispatch entry

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    self._cbs = cbs
    try:
      lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
      if lu_class is None:
        raise errors.OpCodeUnknown("Unknown opcode")

      # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
      # shared fashion otherwise (to prevent concurrent run with an exclusive
      # LU)
      self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
                        not lu_class.REQ_BGL, False)
      try:
        self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                                 shared=not lu_class.REQ_BGL)
      finally:
        self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
                          not lu_class.REQ_BGL, True)
      try:
        self.exclusive_BGL = lu_class.REQ_BGL
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"
        # start the per-level lock/execute recursion below the BGL
        result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
      finally:
        self.context.glm.release(locking.LEVEL_CLUSTER)
        self.exclusive_BGL = False
    finally:
      self._cbs = None

    return result

  def _Feedback(self, *args):
    """Forward call to feedback callback function.

    Silently does nothing when no callbacks are registered.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @type current: int
    @param current: the current step number
    @type total: int
    @param total: the total number of steps
    @param message: description of the step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self._Feedback("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      # interpolate %-style arguments before logging
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self._Feedback(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self._Feedback("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @param message: the message (possibly a %-format string)
    @param args: optional interpolation arguments for the message

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self._Feedback(" - INFO: %s" % message)
352

    
353

    
354
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, lu):
    """Initializes this class.

    @param callfn: function doing the actual hooks RPC
    @param lu: the logical unit on whose behalf hooks are run

    """
    self.callfn = callfn
    self.lu = lu
    self.op = lu.op
    (self.env, pre_nodes, post_nodes) = self._BuildEnv()
    self.node_list = {
      constants.HOOKS_PHASE_PRE: pre_nodes,
      constants.HOOKS_PHASE_POST: post_nodes,
      }

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    @return: tuple of (environment dict, frozenset of pre-phase nodes,
        frozenset of post-phase nodes)

    """
    base_env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is None:
      # an LU without a hooks path contributes no nodes and no variables
      pre_nodes = post_nodes = []
    else:
      (lu_env, pre_nodes, post_nodes) = self.lu.BuildHooksEnv()
      if lu_env:
        # LU-provided variables are exported with the GANETI_ prefix
        for name in lu_env:
          base_env["GANETI_" + name] = lu_env[name]

    return base_env, frozenset(pre_nodes), frozenset(post_nodes)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    @param node_list: nodes on which to run the hooks
    @param hpath: the hooks path
    @param phase: the hooks phase
    @return: whatever the call function returns

    """
    run_env = self.env.copy()
    run_env["GANETI_HOOKS_PHASE"] = phase
    run_env["GANETI_HOOKS_PATH"] = hpath

    cfg = self.lu.cfg
    if cfg is not None:
      run_env["GANETI_CLUSTER"] = cfg.GetClusterName()
      run_env["GANETI_MASTER"] = cfg.GetMasterNode()

    # hook scripts expect every variable name and value to be a string
    str_env = dict((str(name), str(value))
                   for (name, value) in run_env.iteritems())

    return self.callfn(node_list, hpath, phase, str_env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if not nodes and not self.node_list[phase]:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return
    hpath = self.lu.HPATH
    if nodes is None:
      target_nodes = self.node_list[phase]
    else:
      target_nodes = nodes
    results = self._RunWrapper(target_nodes, hpath, phase)

    if not results:
      # total RPC failure: fatal for pre-hooks, a warning afterwards
      failmsg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(failmsg)
      self.lu.LogWarning(failmsg)
      return results

    pre_errors = []
    for node_name in results:
      node_res = results[node_name]
      if node_res.offline:
        # offline nodes are silently skipped
        continue
      failmsg = node_res.RemoteFailMsg()
      if failmsg:
        self.lu.LogWarning("Communication failure to node %s: %s",
                           node_name, failmsg)
        continue
      for (script, hkr, output) in node_res.payload:
        if hkr != constants.HKR_FAIL:
          continue
        if phase == constants.HOOKS_PHASE_PRE:
          # pre-phase script failures are collected and abort the LU
          pre_errors.append((node_name, script, output))
        else:
          # post-phase failures only warn
          if not output:
            output = "(no output)"
          self.lu.LogWarning("On %s script %s failed, output: %s" %
                             (node_name, script, output))

    if pre_errors and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(pre_errors)
    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    master = [self.lu.cfg.GetMasterNode()]
    self._RunWrapper(master, constants.HOOKS_NAME_CFGUPDATE,
                     constants.HOOKS_PHASE_POST)