Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 2971c913

History | View | Annotate | Download (13.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import logging
32

    
33
from ganeti import opcodes
34
from ganeti import constants
35
from ganeti import errors
36
from ganeti import rpc
37
from ganeti import cmdlib
38
from ganeti import locking
39

    
40

    
41
class Processor(object):
42
  """Object which runs OpCodes"""
43
  DISPATCH_TABLE = {
44
    # Cluster
45
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
46
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
47
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
48
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
49
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
50
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
51
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
52
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
53
    # node lu
54
    opcodes.OpAddNode: cmdlib.LUAddNode,
55
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
56
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
57
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
58
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
59
    # instance lu
60
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
61
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
62
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
63
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
64
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
65
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
66
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
67
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
68
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
69
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
70
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
71
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
72
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
73
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
74
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
75
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
76
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
77
    # os lu
78
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
79
    # exports lu
80
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
81
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
82
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
83
    # tags lu
84
    opcodes.OpGetTags: cmdlib.LUGetTags,
85
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
86
    opcodes.OpAddTags: cmdlib.LUAddTags,
87
    opcodes.OpDelTags: cmdlib.LUDelTags,
88
    # test lu
89
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
90
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
91
    }
92

    
93
  def __init__(self, context):
94
    """Constructor for Processor
95

96
    Args:
97
     - feedback_fn: the feedback function (taking one string) to be run when
98
                    interesting events are happening
99
    """
100
    self.context = context
101
    self._feedback_fn = None
102
    self.exclusive_BGL = False
103
    self.rpc = rpc.RpcRunner(context.cfg)
104

    
105
  def _ExecLU(self, lu):
106
    """Logical Unit execution sequence.
107

108
    """
109
    write_count = self.context.cfg.write_count
110
    lu.CheckPrereq()
111
    hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
112
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
113
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
114
                     self._feedback_fn, None)
115
    try:
116
      result = lu.Exec(self._feedback_fn)
117
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
118
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
119
                                self._feedback_fn, result)
120
    finally:
121
      # FIXME: This needs locks if not lu_class.REQ_BGL
122
      if write_count != self.context.cfg.write_count:
123
        hm.RunConfigUpdate()
124

    
125
    return result
126

    
127
  def _LockAndExecLU(self, lu, level):
128
    """Execute a Logical Unit, with the needed locks.
129

130
    This is a recursive function that starts locking the given level, and
131
    proceeds up, till there are no more locks to acquire. Then it executes the
132
    given LU and its opcodes.
133

134
    """
135
    adding_locks = level in lu.add_locks
136
    acquiring_locks = level in lu.needed_locks
137
    if level not in locking.LEVELS:
138
      if callable(self._run_notifier):
139
        self._run_notifier()
140
      result = self._ExecLU(lu)
141
    elif adding_locks and acquiring_locks:
142
      # We could both acquire and add locks at the same level, but for now we
143
      # don't need this, so we'll avoid the complicated code needed.
144
      raise NotImplementedError(
145
        "Can't declare locks to acquire when adding others")
146
    elif adding_locks or acquiring_locks:
147
      lu.DeclareLocks(level)
148
      share = lu.share_locks[level]
149
      if acquiring_locks:
150
        needed_locks = lu.needed_locks[level]
151
        lu.acquired_locks[level] = self.context.glm.acquire(level,
152
                                                            needed_locks,
153
                                                            shared=share)
154
      else: # adding_locks
155
        add_locks = lu.add_locks[level]
156
        lu.remove_locks[level] = add_locks
157
        try:
158
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
159
        except errors.LockError:
160
          raise errors.OpPrereqError(
161
            "Coudn't add locks (%s), probably because of a race condition"
162
            " with another job, who added them first" % add_locks)
163
      try:
164
        try:
165
          if adding_locks:
166
            lu.acquired_locks[level] = add_locks
167
          result = self._LockAndExecLU(lu, level + 1)
168
        finally:
169
          if level in lu.remove_locks:
170
            self.context.glm.remove(level, lu.remove_locks[level])
171
      finally:
172
        if self.context.glm.is_owned(level):
173
          self.context.glm.release(level)
174
    else:
175
      result = self._LockAndExecLU(lu, level + 1)
176

    
177
    return result
178

    
179
  def ExecOpCode(self, op, feedback_fn, run_notifier):
180
    """Execute an opcode.
181

182
    @type op: an OpCode instance
183
    @param op: the opcode to be executed
184
    @type feedback_fn: a function that takes a single argument
185
    @param feedback_fn: this function will be used as feedback from the LU
186
                        code to the end-user
187
    @type run_notifier: callable (no arguments) or None
188
    @param run_notifier:  this function (if callable) will be called when
189
                          we are about to call the lu's Exec() method, that
190
                          is, after we have aquired all locks
191

192
    """
193
    if not isinstance(op, opcodes.OpCode):
194
      raise errors.ProgrammerError("Non-opcode instance passed"
195
                                   " to ExecOpcode")
196

    
197
    self._feedback_fn = feedback_fn
198
    self._run_notifier = run_notifier
199
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
200
    if lu_class is None:
201
      raise errors.OpCodeUnknown("Unknown opcode")
202

    
203
    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
204
    # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
