Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 031a3e57

History | View | Annotate | Download (14.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import logging
32

    
33
from ganeti import opcodes
34
from ganeti import constants
35
from ganeti import errors
36
from ganeti import rpc
37
from ganeti import cmdlib
38
from ganeti import locking
39

    
40

    
41
class OpExecCbBase(object):
  """Base class for OpCode execution callbacks.

  Both callbacks are no-ops in this base class; subclasses override the
  ones they are interested in. The class derives from C{object} so that it
  is a new-style class, like the other classes in this module.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    @param args: the feedback payload; this base implementation ignores it

    """
57

    
58

    
59
class Processor(object):
  """Object which runs OpCodes

  An opcode is mapped to its logical unit (LU) class through
  L{DISPATCH_TABLE}; the LU is then instantiated and executed under the
  locks it declares.

  """
  # Maps each opcode class to the logical unit class implementing it.
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
    opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
    opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
    opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
    opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
    opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
    opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

  def __init__(self, context):
    """Constructor for Processor

    @param context: the execution context; this class uses its C{cfg}
        (configuration) and C{glm} (lock manager) attributes

    """
    self.context = context
    # Callbacks of the opcode currently being executed (see ExecOpCode)
    self._cbs = None
    # Whether the currently running LU holds the BGL exclusively
    self.exclusive_BGL = False
    self.rpc = rpc.RpcRunner(context.cfg)
    # Class used to instantiate the hooks master
    self.hmclass = HooksMaster

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Checks the LU's prerequisites, runs the pre-phase hooks, executes the
    LU and finally runs the post-phase hooks. If the configuration's write
    counter changed in the meantime, the config update hook is run as well.

    @param lu: the logical unit to execute
    @return: the value returned by the LU's post-phase hooks callback, or
        C{lu.dry_run_result} if the opcode requested a dry run

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    # Go through self.hmclass (set in __init__) instead of naming the class
    # directly, so the hooks master class is chosen in a single place.
    hm = self.hmclass(self.rpc.call_hooks_runner, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self._Feedback, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = lu.Exec(self._Feedback)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self._Feedback, result)
    finally:
      # The config update hook runs even if the LU or its post hooks failed,
      # as long as the configuration was written in the meantime.
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: the logical unit to execute
    @param level: the locking level to handle in this call; once it is no
        longer one of C{locking.LEVELS} the LU itself is executed
    @return: the result of L{_ExecLU}
    @raise NotImplementedError: if the LU wants to both acquire and add
        locks at the same level
    @raise errors.OpPrereqError: if the locks to be added can't be added

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # All levels have been processed: notify the callbacks and run the LU
      if self._cbs:
        self._cbs.NotifyStart()

      result = self._ExecLU(lu)
    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError(
        "Can't declare locks to acquire when adding others")
    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      if acquiring_locks:
        needed_locks = lu.needed_locks[level]
        lu.acquired_locks[level] = self.context.glm.acquire(level,
                                                            needed_locks,
                                                            shared=share)
      else: # adding_locks
        add_locks = lu.add_locks[level]
        lu.remove_locks[level] = add_locks
        try:
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
        except errors.LockError:
          raise errors.OpPrereqError(
            "Couldn't add locks (%s), probably because of a race condition"
            " with another job, who added them first" % add_locks)
      # The nested try/finally blocks make sure that the locks at this level
      # are released even if removing the locks listed in lu.remove_locks
      # fails.
      try:
        try:
          if adding_locks:
            lu.acquired_locks[level] = add_locks
          result = self._LockAndExecLU(lu, level + 1)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)
    else:
      # Nothing to lock at this level, go on with the next one
      result = self._LockAndExecLU(lu, level + 1)

    return result

  def ExecOpCode(self, op, cbs):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @return: the result of executing the logical unit for this opcode
    @raise errors.ProgrammerError: if C{op} is not an opcode instance
    @raise errors.OpCodeUnknown: if the opcode's class has no entry in
        L{DISPATCH_TABLE}

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    self._cbs = cbs
    try:
      lu_class = self.DISPATCH_TABLE.get(op.__class__)
      if lu_class is None:
        raise errors.OpCodeUnknown("Unknown opcode")

      # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in
      # a shared fashion otherwise (to prevent concurrent run with an
      # exclusive LU).
      self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                               shared=not lu_class.REQ_BGL)
      try:
        self.exclusive_BGL = lu_class.REQ_BGL
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"
        result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
      finally:
        self.context.glm.release(locking.LEVEL_CLUSTER)
        self.exclusive_BGL = False
    finally:
      # The callbacks are only valid while this opcode is being executed
      self._cbs = None

    return result

  def _Feedback(self, *args):
    """Forward call to feedback callback function.

    Nothing happens if no callbacks are currently set.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @param current: the number of the current step
    @param total: the total number of steps
    @param message: the description of the current step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self._Feedback("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    @param message: the warning; used as a format string if C{args} is
        not empty
    @param args: the values to be interpolated into C{message}

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self._Feedback(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self._Feedback("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @param message: the message; used as a format string if C{args} is
        not empty
    @param args: the values to be interpolated into C{message}

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self._Feedback(" - INFO: %s" % message)
294

    
295

    
296
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, lu):
    """Initializes the hooks master for one logical unit.

    @param callfn: the function doing the remote call; it is invoked as
        C{callfn(node_list, hpath, phase, env)}
    @param lu: the logical unit the hooks are run for

    """
    self.callfn = callfn
    self.lu = lu
    self.op = lu.op
    self.env, node_list_pre, node_list_post = self._BuildEnv()
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                      constants.HOOKS_PHASE_POST: node_list_post}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    @return: a tuple of (environment dictionary, frozenset of the
        pre-phase nodes, frozenset of the post-phase nodes)

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is not None:
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        # The variables provided by the LU are exported with a prefix
        for key in lu_env:
          env["GANETI_" + key] = lu_env[key]
    else:
      # An LU without hooks path has no nodes to run hooks on
      lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    @param node_list: the nodes on which to run the hooks
    @param hpath: the hooks path
    @param phase: the hooks phase
    @return: whatever C{self.callfn} returns

    """
    # Work on a copy, self.env is shared between the phases
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    if self.lu.cfg is not None:
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()

    # All keys and values are passed on as strings; items() is used instead
    # of iteritems() as it exists in both Python 2 and Python 3
    env = dict([(str(key), str(val)) for key, val in env.items()])

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HooksMaster.

    Failures in the pre phase raise an exception, while failures in the
    post phase are only reported as warnings through the LU.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call, or
        C{None} if there are no nodes to run the hooks on
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if not self.node_list[phase] and not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return
    hpath = self.lu.HPATH
    if nodes is not None:
      results = self._RunWrapper(nodes, hpath, phase)
    else:
      results = self._RunWrapper(self.node_list[phase], hpath, phase)
    errs = []
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        self.lu.LogWarning(msg)
        return results
    for node_name in results:
      res = results[node_name]
      if res.offline:
        continue
      msg = res.RemoteFailMsg()
      if msg:
        self.lu.LogWarning("Communication failure to node %s: %s",
                           node_name, msg)
        continue
      for script, hkr, output in res.payload:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            # collected and raised together after all nodes were examined
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            # The format arguments are passed separately, like in the
            # communication failure warning above
            self.lu.LogWarning("On %s script %s failed, output: %s",
                               node_name, script, output)
    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)
    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.lu.cfg.GetMasterNode()]
    self._RunWrapper(nodes, hpath, phase)