Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 8c96d01f

History | View | Annotate | Download (13.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import logging
32

    
33
from ganeti import opcodes
34
from ganeti import constants
35
from ganeti import errors
36
from ganeti import rpc
37
from ganeti import cmdlib
38
from ganeti import locking
39

    
40

    
41
class Processor(object):
42
  """Object which runs OpCodes"""
43
  DISPATCH_TABLE = {
44
    # Cluster
45
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
46
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
47
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
48
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
49
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
50
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
51
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
52
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
53
    # node lu
54
    opcodes.OpAddNode: cmdlib.LUAddNode,
55
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
56
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
57
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
58
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
59
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
60
    opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
61
    # instance lu
62
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
63
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
64
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
65
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
66
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
67
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
68
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
69
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
70
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
71
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
72
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
73
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
74
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
75
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
76
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
77
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
78
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
79
    # os lu
80
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
81
    # exports lu
82
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
83
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
84
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
85
    # tags lu
86
    opcodes.OpGetTags: cmdlib.LUGetTags,
87
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
88
    opcodes.OpAddTags: cmdlib.LUAddTags,
89
    opcodes.OpDelTags: cmdlib.LUDelTags,
90
    # test lu
91
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
92
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
93
    }
94

    
95
  def __init__(self, context):
96
    """Constructor for Processor
97

98
    Args:
99
     - feedback_fn: the feedback function (taking one string) to be run when
100
                    interesting events are happening
101
    """
102
    self.context = context
103
    self._feedback_fn = None
104
    self.exclusive_BGL = False
105
    self.rpc = rpc.RpcRunner(context.cfg)
106

    
107
  def _ExecLU(self, lu):
108
    """Logical Unit execution sequence.
109

110
    """
111
    write_count = self.context.cfg.write_count
112
    lu.CheckPrereq()
113
    hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
114
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
115
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
116
                     self._feedback_fn, None)
117

    
118
    if getattr(lu.op, "dry_run", False):
119
      # in this mode, no post-hooks are run, and the config is not
120
      # written (as it might have been modified by another LU, and we
121
      # shouldn't do writeout on behalf of other threads
122
      self.LogInfo("dry-run mode requested, not actually executing"
123
                   " the operation")
124
      return lu.dry_run_result
125

    
126
    try:
127
      result = lu.Exec(self._feedback_fn)
128
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
129
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
130
                                self._feedback_fn, result)
131
    finally:
132
      # FIXME: This needs locks if not lu_class.REQ_BGL
133
      if write_count != self.context.cfg.write_count:
134
        hm.RunConfigUpdate()
135

    
136
    return result
137

    
138
  def _LockAndExecLU(self, lu, level):
139
    """Execute a Logical Unit, with the needed locks.
140

141
    This is a recursive function that starts locking the given level, and
142
    proceeds up, till there are no more locks to acquire. Then it executes the
143
    given LU and its opcodes.
144

145
    """
146
    adding_locks = level in lu.add_locks
147
    acquiring_locks = level in lu.needed_locks
148
    if level not in locking.LEVELS:
149
      if callable(self._run_notifier):
150
        self._run_notifier()
151
      result = self._ExecLU(lu)
152
    elif adding_locks and acquiring_locks:
153
      # We could both acquire and add locks at the same level, but for now we
154
      # don't need this, so we'll avoid the complicated code needed.
155
      raise NotImplementedError(
156
        "Can't declare locks to acquire when adding others")
157
    elif adding_locks or acquiring_locks:
158
      lu.DeclareLocks(level)
159
      share = lu.share_locks[level]
160
      if acquiring_locks:
161
        needed_locks = lu.needed_locks[level]
162
        lu.acquired_locks[level] = self.context.glm.acquire(level,
163
                                                            needed_locks,
164
                                                            shared=share)
165
      else: # adding_locks
166
        add_locks = lu.add_locks[level]
167
        lu.remove_locks[level] = add_locks
168
        try:
169
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
170
        except errors.LockError:
171
          raise errors.OpPrereqError(
172
            "Couldn't add locks (%s), probably because of a race condition"
173
            " with another job, who added them first" % add_locks)
174
      try:
175
        try:
176
          if adding_locks:
177
            lu.acquired_locks[level] = add_locks
178
          result = self._LockAndExecLU(lu, level + 1)
179
        finally:
180
          if level in lu.remove_locks:
181
            self.context.glm.remove(level, lu.remove_locks[level])
182
      finally:
183
        if self.context.glm.is_owned(level):
184
          self.context.glm.release(level)
185
    else:
186
      result = self._LockAndExecLU(lu, level + 1)
187

    
188
    return result
189

    
190
  def ExecOpCode(self, op, feedback_fn, run_notifier):
191
    """Execute an opcode.
192

193
    @type op: an OpCode instance
194
    @param op: the opcode to be executed
195
    @type feedback_fn: a function that takes a single argument
196
    @param feedback_fn: this function will be used as feedback from the LU
197
                        code to the end-user
198
    @type run_notifier: callable (no arguments) or None
199
    @param run_notifier:  this function (if callable) will be called when
200
                          we are about to call the lu's Exec() method, that
201
                          is, after we have acquired all locks
202

203
    """
204
    if not isinstance(op, opcodes.OpCode):
205
      raise errors.ProgrammerError("Non-opcode instance passed"
206
                                   " to ExecOpcode")
207

    
208
    self._feedback_fn = feedback_fn
