Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 1c901d13

History | View | Annotate | Download (10.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31

    
32
from ganeti import opcodes
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import rpc
36
from ganeti import cmdlib
37
from ganeti import config
38
from ganeti import ssconf
39
from ganeti import logger
40

    
41

    
42
class Processor(object):
43
  """Object which runs OpCodes"""
44
  DISPATCH_TABLE = {
45
    # Cluster
46
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
47
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
48
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
49
    opcodes.OpMasterFailover: cmdlib.LUMasterFailover,
50
    opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
51
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
52
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
53
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
54
    # node lu
55
    opcodes.OpAddNode: cmdlib.LUAddNode,
56
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
57
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
58
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
59
    # instance lu
60
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
61
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
62
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
63
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
64
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
65
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
66
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
67
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
68
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
69
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
70
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
71
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
72
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
73
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
74
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
75
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
76
    # os lu
77
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
78
    # exports lu
79
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
80
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
81
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
82
    # tags lu
83
    opcodes.OpGetTags: cmdlib.LUGetTags,
84
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
85
    opcodes.OpAddTags: cmdlib.LUAddTags,
86
    opcodes.OpDelTags: cmdlib.LUDelTags,
87
    # test lu
88
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
89
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
90
    }
91

    
92
  def __init__(self, context, feedback=None):
93
    """Constructor for Processor
94

95
    Args:
96
     - feedback_fn: the feedback function (taking one string) to be run when
97
                    interesting events are happening
98
    """
99
    self.context = context
100
    self._feedback_fn = feedback
101

    
102
  def ExecOpCode(self, op):
103
    """Execute an opcode.
104

105
    Args:
106
      op: the opcode to be executed
107

108
    """
109
    if not isinstance(op, opcodes.OpCode):
110
      raise errors.ProgrammerError("Non-opcode instance passed"
111
                                   " to ExecOpcode")
112

    
113
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
114
    if lu_class is None:
115
      raise errors.OpCodeUnknown("Unknown opcode")
116

    
117
    if lu_class.REQ_WSSTORE:
118
      sstore = ssconf.WritableSimpleStore()
119
    else:
120
      sstore = ssconf.SimpleStore()
121

    
122
    write_count = self.context.cfg.write_count
123
    lu = lu_class(self, op, self.context.cfg, sstore)
124
    lu.CheckPrereq()
125
    hm = HooksMaster(rpc.call_hooks_runner, self, lu)
126
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
127
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
128
                     h_results, self._feedback_fn, None)
129
    try:
130
      result = lu.Exec(self._feedback_fn)
131
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
132
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
133
                       h_results, self._feedback_fn, result)
134
    finally:
135
      if lu.cfg is not None:
136
        # we use lu.cfg and not self.cfg as for init cluster, self.cfg
137
        # is None but lu.cfg has been recently initialized in the
138
        # lu.Exec method
139
        if write_count != lu.cfg.write_count:
140
          hm.RunConfigUpdate()
141

    
142
    return result
143

    
144
  def ChainOpCode(self, op):
145
    """Chain and execute an opcode.
146

147
    This is used by LUs when they need to execute a child LU.
148

149
    Args:
150
     - opcode: the opcode to be executed
151

152
    """
153
    if not isinstance(op, opcodes.OpCode):
154
      raise errors.ProgrammerError("Non-opcode instance passed"
155
                                   " to ExecOpcode")
156

    
157
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
158
    if lu_class is None:
159
      raise errors.OpCodeUnknown("Unknown opcode")
160

    
161
    if lu_class.REQ_WSSTORE:
162
      sstore = ssconf.WritableSimpleStore()
163
    else:
164
      sstore = ssconf.SimpleStore()
165

    
166
    #do_hooks = lu_class.HPATH is not None
167
    lu = lu_class(self, op, self.context.cfg, sstore)
168
    lu.CheckPrereq()
169
    #if do_hooks:
170
    #  hm = HooksMaster(rpc.call_hooks_runner, self, lu)
171
    #  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
172
    #  lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
173
    #                   h_results, self._feedback_fn, None)
174
    result = lu.Exec(self._feedback_fn)
175
    #if do_hooks:
176
    #  h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
177
    #  result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
178
    #                   h_results, self._feedback_fn, result)
179
    return result
180

    
181
  def LogStep(self, current, total, message):
