Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 9e5442ce

History | View | Annotate | Download (13.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import logging
32

    
33
from ganeti import opcodes
34
from ganeti import constants
35
from ganeti import errors
36
from ganeti import rpc
37
from ganeti import cmdlib
38
from ganeti import locking
39

    
40

    
41
class Processor(object):
42
  """Object which runs OpCodes"""
43
  DISPATCH_TABLE = {
44
    # Cluster
45
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
46
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
47
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
48
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
49
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
50
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
51
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
52
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
53
    # node lu
54
    opcodes.OpAddNode: cmdlib.LUAddNode,
55
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
56
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
57
    opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
58
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
59
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
60
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
61
    opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
62
    opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
63
    # instance lu
64
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
65
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
66
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
67
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
68
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
69
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
70
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
71
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
72
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
73
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
74
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
75
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
76
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
77
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
78
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
79
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
80
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
81
    # os lu
82
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
83
    # exports lu
84
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
85
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
86
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
87
    # tags lu
88
    opcodes.OpGetTags: cmdlib.LUGetTags,
89
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
90
    opcodes.OpAddTags: cmdlib.LUAddTags,
91
    opcodes.OpDelTags: cmdlib.LUDelTags,
92
    # test lu
93
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
94
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
95
    }
96

    
97
  def __init__(self, context):
98
    """Constructor for Processor
99

100
    Args:
101
     - feedback_fn: the feedback function (taking one string) to be run when
102
                    interesting events are happening
103
    """
104
    self.context = context
105
    self._feedback_fn = None
106
    self.exclusive_BGL = False
107
    self.rpc = rpc.RpcRunner(context.cfg)
108

    
109
  def _ExecLU(self, lu):
110
    """Logical Unit execution sequence.
111

112
    """
113
    write_count = self.context.cfg.write_count
114
    lu.CheckPrereq()
115
    hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
116
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
117
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
118
                     self._feedback_fn, None)
119

    
120
    if getattr(lu.op, "dry_run", False):
121
      # in this mode, no post-hooks are run, and the config is not
122
      # written (as it might have been modified by another LU, and we
123
      # shouldn't do writeout on behalf of other threads
124
      self.LogInfo("dry-run mode requested, not actually executing"
125
                   " the operation")
126
      return lu.dry_run_result
127

    
128
    try:
129
      result = lu.Exec(self._feedback_fn)
130
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
131
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
132
                                self._feedback_fn, result)
133
    finally:
134
      # FIXME: This needs locks if not lu_class.REQ_BGL
135
      if write_count != self.context.cfg.write_count:
136
        hm.RunConfigUpdate()
137

    
138
    return result
139

    
140
  def _LockAndExecLU(self, lu, level):
141
    """Execute a Logical Unit, with the needed locks.
142

143
    This is a recursive function that starts locking the given level, and
144
    proceeds up, till there are no more locks to acquire. Then it executes the
145
    given LU and its opcodes.
146

147
    """
148
    adding_locks = level in lu.add_locks
149
    acquiring_locks = level in lu.needed_locks
150
    if level not in locking.LEVELS:
151
      if callable(self._run_notifier):
152
        self._run_notifier()
153
      result = self._ExecLU(lu)
154
    elif adding_locks and acquiring_locks:
155
      # We could both acquire and add locks at the same level, but for now we
156
      # don't need this, so we'll avoid the complicated code needed.
157
      raise NotImplementedError(
158
        "Can't declare locks to acquire when adding others")
159
    elif adding_locks or acquiring_locks:
160
      lu.DeclareLocks(level)
161
      share = lu.share_locks[level]
162
      if acquiring_locks:
163
        needed_locks = lu.needed_locks[level]
164
        lu.acquired_locks[level] = self.context.glm.acquire(level,
165
                                                            needed_locks,
166
                                                            shared=share)
167
      else: # adding_locks
168
        add_locks = lu.add_locks[level]
169
        lu.remove_locks[level] = add_locks
170
        try:
171
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
172
        except errors.LockError:
173
          raise errors.OpPrereqError(
174
            "Couldn't add locks (%s), probably because of a race condition"
175
            " with another job, who added them first" % add_locks)
176
      try:
177
        try:
178
          if adding_locks:
179
            lu.acquired_locks[level] = add_locks
180
          result = self._LockAndExecLU(lu, level + 1)
181
        finally:
182
          if level in lu.remove_locks:
183
            self.context.glm.remove(level, lu.remove_locks[level])
184
      finally:
185
        if self.context.glm.is_owned(level):
186
          self.context.glm.release(level)
187
    else:
188
      result = self._LockAndExecLU(lu, level + 1)
189

    
190
    return result
191

    
192
  def ExecOpCode(self, op, feedback_fn, run_notifier):
193
    """Execute an opcode.
194

195
    @type op: an OpCode instance
196
    @param op: the opcode to be executed
197
    @type feedback_fn: a function that takes a single argument
198
    @param feedback_fn: this function will be used as feedback from the LU
199
                        code to the end-user
200
    @type run_notifier: callable (no arguments) or None
201
    @param run_notifier:  this function (if callable) will be called when
202
                          we are about to call the lu's Exec() method, that
203
                          is, after we have acquired all locks
204

205
    """
206
    if not isinstance(op, opcodes.OpCode):
207
      raise errors.ProgrammerError("Non-opcode instance passed"
208
                                   " to ExecOpcode")
209

    
210
    self._feedback_fn = feedback_fn