205
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
206
                             shared=not lu_class.REQ_BGL)
207
    try:
208
      self.exclusive_BGL = lu_class.REQ_BGL
209
      lu = lu_class(self, op, self.context, self.rpc)
210
      lu.ExpandNames()
211
      assert lu.needed_locks is not None, "needed_locks not set by LU"
212
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
213
    finally:
214
      self.context.glm.release(locking.LEVEL_CLUSTER)
215
      self.exclusive_BGL = False
216

    
217
    return result
218

    
219
  def LogStep(self, current, total, message):
220
    """Log a change in LU execution progress.
221

222
    """
223
    logging.debug("Step %d/%d %s", current, total, message)
224
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))
225

    
226
  def LogWarning(self, message, *args, **kwargs):
227
    """Log a warning to the logs and the user.
228

229
    The optional keyword argument is 'hint' and can be used to show a
230
    hint to the user (presumably related to the warning). If the
231
    message is empty, it will not be printed at all, allowing one to
232
    show only a hint.
233

234
    """
235
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
236
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
237
    if args:
238
      message = message % tuple(args)
239
    if message:
240
      logging.warning(message)
241
      self._feedback_fn(" - WARNING: %s" % message)
242
    if "hint" in kwargs:
243
      self._feedback_fn("      Hint: %s" % kwargs["hint"])
244

    
245
  def LogInfo(self, message, *args):
246
    """Log an informational message to the logs and the user.
247

248
    """
249
    if args:
250
      message = message % tuple(args)
251
    logging.info(message)
252
    self._feedback_fn(" - INFO: %s" % message)
253

    
254

    
255
class HooksMaster(object):
256
  """Hooks master.
257

258
  This class distributes the run commands to the nodes based on the
259
  specific LU class.
260

261
  In order to remove the direct dependency on the rpc module, the
262
  constructor needs a function which actually does the remote
263
  call. This will usually be rpc.call_hooks_runner, but any function
264
  which behaves the same works.
265

266
  """
267
  def __init__(self, callfn, proc, lu):
268
    self.callfn = callfn
269
    self.proc = proc
270
    self.lu = lu
271
    self.op = lu.op
272
    self.env, node_list_pre, node_list_post = self._BuildEnv()
273
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
274
                      constants.HOOKS_PHASE_POST: node_list_post}
275

    
276
  def _BuildEnv(self):
277
    """Compute the environment and the target nodes.
278

279
    Based on the opcode and the current node list, this builds the
280
    environment for the hooks and the target node list for the run.
281

282
    """
283
    env = {
284
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
285
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
286
      "GANETI_OP_CODE": self.op.OP_ID,
287
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
288
      "GANETI_DATA_DIR": constants.DATA_DIR,
289
      }
290

    
291
    if self.lu.HPATH is not None:
292
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
293
      if lu_env:
294
        for key in lu_env:
295
          env["GANETI_" + key] = lu_env[key]
296
    else:
297
      lu_nodes_pre = lu_nodes_post = []
298

    
299
    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
300

    
301
  def _RunWrapper(self, node_list, hpath, phase):
302
    """Simple wrapper over self.callfn.
303

304
    This method fixes the environment before doing the rpc call.
305

306
    """
307
    env = self.env.copy()
308
    env["GANETI_HOOKS_PHASE"] = phase
309
    env["GANETI_HOOKS_PATH"] = hpath
310
    if self.lu.cfg is not None:
311
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
312
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
313

    
314
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
315

    
316
    return self.callfn(node_list, hpath, phase, env)
317

    
318
  def RunPhase(self, phase):
319
    """Run all the scripts for a phase.
320

321
    This is the main function of the HookMaster.
322

323
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
324
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
325
    @return: the processed results of the hooks multi-node rpc call
326
    @raise errors.HooksFailure: on communication failure to the nodes
327

328
    """
329
    if not self.node_list[phase]:
330
      # empty node list, we should not attempt to run this as either
331
      # we're in the cluster init phase and the rpc client part can't
332
      # even attempt to run, or this LU doesn't do hooks at all
333
      return
334
    hpath = self.lu.HPATH
335
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
336
    if phase == constants.HOOKS_PHASE_PRE:
337
      errs = []
338
      if not results:
339
        raise errors.HooksFailure("Communication failure")
340
      for node_name in results:
341
        res = results[node_name]
342
        if res.failed or res.data is False or not isinstance(res.data, list):
343
          if not res.offline:
344
            self.proc.LogWarning("Communication failure to node %s" %
345
                                 node_name)
346
          continue
347
        for script, hkr, output in res.data:
348
          if hkr == constants.HKR_FAIL:
349
            errs.append((node_name, script, output))
350
      if errs:
351
        raise errors.HooksAbort(errs)
352
    return results
353

    
354
  def RunConfigUpdate(self):
355
    """Run the special configuration update hook
356

357
    This is a special hook that runs only on the master after each
358
    top-level LI if the configuration has been updated.
359

360
    """
361
    phase = constants.HOOKS_PHASE_POST
362
    hpath = constants.HOOKS_NAME_CFGUPDATE
363
    nodes = [self.lu.cfg.GetMasterNode()]
364
    results = self._RunWrapper(nodes, hpath, phase)