Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ a0c9f010

History | View | Annotate | Download (10.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31

    
32
from ganeti import opcodes
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import rpc
36
from ganeti import cmdlib
37
from ganeti import config
38
from ganeti import ssconf
39
from ganeti import logger
40

    
41

    
42
class Processor(object):
43
  """Object which runs OpCodes"""
44
  DISPATCH_TABLE = {
45
    # Cluster
46
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
47
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
48
    opcodes.OpClusterCopyFile: cmdlib.LUClusterCopyFile,
49
    opcodes.OpRunClusterCommand: cmdlib.LURunClusterCommand,
50
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
51
    opcodes.OpMasterFailover: cmdlib.LUMasterFailover,
52
    opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
53
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
54
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
55
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
56
    # node lu
57
    opcodes.OpAddNode: cmdlib.LUAddNode,
58
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
59
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
60
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
61
    # instance lu
62
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
63
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
64
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
65
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
66
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
67
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
68
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
69
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
70
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
71
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
72
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
73
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
74
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
75
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
76
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
77
    # os lu
78
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
79
    # exports lu
80
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
81
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
82
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
83
    # tags lu
84
    opcodes.OpGetTags: cmdlib.LUGetTags,
85
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
86
    opcodes.OpAddTags: cmdlib.LUAddTags,
87
    opcodes.OpDelTags: cmdlib.LUDelTags,
88
    # test lu
89
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
90
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
91
    }
92

    
93
  def __init__(self, feedback=None):
94
    """Constructor for Processor
95

96
    Args:
97
     - feedback_fn: the feedback function (taking one string) to be run when
98
                    interesting events are happening
99
    """
100
    self.cfg = None
101
    self.sstore = None
102
    self._feedback_fn = feedback
103

    
104
  def ExecOpCode(self, op):
105
    """Execute an opcode.
106

107
    Args:
108
      op: the opcode to be executed
109

110
    """
111
    if not isinstance(op, opcodes.OpCode):
112
      raise errors.ProgrammerError("Non-opcode instance passed"
113
                                   " to ExecOpcode")
114

    
115
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
116
    if lu_class is None:
117
      raise errors.OpCodeUnknown("Unknown opcode")
118

    
119
    if self.cfg is None:
120
      self.cfg = config.ConfigWriter()
121
      self.sstore = ssconf.SimpleStore()
122
    if self.cfg is not None:
123
      write_count = self.cfg.write_count
124
    else:
125
      write_count = 0
126
    lu = lu_class(self, op, self.cfg, self.sstore)
127
    lu.CheckPrereq()
128
    hm = HooksMaster(rpc.call_hooks_runner, self, lu)
129
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
130
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
131
                     h_results, self._feedback_fn, None)
132
    try:
133
      result = lu.Exec(self._feedback_fn)
134
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
135
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
136
                       h_results, self._feedback_fn, result)
137
    finally:
138
      if lu.cfg is not None:
139
        # we use lu.cfg and not self.cfg as for init cluster, self.cfg
140
        # is None but lu.cfg has been recently initialized in the
141
        # lu.Exec method
142
        if write_count != lu.cfg.write_count:
143
          hm.RunConfigUpdate()
144

    
145
    return result
146

    
147
  def ChainOpCode(self, op):
148
    """Chain and execute an opcode.
149

150
    This is used by LUs when they need to execute a child LU.
151

152
    Args:
153
     - opcode: the opcode to be executed
154

155
    """
156
    if not isinstance(op, opcodes.OpCode):
157
      raise errors.ProgrammerError("Non-opcode instance passed"
158
                                   " to ExecOpcode")
159

    
160
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
161
    if lu_class is None:
162
      raise errors.OpCodeUnknown("Unknown opcode")
163

    
164
    if self.cfg is None:
165
      self.cfg = config.ConfigWriter()
166
      self.sstore = ssconf.SimpleStore()
167
    #do_hooks = lu_class.HPATH is not None
168
    lu = lu_class(self, op, self.cfg, self.sstore)
169
    lu.CheckPrereq()
170
    #if do_hooks:
171
    #  hm = HooksMaster(rpc.call_hooks_runner, self, lu)
172
    #  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
173
    #  lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
174
    #                   h_results, self._feedback_fn, None)
175
    result = lu.Exec(self._feedback_fn)
176
    #if do_hooks:
177
    #  h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
178
    #  result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
179
    #                   h_results, self._feedback_fn, result)
180
    return result
