Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 05f86716

History | View | Annotate | Download (10.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31

    
32
from ganeti import opcodes
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import rpc
36
from ganeti import cmdlib
37
from ganeti import config
38
from ganeti import ssconf
39
from ganeti import logger
40

    
41

    
42
class Processor(object):
43
  """Object which runs OpCodes"""
44
  DISPATCH_TABLE = {
45
    # Cluster
46
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
47
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
48
    opcodes.OpClusterCopyFile: cmdlib.LUClusterCopyFile,
49
    opcodes.OpRunClusterCommand: cmdlib.LURunClusterCommand,
50
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
51
    opcodes.OpMasterFailover: cmdlib.LUMasterFailover,
52
    opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
53
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
54
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
55
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
56
    # node lu
57
    opcodes.OpAddNode: cmdlib.LUAddNode,
58
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
59
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
60
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
61
    # instance lu
62
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
63
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
64
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
65
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
66
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
67
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
68
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
69
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
70
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
71
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
72
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
73
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
74
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
75
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
76
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
77
    # os lu
78
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
79
    # exports lu
80
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
81
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
82
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
83
    # tags lu
84
    opcodes.OpGetTags: cmdlib.LUGetTags,
85
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
86
    opcodes.OpAddTags: cmdlib.LUAddTags,
87
    opcodes.OpDelTags: cmdlib.LUDelTags,
88
    # test lu
89
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
90
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
91
    }
92

    
93
  def __init__(self, feedback=None):
94
    """Constructor for Processor
95

96
    Args:
97
     - feedback_fn: the feedback function (taking one string) to be run when
98
                    interesting events are happening
99
    """
100
    self.cfg = None
101
    self.sstore = None
102
    self._feedback_fn = feedback
103

    
104
  def ExecOpCode(self, op):
105
    """Execute an opcode.
106

107
    Args:
108
      op: the opcode to be executed
109

110
    """
111
    if not isinstance(op, opcodes.OpCode):
112
      raise errors.ProgrammerError("Non-opcode instance passed"
113
                                   " to ExecOpcode")
114

    
115
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
116
    if lu_class is None:
117
      raise errors.OpCodeUnknown("Unknown opcode")
118

    
119
    if self.cfg is None:
120
      self.cfg = config.ConfigWriter()
121
      if lu_class.REQ_WSSTORE:
122
        self.sstore = ssconf.WritableSimpleStore()
123
      else:
124
        self.sstore = ssconf.SimpleStore()
125
    if self.cfg is not None:
126
      write_count = self.cfg.write_count
127
    else:
128
      write_count = 0
129
    lu = lu_class(self, op, self.cfg, self.sstore)
130
    lu.CheckPrereq()
131
    hm = HooksMaster(rpc.call_hooks_runner, self, lu)
132
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
133
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
134
                     h_results, self._feedback_fn, None)
135
    try:
136
      result = lu.Exec(self._feedback_fn)
137
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
138
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
139
                       h_results, self._feedback_fn, result)
140
    finally:
141
      if lu.cfg is not None:
142
        # we use lu.cfg and not self.cfg as for init cluster, self.cfg
143
        # is None but lu.cfg has been recently initialized in the
144
        # lu.Exec method
145
        if write_count != lu.cfg.write_count:
146
          hm.RunConfigUpdate()
147

    
148
    return result
149

    
150
  def ChainOpCode(self, op):
151
    """Chain and execute an opcode.
152

153
    This is used by LUs when they need to execute a child LU.
154

155
    Args:
156
     - opcode: the opcode to be executed
157

158
    """
159
    if not isinstance(op, opcodes.OpCode):
160
      raise errors.ProgrammerError("Non-opcode instance passed"
161
                                   " to ExecOpcode")
162

    
163
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
164
    if lu_class is None:
165
      raise errors.OpCodeUnknown("Unknown opcode")
166

    
167
    if self.cfg is None:
168
      self.cfg = config.ConfigWriter()
169
      self.sstore = ssconf.SimpleStore()
170
    #do_hooks = lu_class.HPATH is not None
171
    lu = lu_class(self, op, self.cfg, self.sstore)
172
    lu.CheckPrereq()
173
    #if do_hooks:
174
    #  hm = HooksMaster(rpc.call_hooks_runner, self, lu)
175
    #  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
176
    #  lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
177
    #                   h_results, self._feedback_fn, None)
178
    result = lu.Exec(self._feedback_fn)
179
    #if do_hooks:
180
    #  h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
181
    #  result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