209
    self._run_notifier = run_notifier
210
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
211
    if lu_class is None:
212
      raise errors.OpCodeUnknown("Unknown opcode")
213

    
214
    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
215
    # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
216
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
217
                             shared=not lu_class.REQ_BGL)
218
    try:
219
      self.exclusive_BGL = lu_class.REQ_BGL
220
      lu = lu_class(self, op, self.context, self.rpc)
221
      lu.ExpandNames()
222
      assert lu.needed_locks is not None, "needed_locks not set by LU"
223
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
224
    finally:
225
      self.context.glm.release(locking.LEVEL_CLUSTER)
226
      self.exclusive_BGL = False
227

    
228
    return result
229

    
230
  def LogStep(self, current, total, message):
231
    """Log a change in LU execution progress.
232

233
    """
234
    logging.debug("Step %d/%d %s", current, total, message)
235
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))
236

    
237
  def LogWarning(self, message, *args, **kwargs):
238
    """Log a warning to the logs and the user.
239

240
    The optional keyword argument is 'hint' and can be used to show a
241
    hint to the user (presumably related to the warning). If the
242
    message is empty, it will not be printed at all, allowing one to
243
    show only a hint.
244

245
    """
246
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
247
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
248
    if args:
249
      message = message % tuple(args)
250
    if message:
251
      logging.warning(message)
252
      self._feedback_fn(" - WARNING: %s" % message)
253
    if "hint" in kwargs:
254
      self._feedback_fn("      Hint: %s" % kwargs["hint"])
255

    
256
  def LogInfo(self, message, *args):
257
    """Log an informational message to the logs and the user.
258

259
    """
260
    if args:
261
      message = message % tuple(args)
262
    logging.info(message)
263
    self._feedback_fn(" - INFO: %s" % message)
264

    
265

    
266
class HooksMaster(object):
267
  """Hooks master.
268

269
  This class distributes the run commands to the nodes based on the
270
  specific LU class.
271

272
  In order to remove the direct dependency on the rpc module, the
273
  constructor needs a function which actually does the remote
274
  call. This will usually be rpc.call_hooks_runner, but any function
275
  which behaves the same works.
276

277
  """
278
  def __init__(self, callfn, proc, lu):
279
    self.callfn = callfn
280
    self.proc = proc
281
    self.lu = lu
282
    self.op = lu.op
283
    self.env, node_list_pre, node_list_post = self._BuildEnv()
284
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
285
                      constants.HOOKS_PHASE_POST: node_list_post}
286

    
287
  def _BuildEnv(self):
288
    """Compute the environment and the target nodes.
289

290
    Based on the opcode and the current node list, this builds the
291
    environment for the hooks and the target node list for the run.
292

293
    """
294
    env = {
295
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
296
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
297
      "GANETI_OP_CODE": self.op.OP_ID,
298
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
299
      "GANETI_DATA_DIR": constants.DATA_DIR,
300
      }
301

    
302
    if self.lu.HPATH is not None:
303
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
304
      if lu_env:
305
        for key in lu_env:
306
          env["GANETI_" + key] = lu_env[key]
307
    else:
308
      lu_nodes_pre = lu_nodes_post = []
309

    
310
    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
311

    
312
  def _RunWrapper(self, node_list, hpath, phase):
313
    """Simple wrapper over self.callfn.
314

315
    This method fixes the environment before doing the rpc call.
316

317
    """
318
    env = self.env.copy()
319
    env["GANETI_HOOKS_PHASE"] = phase
320
    env["GANETI_HOOKS_PATH"] = hpath
321
    if self.lu.cfg is not None:
322
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
323
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
324

    
325
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
326

    
327
    return self.callfn(node_list, hpath, phase, env)
328

    
329
  def RunPhase(self, phase):
330
    """Run all the scripts for a phase.
331

332
    This is the main function of the HookMaster.
333

334
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
335
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
336
    @return: the processed results of the hooks multi-node rpc call
337
    @raise errors.HooksFailure: on communication failure to the nodes
338

339
    """
340
    if not self.node_list[phase]:
341
      # empty node list, we should not attempt to run this as either
342
      # we're in the cluster init phase and the rpc client part can't
343
      # even attempt to run, or this LU doesn't do hooks at all
344
      return
345
    hpath = self.lu.HPATH
346
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
347
    if phase == constants.HOOKS_PHASE_PRE:
348
      errs = []
349
      if not results:
350
        raise errors.HooksFailure("Communication failure")
351
      for node_name in results:
352
        res = results[node_name]
353
        if res.offline:
354
          continue
355
        msg = res.RemoteFailMsg()
356
        if msg:
357
          self.proc.LogWarning("Communication failure to node %s: %s",
358
                               node_name, msg)
359
          continue
360
        for script, hkr, output in res.payload:
361
          if hkr == constants.HKR_FAIL:
362
            errs.append((node_name, script, output))
363
      if errs:
364
        raise errors.HooksAbort(errs)
365
    return results
366

    
367
  def RunConfigUpdate(self):
368
    """Run the special configuration update hook
369

370
    This is a special hook that runs only on the master after each
371
    top-level LI if the configuration has been updated.
372

373
    """
374
    phase = constants.HOOKS_PHASE_POST
375
    hpath = constants.HOOKS_NAME_CFGUPDATE
376
    nodes = [self.lu.cfg.GetMasterNode()]
377
    self._RunWrapper(nodes, hpath, phase)