181

    
182
  def LogStep(self, current, total, message):
183
    """Log a change in LU execution progress.
184

185
    """
186
    logger.Debug("Step %d/%d %s" % (current, total, message))
187
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))
188

    
189
  def LogWarning(self, message, hint=None):
190
    """Log a warning to the logs and the user.
191

192
    """
193
    logger.Error(message)
194
    self._feedback_fn(" - WARNING: %s" % message)
195
    if hint:
196
      self._feedback_fn("      Hint: %s" % hint)
197

    
198
  def LogInfo(self, message):
199
    """Log an informational message to the logs and the user.
200

201
    """
202
    logger.Info(message)
203
    self._feedback_fn(" - INFO: %s" % message)
204

    
205

    
206
class HooksMaster(object):
207
  """Hooks master.
208

209
  This class distributes the run commands to the nodes based on the
210
  specific LU class.
211

212
  In order to remove the direct dependency on the rpc module, the
213
  constructor needs a function which actually does the remote
214
  call. This will usually be rpc.call_hooks_runner, but any function
215
  which behaves the same works.
216

217
  """
218
  def __init__(self, callfn, proc, lu):
219
    self.callfn = callfn
220
    self.proc = proc
221
    self.lu = lu
222
    self.op = lu.op
223
    self.env, node_list_pre, node_list_post = self._BuildEnv()
224
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
225
                      constants.HOOKS_PHASE_POST: node_list_post}
226

    
227
  def _BuildEnv(self):
228
    """Compute the environment and the target nodes.
229

230
    Based on the opcode and the current node list, this builds the
231
    environment for the hooks and the target node list for the run.
232

233
    """
234
    env = {
235
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
236
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
237
      "GANETI_OP_CODE": self.op.OP_ID,
238
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
239
      "GANETI_DATA_DIR": constants.DATA_DIR,
240
      }
241

    
242
    if self.lu.HPATH is not None:
243
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
244
      if lu_env:
245
        for key in lu_env:
246
          env["GANETI_" + key] = lu_env[key]
247
    else:
248
      lu_nodes_pre = lu_nodes_post = []
249

    
250
    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
251

    
252
  def _RunWrapper(self, node_list, hpath, phase):
253
    """Simple wrapper over self.callfn.
254

255
    This method fixes the environment before doing the rpc call.
256

257
    """
258
    env = self.env.copy()
259
    env["GANETI_HOOKS_PHASE"] = phase
260
    env["GANETI_HOOKS_PATH"] = hpath
261
    if self.lu.sstore is not None:
262
      env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
263
      env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
264

    
265
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
266

    
267
    return self.callfn(node_list, hpath, phase, env)
268

    
269
  def RunPhase(self, phase):
270
    """Run all the scripts for a phase.
271

272
    This is the main function of the HookMaster.
273

274
    Args:
275
      phase: the hooks phase to run
276

277
    Returns:
278
      the result of the hooks multi-node rpc call
279

280
    """
281
    if not self.node_list[phase]:
282
      # empty node list, we should not attempt to run this as either
283
      # we're in the cluster init phase and the rpc client part can't
284
      # even attempt to run, or this LU doesn't do hooks at all
285
      return
286
    hpath = self.lu.HPATH
287
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
288
    if phase == constants.HOOKS_PHASE_PRE:
289
      errs = []
290
      if not results:
291
        raise errors.HooksFailure("Communication failure")
292
      for node_name in results:
293
        res = results[node_name]
294
        if res is False or not isinstance(res, list):
295
          self.proc.LogWarning("Communication failure to node %s" % node_name)
296
          continue
297
        for script, hkr, output in res:
298
          if hkr == constants.HKR_FAIL:
299
            output = output.strip().encode("string_escape")
300
            errs.append((node_name, script, output))
301
      if errs:
302
        raise errors.HooksAbort(errs)
303
    return results
304

    
305
  def RunConfigUpdate(self):
306
    """Run the special configuration update hook
307

308
    This is a special hook that runs only on the master after each
309
    top-level LI if the configuration has been updated.
310

311
    """
312
    phase = constants.HOOKS_PHASE_POST
313
    hpath = constants.HOOKS_NAME_CFGUPDATE
314
    if self.lu.sstore is None:
315
      raise errors.ProgrammerError("Null sstore on config update hook")
316
    nodes = [self.lu.sstore.GetMasterNode()]
317
    results = self._RunWrapper(nodes, hpath, phase)