Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 36c381d7

History | View | Annotate | Download (11.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31

    
32
from ganeti import opcodes
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import rpc
36
from ganeti import cmdlib
37
from ganeti import config
38
from ganeti import ssconf
39
from ganeti import logger
40
from ganeti import locking
41

    
42

    
43
class Processor(object):
44
  """Object which runs OpCodes"""
45
  DISPATCH_TABLE = {
46
    # Cluster
47
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
48
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
49
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
50
    opcodes.OpMasterFailover: cmdlib.LUMasterFailover,
51
    opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
52
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
53
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
54
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
55
    # node lu
56
    opcodes.OpAddNode: cmdlib.LUAddNode,
57
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
58
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
59
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
60
    # instance lu
61
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
62
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
63
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
64
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
65
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
66
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
67
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
68
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
69
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
70
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
71
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
72
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
73
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
74
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
75
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
76
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
77
    # os lu
78
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
79
    # exports lu
80
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
81
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
82
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
83
    # tags lu
84
    opcodes.OpGetTags: cmdlib.LUGetTags,
85
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
86
    opcodes.OpAddTags: cmdlib.LUAddTags,
87
    opcodes.OpDelTags: cmdlib.LUDelTags,
88
    # test lu
89
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
90
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
91
    }
92

    
93
  def __init__(self, context, feedback=None):
94
    """Constructor for Processor
95

96
    Args:
97
     - feedback_fn: the feedback function (taking one string) to be run when
98
                    interesting events are happening
99
    """
100
    self.context = context
101
    self._feedback_fn = feedback
102
    self.exclusive_BGL = False
103

    
104
  def _ExecLU(self, lu):
105
    """Logical Unit execution sequence.
106

107
    """
108
    write_count = self.context.cfg.write_count
109
    lu.CheckPrereq()
110
    hm = HooksMaster(rpc.call_hooks_runner, self, lu)
111
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
112
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
113
                     self._feedback_fn, None)
114
    try:
115
      result = lu.Exec(self._feedback_fn)
116
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
117
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
118
                                self._feedback_fn, result)
119
    finally:
120
      # FIXME: This needs locks if not lu_class.REQ_BGL
121
      if write_count != self.context.cfg.write_count:
122
        hm.RunConfigUpdate()
123

    
124
    return result
125

    
126
  def ExecOpCode(self, op):
127
    """Execute an opcode.
128

129
    Args:
130
      op: the opcode to be executed
131

132
    """
133
    if not isinstance(op, opcodes.OpCode):
134
      raise errors.ProgrammerError("Non-opcode instance passed"
135
                                   " to ExecOpcode")
136

    
137
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
138
    if lu_class is None:
139
      raise errors.OpCodeUnknown("Unknown opcode")
140

    
141
    if lu_class.REQ_WSSTORE:
142
      sstore = ssconf.WritableSimpleStore()
143
    else:
144
      sstore = ssconf.SimpleStore()
145

    
146
    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
147
    # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
148
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
149
                             shared=not lu_class.REQ_BGL)
150
    try:
151
      self.exclusive_BGL = lu_class.REQ_BGL
152
      lu = lu_class(self, op, self.context, sstore)
153
      result = self._ExecLU(lu)
154
    finally:
155
      self.context.glm.release(locking.LEVEL_CLUSTER)
156
      self.exclusive_BGL = False
157

    
158
    return result
159

    
160
  def ChainOpCode(self, op):
161
    """Chain and execute an opcode.
162

163
    This is used by LUs when they need to execute a child LU.
164

165
    Args:
166
     - opcode: the opcode to be executed
167

168
    """
169
    if not isinstance(op, opcodes.OpCode):
170
      raise errors.ProgrammerError("Non-opcode instance passed"
171
                                   " to ExecOpcode")
172

    
173
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
174
    if lu_class is None:
175
      raise errors.OpCodeUnknown("Unknown opcode")
176

    
177
    if lu_class.REQ_BGL and not self.exclusive_BGL:
178
      raise errors.ProgrammerError("LUs which require the BGL cannot"
179
                                   " be chained to granular ones.")
180

    
181
    if lu_class.REQ_WSSTORE:
182
      sstore = ssconf.WritableSimpleStore()
183
    else:
184
      sstore = ssconf.SimpleStore()
185

    
186
    #do_hooks = lu_class.HPATH is not None
187
    lu = lu_class(self, op, self.context, sstore)
188
    lu.CheckPrereq()
189
    #if do_hooks:
190
    #  hm = HooksMaster(rpc.call_hooks_runner, self, lu)
191
    #  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
192
    #  lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
193
    #                   h_results, self._feedback_fn, None)
