Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 4b5e8271

History | View | Annotate | Download (13.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import logging
32

    
33
from ganeti import opcodes
34
from ganeti import constants
35
from ganeti import errors
36
from ganeti import rpc
37
from ganeti import cmdlib
38
from ganeti import locking
39

    
40

    
41
class Processor(object):
  """Object which runs OpCodes"""
  # Dispatch table mapping each opcode class to the cmdlib logical
  # unit (LU) class implementing it; ExecOpCode uses this for dispatch.
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
    opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
    opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
    opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
    opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
    opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

  def __init__(self, context):
    """Constructor for Processor

    Args:
     - context: the execution context object; its cfg member is used to
                build the RPC runner, and its glm (global lock manager)
                member is used for all lock operations

    """
    self.context = context
    # feedback function for the current opcode; set by ExecOpCode
    # before any LU code runs
    self._feedback_fn = None
    # True while an LU with REQ_BGL holds the Big Ganeti Lock exclusively
    self.exclusive_BGL = False
    self.rpc = rpc.RpcRunner(context.cfg)

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs the LU's CheckPrereq, the pre-phase hooks, the LU's Exec and
    the post-phase hooks; if the configuration was written during
    execution, the config-update hook is run as well. In dry-run mode
    execution stops after the pre-hooks and the LU's dry_run_result is
    returned instead.

    """
    # remember the config serial so we can tell below whether this LU
    # (or anything it called) wrote out the configuration
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self._feedback_fn, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = lu.Exec(self._feedback_fn)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      # the post-hook callback may replace the LU's result
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self._feedback_fn, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # past the last locking level: all needed locks are held, so
      # notify the caller (if requested) and actually run the LU
      if callable(self._run_notifier):
        self._run_notifier()
      result = self._ExecLU(lu)
    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError(
        "Can't declare locks to acquire when adding others")
    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      if acquiring_locks:
        needed_locks = lu.needed_locks[level]
        lu.acquired_locks[level] = self.context.glm.acquire(level,
                                                            needed_locks,
                                                            shared=share)
      else: # adding_locks
        # register the new locks for removal up-front, so they get
        # cleaned up in the inner finally even if the LU fails
        add_locks = lu.add_locks[level]
        lu.remove_locks[level] = add_locks
        try:
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
        except errors.LockError:
          raise errors.OpPrereqError(
            "Couldn't add locks (%s), probably because of a race condition"
            " with another job, who added them first" % add_locks)
      try:
        try:
          if adding_locks:
            lu.acquired_locks[level] = add_locks
          # recurse into the next locking level
          result = self._LockAndExecLU(lu, level + 1)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)
    else:
      # nothing to do at this level, proceed to the next one
      result = self._LockAndExecLU(lu, level + 1)

    return result

  def ExecOpCode(self, op, feedback_fn, run_notifier):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type feedback_fn: a function that takes a single argument
    @param feedback_fn: this function will be used as feedback from the LU
                        code to the end-user
    @type run_notifier: callable (no arguments) or None
    @param run_notifier:  this function (if callable) will be called when
                          we are about to call the lu's Exec() method, that
                          is, after we have acquired all locks
    @return: the result of the LU's execution (possibly modified by its
        post-hooks callback)
    @raise errors.ProgrammerError: if op is not an OpCode instance
    @raise errors.OpCodeUnknown: if op has no entry in DISPATCH_TABLE

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    self._feedback_fn = feedback_fn
    self._run_notifier = run_notifier
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
    # shared fashion otherwise (to prevent concurrent run with an exclusive LU)
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                             shared=not lu_class.REQ_BGL)
    try:
      self.exclusive_BGL = lu_class.REQ_BGL
      lu = lu_class(self, op, self.context, self.rpc)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
    finally:
      self.context.glm.release(locking.LEVEL_CLUSTER)
      self.exclusive_BGL = False

    return result

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @param current: the number of the step being started
    @param total: the total number of steps
    @param message: the step description

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self._feedback_fn(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self._feedback_fn("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @param message: the message (possibly a %-format string)
    @param args: if given, interpolated into message via the % operator

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self._feedback_fn(" - INFO: %s" % message)
271

    
272

    
273
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, lu):
    self.callfn = callfn
    self.lu = lu
    self.op = lu.op
    env, pre_nodes, post_nodes = self._BuildEnv()
    self.env = env
    # target node sets, keyed by hook phase
    self.node_list = {constants.HOOKS_PHASE_PRE: pre_nodes,
                      constants.HOOKS_PHASE_POST: post_nodes}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    @return: a (env, pre_nodes, post_nodes) tuple, the node lists
        frozen into frozensets

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is None:
      # LUs without a hooks path run no hooks on any node
      pre_nodes = post_nodes = []
    else:
      extra_env, pre_nodes, post_nodes = self.lu.BuildHooksEnv()
      if extra_env:
        # LU-provided variables get the GANETI_ namespace prefix
        for name in extra_env:
          env["GANETI_" + name] = extra_env[name]

    return env, frozenset(pre_nodes), frozenset(post_nodes)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    """
    hooks_env = self.env.copy()
    hooks_env["GANETI_HOOKS_PHASE"] = phase
    hooks_env["GANETI_HOOKS_PATH"] = hpath
    cfg = self.lu.cfg
    if cfg is not None:
      hooks_env["GANETI_CLUSTER"] = cfg.GetClusterName()
      hooks_env["GANETI_MASTER"] = cfg.GetMasterNode()

    # the environment must be passed to the shell as strings only
    hooks_env = dict((str(k), str(v)) for k, v in hooks_env.iteritems())

    return self.callfn(node_list, hpath, phase, hooks_env)

  def RunPhase(self, phase):
    """Run all the scripts for a phase.

    This is the main function of the HooksMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes

    """
    target_nodes = self.node_list[phase]
    if not target_nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return
    results = self._RunWrapper(target_nodes, self.lu.HPATH, phase)
    if phase == constants.HOOKS_PHASE_PRE:
      # pre-phase hook failures abort the whole operation
      if not results:
        raise errors.HooksFailure("Communication failure")
      errs = []
      for node_name, res in results.items():
        if res.offline:
          continue
        msg = res.RemoteFailMsg()
        if msg:
          self.lu.LogWarning("Communication failure to node %s: %s",
                             node_name, msg)
          continue
        errs.extend((node_name, script, output)
                    for script, hkr, output in res.payload
                    if hkr == constants.HKR_FAIL)
      if errs:
        raise errors.HooksAbort(errs)
    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    self._RunWrapper([self.lu.cfg.GetMasterNode()],
                     constants.HOOKS_NAME_CFGUPDATE,
                     constants.HOOKS_PHASE_POST)