Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ a21dda8b

History | View | Annotate | Download (10.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31

    
32
from ganeti import opcodes
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import rpc
36
from ganeti import cmdlib
37
from ganeti import config
38
from ganeti import ssconf
39
from ganeti import logger
40

    
41

    
42
class Processor(object):
43
  """Object which runs OpCodes"""
44
  DISPATCH_TABLE = {
45
    # Cluster
46
    opcodes.OpInitCluster: cmdlib.LUInitCluster,
47
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
48
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
49
    opcodes.OpClusterCopyFile: cmdlib.LUClusterCopyFile,
50
    opcodes.OpRunClusterCommand: cmdlib.LURunClusterCommand,
51
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
52
    opcodes.OpMasterFailover: cmdlib.LUMasterFailover,
53
    opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
54
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
55
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
56
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
57
    # node lu
58
    opcodes.OpAddNode: cmdlib.LUAddNode,
59
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
60
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
61
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
62
    # instance lu
63
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
64
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
65
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
66
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
67
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
68
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
69
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
70
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
71
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
72
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
73
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
74
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
75
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
76
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
77
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
78
    # os lu
79
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
80
    # exports lu
81
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
82
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
83
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
84
    # tags lu
85
    opcodes.OpGetTags: cmdlib.LUGetTags,
86
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
87
    opcodes.OpAddTags: cmdlib.LUAddTags,
88
    opcodes.OpDelTags: cmdlib.LUDelTags,
89
    # test lu
90
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
91
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
92
    }
93

    
94
  def __init__(self, feedback=None):
95
    """Constructor for Processor
96

97
    Args:
98
     - feedback_fn: the feedback function (taking one string) to be run when
99
                    interesting events are happening
100
    """
101
    self.cfg = None
102
    self.sstore = None
103
    self._feedback_fn = feedback
104

    
105
  def ExecOpCode(self, op):
106
    """Execute an opcode.
107

108
    Args:
109
      op: the opcode to be executed
110

111
    """
112
    if not isinstance(op, opcodes.OpCode):
113
      raise errors.ProgrammerError("Non-opcode instance passed"
114
                                   " to ExecOpcode")
115

    
116
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
117
    if lu_class is None:
118
      raise errors.OpCodeUnknown("Unknown opcode")
119

    
120
    if lu_class.REQ_CLUSTER and self.cfg is None:
121
      self.cfg = config.ConfigWriter()
122
      self.sstore = ssconf.SimpleStore()
123
    if self.cfg is not None:
124
      write_count = self.cfg.write_count
125
    else:
126
      write_count = 0
127
    lu = lu_class(self, op, self.cfg, self.sstore)
128
    lu.CheckPrereq()
129
    hm = HooksMaster(rpc.call_hooks_runner, self, lu)
130
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
131
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
132
                     h_results, self._feedback_fn, None)
133
    try:
134
      result = lu.Exec(self._feedback_fn)
135
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
136
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
137
                       h_results, self._feedback_fn, result)
138
    finally:
139
      if lu.cfg is not None:
140
        # we use lu.cfg and not self.cfg as for init cluster, self.cfg
141
        # is None but lu.cfg has been recently initialized in the
142
        # lu.Exec method
143
        if write_count != lu.cfg.write_count:
144
          hm.RunConfigUpdate()
145

    
146
    return result
147

    
148
  def ChainOpCode(self, op):
149
    """Chain and execute an opcode.
150

151
    This is used by LUs when they need to execute a child LU.
152

153
    Args:
154
     - opcode: the opcode to be executed
155

156
    """
157
    if not isinstance(op, opcodes.OpCode):
158
      raise errors.ProgrammerError("Non-opcode instance passed"
159
                                   " to ExecOpcode")
160

    
161
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
162
    if lu_class is None:
163
      raise errors.OpCodeUnknown("Unknown opcode")
164

    
165
    if lu_class.REQ_CLUSTER and self.cfg is None:
166
      self.cfg = config.ConfigWriter()
167
      self.sstore = ssconf.SimpleStore()
168
    #do_hooks = lu_class.HPATH is not None
169
    lu = lu_class(self, op, self.cfg, self.sstore)
170
    lu.CheckPrereq()
171
    #if do_hooks:
172
    #  hm = HooksMaster(rpc.call_hooks_runner, self, lu)
173
    #  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
174
    #  lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
175
    #                   h_results, self._feedback_fn, None)
176
    result = lu.Exec(self._feedback_fn)
177
    #if do_hooks:
178
    #  h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