182
    #                   h_results, self._feedback_fn, result)
183
    return result
184

    
185
  def LogStep(self, current, total, message):
186
    """Log a change in LU execution progress.
187

188
    """
189
    logger.Debug("Step %d/%d %s" % (current, total, message))
190
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))
191

    
192
  def LogWarning(self, message, hint=None):
193
    """Log a warning to the logs and the user.
194

195
    """
196
    logger.Error(message)
197
    self._feedback_fn(" - WARNING: %s" % message)
198
    if hint:
199
      self._feedback_fn("      Hint: %s" % hint)
200

    
201
  def LogInfo(self, message):
202
    """Log an informational message to the logs and the user.
203

204
    """
205
    logger.Info(message)
206
    self._feedback_fn(" - INFO: %s" % message)
207

    
208

    
209
class HooksMaster(object):
210
  """Hooks master.
211

212
  This class distributes the run commands to the nodes based on the
213
  specific LU class.
214

215
  In order to remove the direct dependency on the rpc module, the
216
  constructor needs a function which actually does the remote
217
  call. This will usually be rpc.call_hooks_runner, but any function
218
  which behaves the same works.
219

220
  """
221
  def __init__(self, callfn, proc, lu):
222
    self.callfn = callfn
223
    self.proc = proc
224
    self.lu = lu
225
    self.op = lu.op
226
    self.env, node_list_pre, node_list_post = self._BuildEnv()
227
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
228
                      constants.HOOKS_PHASE_POST: node_list_post}
229

    
230
  def _BuildEnv(self):
231
    """Compute the environment and the target nodes.
232

233
    Based on the opcode and the current node list, this builds the
234
    environment for the hooks and the target node list for the run.
235

236
    """
237
    env = {
238
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
239
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
240
      "GANETI_OP_CODE": self.op.OP_ID,
241
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
242
      "GANETI_DATA_DIR": constants.DATA_DIR,
243
      }
244

    
245
    if self.lu.HPATH is not None:
246
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
247
      if lu_env:
248
        for key in lu_env:
249
          env["GANETI_" + key] = lu_env[key]
250
    else:
251
      lu_nodes_pre = lu_nodes_post = []
252

    
253
    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
254

    
255
  def _RunWrapper(self, node_list, hpath, phase):
256
    """Simple wrapper over self.callfn.
257

258
    This method fixes the environment before doing the rpc call.
259

260
    """
261
    env = self.env.copy()
262
    env["GANETI_HOOKS_PHASE"] = phase
263
    env["GANETI_HOOKS_PATH"] = hpath
264
    if self.lu.sstore is not None:
265
      env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
266
      env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
267

    
268
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
269

    
270
    return self.callfn(node_list, hpath, phase, env)
271

    
272
  def RunPhase(self, phase):
273
    """Run all the scripts for a phase.
274

275
    This is the main function of the HookMaster.
276

277
    Args:
278
      phase: the hooks phase to run
279

280
    Returns:
281
      the result of the hooks multi-node rpc call
282

283
    """
284
    if not self.node_list[phase]:
285
      # empty node list, we should not attempt to run this as either
286
      # we're in the cluster init phase and the rpc client part can't
287
      # even attempt to run, or this LU doesn't do hooks at all
288
      return
289
    hpath = self.lu.HPATH
290
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
291
    if phase == constants.HOOKS_PHASE_PRE:
292
      errs = []
293
      if not results:
294
        raise errors.HooksFailure("Communication failure")
295
      for node_name in results:
296
        res = results[node_name]
297
        if res is False or not isinstance(res, list):
298
          self.proc.LogWarning("Communication failure to node %s" % node_name)
299
          continue
300
        for script, hkr, output in res:
301
          if hkr == constants.HKR_FAIL:
302
            output = output.strip().encode("string_escape")
303
            errs.append((node_name, script, output))
304
      if errs:
305
        raise errors.HooksAbort(errs)
306
    return results
307

    
308
  def RunConfigUpdate(self):
309
    """Run the special configuration update hook
310

311
    This is a special hook that runs only on the master after each
312
    top-level LI if the configuration has been updated.
313

314
    """
315
    phase = constants.HOOKS_PHASE_POST
316
    hpath = constants.HOOKS_NAME_CFGUPDATE
317
    if self.lu.sstore is None:
318
      raise errors.ProgrammerError("Null sstore on config update hook")
319
    nodes = [self.lu.sstore.GetMasterNode()]
320
    results = self._RunWrapper(nodes, hpath, phase)