211
    self._run_notifier = run_notifier
212
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
213
    if lu_class is None:
214
      raise errors.OpCodeUnknown("Unknown opcode")
215

    
216
    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
217
    # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
218
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
219
                             shared=not lu_class.REQ_BGL)
220
    try:
221
      self.exclusive_BGL = lu_class.REQ_BGL
222
      lu = lu_class(self, op, self.context, self.rpc)
223
      lu.ExpandNames()
224
      assert lu.needed_locks is not None, "needed_locks not set by LU"
225
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
226
    finally:
227
      self.context.glm.release(locking.LEVEL_CLUSTER)
228
      self.exclusive_BGL = False
229

    
230
    return result
231

    
232
  def LogStep(self, current, total, message):
233
    """Log a change in LU execution progress.
234

235
    """
236
    logging.debug("Step %d/%d %s", current, total, message)
237
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))
238

    
239
  def LogWarning(self, message, *args, **kwargs):
240
    """Log a warning to the logs and the user.
241

242
    The optional keyword argument is 'hint' and can be used to show a
243
    hint to the user (presumably related to the warning). If the
244
    message is empty, it will not be printed at all, allowing one to
245
    show only a hint.
246

247
    """
248
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
249
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
250
    if args:
251
      message = message % tuple(args)
252
    if message:
253
      logging.warning(message)
254
      self._feedback_fn(" - WARNING: %s" % message)
255
    if "hint" in kwargs:
256
      self._feedback_fn("      Hint: %s" % kwargs["hint"])
257

    
258
  def LogInfo(self, message, *args):
259
    """Log an informational message to the logs and the user.
260

261
    """
262
    if args:
263
      message = message % tuple(args)
264
    logging.info(message)
265
    self._feedback_fn(" - INFO: %s" % message)
266

    
267

    
268
class HooksMaster(object):
269
  """Hooks master.
270

271
  This class distributes the run commands to the nodes based on the
272
  specific LU class.
273

274
  In order to remove the direct dependency on the rpc module, the
275
  constructor needs a function which actually does the remote
276
  call. This will usually be rpc.call_hooks_runner, but any function
277
  which behaves the same works.
278

279
  """
280
  def __init__(self, callfn, proc, lu):
281
    self.callfn = callfn
282
    self.proc = proc
283
    self.lu = lu
284
    self.op = lu.op
285
    self.env, node_list_pre, node_list_post = self._BuildEnv()
286
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
287
                      constants.HOOKS_PHASE_POST: node_list_post}
288

    
289
  def _BuildEnv(self):
290
    """Compute the environment and the target nodes.
291

292
    Based on the opcode and the current node list, this builds the
293
    environment for the hooks and the target node list for the run.
294

295
    """
296
    env = {
297
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
298
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
299
      "GANETI_OP_CODE": self.op.OP_ID,
300
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
301
      "GANETI_DATA_DIR": constants.DATA_DIR,
302
      }
303

    
304
    if self.lu.HPATH is not None:
305
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
306
      if lu_env:
307
        for key in lu_env:
308
          env["GANETI_" + key] = lu_env[key]
309
    else:
310
      lu_nodes_pre = lu_nodes_post = []
311

    
312
    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
313

    
314
  def _RunWrapper(self, node_list, hpath, phase):
315
    """Simple wrapper over self.callfn.
316

317
    This method fixes the environment before doing the rpc call.
318

319
    """
320
    env = self.env.copy()
321
    env["GANETI_HOOKS_PHASE"] = phase
322
    env["GANETI_HOOKS_PATH"] = hpath
323
    if self.lu.cfg is not None:
324
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
325
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
326

    
327
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
328

    
329
    return self.callfn(node_list, hpath, phase, env)
330

    
331
  def RunPhase(self, phase):
332
    """Run all the scripts for a phase.
333

334
    This is the main function of the HookMaster.
335

336
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
337
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
338
    @return: the processed results of the hooks multi-node rpc call
339
    @raise errors.HooksFailure: on communication failure to the nodes
340

341
    """
342
    if not self.node_list[phase]:
343
      # empty node list, we should not attempt to run this as either
344
      # we're in the cluster init phase and the rpc client part can't
345
      # even attempt to run, or this LU doesn't do hooks at all
346
      return
347
    hpath = self.lu.HPATH
348
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
349
    if phase == constants.HOOKS_PHASE_PRE:
350
      errs = []
351
      if not results:
352
        raise errors.HooksFailure("Communication failure")
353
      for node_name in results:
354
        res = results[node_name]
355
        if res.offline:
356
          continue
357
        msg = res.RemoteFailMsg()
358
        if msg:
359
          self.proc.LogWarning("Communication failure to node %s: %s",
360
                               node_name, msg)
361
          continue
362
        for script, hkr, output in res.payload:
363
          if hkr == constants.HKR_FAIL:
364
            errs.append((node_name, script, output))
365
      if errs:
366
        raise errors.HooksAbort(errs)
367
    return results
368

    
369
  def RunConfigUpdate(self):
370
    """Run the special configuration update hook
371

372
    This is a special hook that runs only on the master after each
373
    top-level LI if the configuration has been updated.
374

375
    """
376
    phase = constants.HOOKS_PHASE_POST
377
    hpath = constants.HOOKS_NAME_CFGUPDATE
378
    nodes = [self.lu.cfg.GetMasterNode()]
379
    self._RunWrapper(nodes, hpath, phase)