Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 0b38cf6e

History | View | Annotate | Download (11.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31

    
32
from ganeti import opcodes
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import rpc
36
from ganeti import cmdlib
37
from ganeti import logger
38
from ganeti import locking
39

    
40

    
41
class Processor(object):
  """Object which runs OpCodes"""
  # Maps each opcode class to the LogicalUnit class that implements it
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

  def __init__(self, context):
    """Constructor for Processor

    Args:
     - context: the context object giving access to the cluster
                configuration (context.cfg) and the global lock manager
                (context.glm)

    Note: the feedback function is not passed here; it is supplied
    per-opcode to ExecOpCode and stored in self._feedback_fn for the
    duration of that call.

    """
    self.context = context
    self._feedback_fn = None      # set by ExecOpCode for each opcode run
    self.exclusive_BGL = False    # True while the BGL is held exclusively

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs the prerequisite check, the pre hooks, the LU itself and the
    post hooks, letting the LU post-process each hook phase's results.
    If the configuration was written during the run (detected via the
    write_count delta), the special config-update hook is run as well.

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(rpc.call_hooks_runner, self, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self._feedback_fn, None)
    try:
      result = lu.Exec(self._feedback_fn)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      # the LU may transform (or replace) the result based on the post hooks
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self._feedback_fn, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # past the last locking level: everything is held, run the LU
      result = self._ExecLU(lu)
    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError(
        "Can't declare locks to acquire when adding others")
    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      if acquiring_locks:
        needed_locks = lu.needed_locks[level]
        lu.acquired_locks[level] = self.context.glm.acquire(level,
                                                            needed_locks,
                                                            shared=share)
      else: # adding_locks
        add_locks = lu.add_locks[level]
        # locks we add now are scheduled for removal once the LU is done
        lu.remove_locks[level] = add_locks
        try:
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
        except errors.LockError:
          raise errors.OpPrereqError(
            "Couldn't add locks (%s), probably because of a race condition"
            " with another job, who added them first" % add_locks)
      try:
        try:
          if adding_locks:
            lu.acquired_locks[level] = add_locks
          result = self._LockAndExecLU(lu, level + 1)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)
    else:
      # nothing needed at this level, recurse straight to the next one
      result = self._LockAndExecLU(lu, level + 1)

    return result

  def ExecOpCode(self, op, feedback_fn):
    """Execute an opcode.

    Args:
      op: the opcode to be executed
      feedback_fn: the feedback function (taking one string) to be run when
                   interesting events are happening

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    self._feedback_fn = feedback_fn
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
    # shared fashion otherwise (to prevent concurrent run with an exclusive
    # LU)
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                             shared=not lu_class.REQ_BGL)
    try:
      self.exclusive_BGL = lu_class.REQ_BGL
      lu = lu_class(self, op, self.context)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
    finally:
      self.context.glm.release(locking.LEVEL_CLUSTER)
      self.exclusive_BGL = False

    return result

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logger.Debug("Step %d/%d %s" % (current, total, message))
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, hint=None):
    """Log a warning to the logs and the user.

    """
    logger.Error(message)
    self._feedback_fn(" - WARNING: %s" % message)
    if hint:
      self._feedback_fn("      Hint: %s" % hint)

  def LogInfo(self, message):
    """Log an informational message to the logs and the user.

    """
    logger.Info(message)
    self._feedback_fn(" - INFO: %s" % message)
227

    
228

    
229
class HooksMaster(object):
  """Hooks master.

  This class distributes the hook run commands to the target nodes,
  based on the specific LU class.

  To avoid a hard dependency on the rpc module, the constructor takes
  the function that performs the actual remote call; normally this is
  rpc.call_hooks_runner, but any callable with the same signature and
  behaviour works.

  """
  def __init__(self, callfn, proc, lu):
    self.callfn = callfn
    self.proc = proc
    self.lu = lu
    self.op = lu.op
    # compute the static environment and the per-phase target node sets once
    self.env, pre_nodes, post_nodes = self._BuildEnv()
    self.node_list = {
      constants.HOOKS_PHASE_PRE: pre_nodes,
      constants.HOOKS_PHASE_POST: post_nodes,
      }

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    """
    base_env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is None:
      # this LU doesn't define hooks: no extra env, no target nodes
      pre_nodes = post_nodes = []
    else:
      lu_env, pre_nodes, post_nodes = self.lu.BuildHooksEnv()
      if lu_env:
        for name, value in lu_env.items():
          base_env["GANETI_" + name] = value

    return base_env, frozenset(pre_nodes), frozenset(post_nodes)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    """
    run_env = self.env.copy()
    run_env["GANETI_HOOKS_PHASE"] = phase
    run_env["GANETI_HOOKS_PATH"] = hpath
    cfg = self.lu.cfg
    if cfg is not None:
      run_env["GANETI_CLUSTER"] = cfg.GetClusterName()
      run_env["GANETI_MASTER"] = cfg.GetMasterNode()

    # the hooks runner wants plain strings on both sides of the mapping
    run_env = dict((str(key), str(val))
                   for key, val in run_env.iteritems())

    return self.callfn(node_list, hpath, phase, run_env)

  def RunPhase(self, phase):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.

    Args:
      phase: the hooks phase to run

    Returns:
      the result of the hooks multi-node rpc call, or None when this
      phase has no target nodes

    """
    if not self.node_list[phase]:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return
    results = self._RunWrapper(self.node_list[phase], self.lu.HPATH, phase)
    if phase == constants.HOOKS_PHASE_PRE:
      # pre-phase failures abort the whole operation
      if not results:
        raise errors.HooksFailure("Communication failure")
      errs = []
      for node_name, res in results.items():
        if res is False or not isinstance(res, list):
          self.proc.LogWarning("Communication failure to node %s" % node_name)
          continue
        for script, hkr, output in res:
          if hkr != constants.HKR_FAIL:
            continue
          errs.append((node_name, script,
                       output.strip().encode("string_escape")))
      if errs:
        raise errors.HooksAbort(errs)
    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    master_only = [self.lu.cfg.GetMasterNode()]
    self._RunWrapper(master_only, constants.HOOKS_NAME_CFGUPDATE,
                     constants.HOOKS_PHASE_POST)