179
    #  result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
180
    #                   h_results, self._feedback_fn, result)
181
    return result
182

    
183
  def LogStep(self, current, total, message):
184
    """Log a change in LU execution progress.
185

186
    """
187
    logger.Debug("Step %d/%d %s" % (current, total, message))
188
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))
189

    
190
  def LogWarning(self, message, hint=None):
191
    """Log a warning to the logs and the user.
192

193
    """
194
    logger.Error(message)
195
    self._feedback_fn(" - WARNING: %s" % message)
196
    if hint:
197
      self._feedback_fn("      Hint: %s" % hint)
198

    
199
  def LogInfo(self, message):
200
    """Log an informational message to the logs and the user.
201

202
    """
203
    logger.Info(message)
204
    self._feedback_fn(" - INFO: %s" % message)
205

    
206

    
207
class HooksMaster(object):
208
  """Hooks master.
209

210
  This class distributes the run commands to the nodes based on the
211
  specific LU class.
212

213
  In order to remove the direct dependency on the rpc module, the
214
  constructor needs a function which actually does the remote
215
  call. This will usually be rpc.call_hooks_runner, but any function
216
  which behaves the same works.
217

218
  """
219
  def __init__(self, callfn, proc, lu):
220
    self.callfn = callfn
221
    self.proc = proc
222
    self.lu = lu
223
    self.op = lu.op
224
    self.env, node_list_pre, node_list_post = self._BuildEnv()
225
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
226
                      constants.HOOKS_PHASE_POST: node_list_post}
227

    
228
  def _BuildEnv(self):
229
    """Compute the environment and the target nodes.
230

231
    Based on the opcode and the current node list, this builds the
232
    environment for the hooks and the target node list for the run.
233

234
    """
235
    env = {
236
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
237
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
238
      "GANETI_OP_CODE": self.op.OP_ID,
239
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
240
      "GANETI_DATA_DIR": constants.DATA_DIR,
241
      }
242

    
243
    if self.lu.HPATH is not None:
244
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
245
      if lu_env:
246
        for key in lu_env:
247
          env["GANETI_" + key] = lu_env[key]
248
    else:
249
      lu_nodes_pre = lu_nodes_post = []
250

    
251
    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
252

    
253
  def _RunWrapper(self, node_list, hpath, phase):
254
    """Simple wrapper over self.callfn.
255

256
    This method fixes the environment before doing the rpc call.
257

258
    """
259
    env = self.env.copy()
260
    env["GANETI_HOOKS_PHASE"] = phase
261
    env["GANETI_HOOKS_PATH"] = hpath
262
    if self.lu.sstore is not None:
263
      env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
264
      env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
265

    
266
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
267

    
268
    return self.callfn(node_list, hpath, phase, env)
269

    
270
  def RunPhase(self, phase):
271
    """Run all the scripts for a phase.
272

273
    This is the main function of the HookMaster.
274

275
    Args:
276
      phase: the hooks phase to run
277

278
    Returns:
279
      the result of the hooks multi-node rpc call
280

281
    """
282
    if not self.node_list[phase]:
283
      # empty node list, we should not attempt to run this as either
284
      # we're in the cluster init phase and the rpc client part can't
285
      # even attempt to run, or this LU doesn't do hooks at all
286
      return
287
    hpath = self.lu.HPATH
288
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
289
    if phase == constants.HOOKS_PHASE_PRE:
290
      errs = []
291
      if not results:
292
        raise errors.HooksFailure("Communication failure")
293
      for node_name in results:
294
        res = results[node_name]
295
        if res is False or not isinstance(res, list):
296
          self.proc.LogWarning("Communication failure to node %s" % node_name)
297
          continue
298
        for script, hkr, output in res:
299
          if hkr == constants.HKR_FAIL:
300
            output = output.strip().encode("string_escape")
301
            errs.append((node_name, script, output))
302
      if errs:
303
        raise errors.HooksAbort(errs)
304
    return results
305

    
306
  def RunConfigUpdate(self):
307
    """Run the special configuration update hook
308

309
    This is a special hook that runs only on the master after each
310
    top-level LI if the configuration has been updated.
311

312
    """
313
    phase = constants.HOOKS_PHASE_POST
314
    hpath = constants.HOOKS_NAME_CFGUPDATE
315
    if self.lu.sstore is None:
316
      raise errors.ProgrammerError("Null sstore on config update hook")
317
    nodes = [self.lu.sstore.GetMasterNode()]
318
    results = self._RunWrapper(nodes, hpath, phase)