194
    result = lu.Exec(self._feedback_fn)
195
    #if do_hooks:
196
    #  h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
197
    #  result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
198
    #                   h_results, self._feedback_fn, result)
199
    return result
200

    
201
  def LogStep(self, current, total, message):
202
    """Log a change in LU execution progress.
203

204
    """
205
    logger.Debug("Step %d/%d %s" % (current, total, message))
206
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))
207

    
208
  def LogWarning(self, message, hint=None):
209
    """Log a warning to the logs and the user.
210

211
    """
212
    logger.Error(message)
213
    self._feedback_fn(" - WARNING: %s" % message)
214
    if hint:
215
      self._feedback_fn("      Hint: %s" % hint)
216

    
217
  def LogInfo(self, message):
218
    """Log an informational message to the logs and the user.
219

220
    """
221
    logger.Info(message)
222
    self._feedback_fn(" - INFO: %s" % message)
223

    
224

    
225
class HooksMaster(object):
226
  """Hooks master.
227

228
  This class distributes the run commands to the nodes based on the
229
  specific LU class.
230

231
  In order to remove the direct dependency on the rpc module, the
232
  constructor needs a function which actually does the remote
233
  call. This will usually be rpc.call_hooks_runner, but any function
234
  which behaves the same works.
235

236
  """
237
  def __init__(self, callfn, proc, lu):
238
    self.callfn = callfn
239
    self.proc = proc
240
    self.lu = lu
241
    self.op = lu.op
242
    self.env, node_list_pre, node_list_post = self._BuildEnv()
243
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
244
                      constants.HOOKS_PHASE_POST: node_list_post}
245

    
246
  def _BuildEnv(self):
247
    """Compute the environment and the target nodes.
248

249
    Based on the opcode and the current node list, this builds the
250
    environment for the hooks and the target node list for the run.
251

252
    """
253
    env = {
254
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
255
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
256
      "GANETI_OP_CODE": self.op.OP_ID,
257
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
258
      "GANETI_DATA_DIR": constants.DATA_DIR,
259
      }
260

    
261
    if self.lu.HPATH is not None:
262
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
263
      if lu_env:
264
        for key in lu_env:
265
          env["GANETI_" + key] = lu_env[key]
266
    else:
267
      lu_nodes_pre = lu_nodes_post = []
268

    
269
    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
270

    
271
  def _RunWrapper(self, node_list, hpath, phase):
272
    """Simple wrapper over self.callfn.
273

274
    This method fixes the environment before doing the rpc call.
275

276
    """
277
    env = self.env.copy()
278
    env["GANETI_HOOKS_PHASE"] = phase
279
    env["GANETI_HOOKS_PATH"] = hpath
280
    if self.lu.sstore is not None:
281
      env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
282
      env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
283

    
284
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
285

    
286
    return self.callfn(node_list, hpath, phase, env)
287

    
288
  def RunPhase(self, phase):
289
    """Run all the scripts for a phase.
290

291
    This is the main function of the HookMaster.
292

293
    Args:
294
      phase: the hooks phase to run
295

296
    Returns:
297
      the result of the hooks multi-node rpc call
298

299
    """
300
    if not self.node_list[phase]:
301
      # empty node list, we should not attempt to run this as either
302
      # we're in the cluster init phase and the rpc client part can't
303
      # even attempt to run, or this LU doesn't do hooks at all
304
      return
305
    hpath = self.lu.HPATH
306
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
307
    if phase == constants.HOOKS_PHASE_PRE:
308
      errs = []
309
      if not results:
310
        raise errors.HooksFailure("Communication failure")
311
      for node_name in results:
312
        res = results[node_name]
313
        if res is False or not isinstance(res, list):
314
          self.proc.LogWarning("Communication failure to node %s" % node_name)
315
          continue
316
        for script, hkr, output in res:
317
          if hkr == constants.HKR_FAIL:
318
            output = output.strip().encode("string_escape")
319
            errs.append((node_name, script, output))
320
      if errs:
321
        raise errors.HooksAbort(errs)
322
    return results
323

    
324
  def RunConfigUpdate(self):
325
    """Run the special configuration update hook
326

327
    This is a special hook that runs only on the master after each
328
    top-level LI if the configuration has been updated.
329

330
    """
331
    phase = constants.HOOKS_PHASE_POST
332
    hpath = constants.HOOKS_NAME_CFGUPDATE
333
    if self.lu.sstore is None:
334
      raise errors.ProgrammerError("Null sstore on config update hook")
335
    nodes = [self.lu.sstore.GetMasterNode()]
336
    results = self._RunWrapper(nodes, hpath, phase)