182
    """Log a change in LU execution progress.
183

184
    """
185
    logger.Debug("Step %d/%d %s" % (current, total, message))
186
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))
187

    
188
  def LogWarning(self, message, hint=None):
189
    """Log a warning to the logs and the user.
190

191
    """
192
    logger.Error(message)
193
    self._feedback_fn(" - WARNING: %s" % message)
194
    if hint:
195
      self._feedback_fn("      Hint: %s" % hint)
196

    
197
  def LogInfo(self, message):
198
    """Log an informational message to the logs and the user.
199

200
    """
201
    logger.Info(message)
202
    self._feedback_fn(" - INFO: %s" % message)
203

    
204

    
205
class HooksMaster(object):
206
  """Hooks master.
207

208
  This class distributes the run commands to the nodes based on the
209
  specific LU class.
210

211
  In order to remove the direct dependency on the rpc module, the
212
  constructor needs a function which actually does the remote
213
  call. This will usually be rpc.call_hooks_runner, but any function
214
  which behaves the same works.
215

216
  """
217
  def __init__(self, callfn, proc, lu):
218
    self.callfn = callfn
219
    self.proc = proc
220
    self.lu = lu
221
    self.op = lu.op
222
    self.env, node_list_pre, node_list_post = self._BuildEnv()
223
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
224
                      constants.HOOKS_PHASE_POST: node_list_post}
225

    
226
  def _BuildEnv(self):
227
    """Compute the environment and the target nodes.
228

229
    Based on the opcode and the current node list, this builds the
230
    environment for the hooks and the target node list for the run.
231

232
    """
233
    env = {
234
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
235
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
236
      "GANETI_OP_CODE": self.op.OP_ID,
237
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
238
      "GANETI_DATA_DIR": constants.DATA_DIR,
239
      }
240

    
241
    if self.lu.HPATH is not None:
242
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
243
      if lu_env:
244
        for key in lu_env:
245
          env["GANETI_" + key] = lu_env[key]
246
    else:
247
      lu_nodes_pre = lu_nodes_post = []
248

    
249
    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
250

    
251
  def _RunWrapper(self, node_list, hpath, phase):
252
    """Simple wrapper over self.callfn.
253

254
    This method fixes the environment before doing the rpc call.
255

256
    """
257
    env = self.env.copy()
258
    env["GANETI_HOOKS_PHASE"] = phase
259
    env["GANETI_HOOKS_PATH"] = hpath
260
    if self.lu.sstore is not None:
261
      env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
262
      env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
263

    
264
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
265

    
266
    return self.callfn(node_list, hpath, phase, env)
267

    
268
  def RunPhase(self, phase):
269
    """Run all the scripts for a phase.
270

271
    This is the main function of the HookMaster.
272

273
    Args:
274
      phase: the hooks phase to run
275

276
    Returns:
277
      the result of the hooks multi-node rpc call
278

279
    """
280
    if not self.node_list[phase]:
281
      # empty node list, we should not attempt to run this as either
282
      # we're in the cluster init phase and the rpc client part can't
283
      # even attempt to run, or this LU doesn't do hooks at all
284
      return
285
    hpath = self.lu.HPATH
286
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
287
    if phase == constants.HOOKS_PHASE_PRE:
288
      errs = []
289
      if not results:
290
        raise errors.HooksFailure("Communication failure")
291
      for node_name in results:
292
        res = results[node_name]
293
        if res is False or not isinstance(res, list):
294
          self.proc.LogWarning("Communication failure to node %s" % node_name)
295
          continue
296
        for script, hkr, output in res:
297
          if hkr == constants.HKR_FAIL:
298
            output = output.strip().encode("string_escape")
299
            errs.append((node_name, script, output))
300
      if errs:
301
        raise errors.HooksAbort(errs)
302
    return results
303

    
304
  def RunConfigUpdate(self):
305
    """Run the special configuration update hook
306

307
    This is a special hook that runs only on the master after each
308
    top-level LI if the configuration has been updated.
309

310
    """
311
    phase = constants.HOOKS_PHASE_POST
312
    hpath = constants.HOOKS_NAME_CFGUPDATE
313
    if self.lu.sstore is None:
314
      raise errors.ProgrammerError("Null sstore on config update hook")
315
    nodes = [self.lu.sstore.GetMasterNode()]
316
    results = self._RunWrapper(nodes, hpath, phase)