Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ f1048938

History | View | Annotate | Download (12.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31

    
32
from ganeti import opcodes
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import rpc
36
from ganeti import cmdlib
37
from ganeti import config
38
from ganeti import ssconf
39
from ganeti import logger
40
from ganeti import locking
41

    
42

    
43
class Processor(object):
  """Object which runs OpCodes.

  The processor looks up the logical unit (LU) class registered for an
  opcode in DISPATCH_TABLE, acquires the locks requested by the LU and then
  drives it through prerequisite checking, hooks and execution.

  """
  # Maps every supported opcode class to the LU class implementing it.
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpMasterFailover: cmdlib.LUMasterFailover,
    opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

  def __init__(self, context):
    """Constructor for Processor

    Args:
     - context: object kept as self.context; the processor reads its 'cfg'
                attribute (for write_count) and uses its 'glm' attribute to
                acquire and release locks

    """
    self.context = context
    # Set by ExecOpCode; the Log* helpers and the LUs send their
    # user-visible messages through it.
    self._feedback_fn = None
    # True only while ExecOpCode runs an LU whose class has REQ_BGL set
    self.exclusive_BGL = False

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs, in order: CheckPrereq, the pre-phase hooks, Exec and the
    post-phase hooks. If the configuration's write_count changed since the
    start of this method, the config update hook is run as well; this
    happens in a 'finally' clause, so also when Exec or the post hooks
    raised.

    Args:
      lu: the logical unit instance to run

    Returns:
      the value returned by the LU's HooksCallBack for the post phase

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(rpc.call_hooks_runner, self, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self._feedback_fn, None)
    try:
      result = lu.Exec(self._feedback_fn)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self._feedback_fn, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    Args:
      lu: the logical unit instance; its needed_locks mapping is consulted
          and updated with what the lock manager returned
      level: the locking level to start from

    Returns:
      the result of _ExecLU for the given LU

    """
    if level in lu.needed_locks:
      # This is always safe to do, as we can't acquire more/less locks than
      # what was requested.
      lu.needed_locks[level] = self.context.glm.acquire(level,
                                                        lu.needed_locks[level])
      try:
        result = self._LockAndExecLU(lu, level + 1)
      finally:
        # Only release when the stored acquire() result is non-empty
        if lu.needed_locks[level]:
          self.context.glm.release(level)
    else:
      # First level the LU declares no locks for: run the LU itself
      result = self._ExecLU(lu)

    return result

  def ExecOpCode(self, op, feedback_fn):
    """Execute an opcode.

    Args:
      op: the opcode to be executed
      feedback_fn: the feedback function (taking one string) through which
                   progress and warnings are reported; it is stored on the
                   processor and handed to the LU

    Returns:
      the result of the logical unit's execution

    Raises:
      errors.ProgrammerError: if op is not an opcodes.OpCode instance
      errors.OpCodeUnknown: if no LU is registered for the opcode's class

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    self._feedback_fn = feedback_fn
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if lu_class.REQ_WSSTORE:
      sstore = ssconf.WritableSimpleStore()
    else:
      sstore = ssconf.SimpleStore()

    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
    # shared fashion otherwise (to prevent concurrent run with an exclusive
    # LU).
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                             shared=not lu_class.REQ_BGL)
    try:
      self.exclusive_BGL = lu_class.REQ_BGL
      lu = lu_class(self, op, self.context, sstore)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"
      result = self._LockAndExecLU(lu, locking.LEVEL_NODE)
    finally:
      self.context.glm.release(locking.LEVEL_CLUSTER)
      self.exclusive_BGL = False

    return result

  def ChainOpCode(self, op):
    """Chain and execute an opcode.

    This is used by LUs when they need to execute a child LU. No locks are
    acquired here and no hooks are run for the chained opcode; feedback goes
    to the function stored by the last ExecOpCode call.

    Args:
     - op: the opcode to be executed

    Returns:
      the value returned by the child LU's Exec method

    Raises:
      errors.ProgrammerError: if op is not an opcodes.OpCode instance, or if
                              the child LU requires the BGL while the
                              processor does not hold it exclusively
      errors.OpCodeUnknown: if no LU is registered for the opcode's class

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ChainOpCode")

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if lu_class.REQ_BGL and not self.exclusive_BGL:
      raise errors.ProgrammerError("LUs which require the BGL cannot"
                                   " be chained to granular ones.")

    if lu_class.REQ_WSSTORE:
      sstore = ssconf.WritableSimpleStore()
    else:
      sstore = ssconf.SimpleStore()

    # NOTE: pre/post hooks are not run for chained opcodes; only the
    # prerequisite check and the execution itself happen here.
    lu = lu_class(self, op, self.context, sstore)
    lu.CheckPrereq()
    result = lu.Exec(self._feedback_fn)
    return result

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    The step is written to the debug log and sent to the feedback function.

    Args:
      current: number of the current step (formatted with %d)
      total: total number of steps (formatted with %d)
      message: description of the step

    """
    logger.Debug("Step %d/%d %s" % (current, total, message))
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, hint=None):
    """Log a warning to the logs and the user.

    Args:
      message: the warning text; logged via logger.Error and sent to the
               feedback function
      hint: optional extra text, sent only to the feedback function

    """
    logger.Error(message)
    self._feedback_fn(" - WARNING: %s" % message)
    if hint:
      self._feedback_fn("      Hint: %s" % hint)

  def LogInfo(self, message):
    """Log an informational message to the logs and the user.

    Args:
      message: the text to log and to send to the feedback function

    """
    logger.Info(message)
    self._feedback_fn(" - INFO: %s" % message)
249

    
250

    
251
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, proc, lu):
    """Store the collaborators and precompute environment and node lists.

    Args:
      callfn: function doing the remote call; invoked as
              callfn(node_list, hpath, phase, env)
      proc: the processor; its LogWarning method is used to report nodes
            that returned no usable result
      lu: the logical unit the hooks are run for

    """
    self.callfn = callfn
    self.proc = proc
    self.lu = lu
    self.op = lu.op
    self.env, node_list_pre, node_list_post = self._BuildEnv()
    # Target nodes, keyed by hooks phase
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                      constants.HOOKS_PHASE_POST: node_list_post}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    Returns:
      a tuple (env, pre_nodes, post_nodes): env is a dict with the base
      variables plus the LU-provided ones prefixed with "GANETI_"; the node
      lists are frozensets, both empty when the LU has no HPATH

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is not None:
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        for key in lu_env:
          env["GANETI_" + key] = lu_env[key]
    else:
      lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call: it works
    on a copy of self.env, adds the phase, the hooks path and (when the LU
    has an sstore) the cluster and master names, and converts all keys and
    values to strings.

    Args:
      node_list: the nodes to run the hooks on
      hpath: the hooks path to run
      phase: the hooks phase

    Returns:
      whatever self.callfn returns

    """
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    if self.lu.sstore is not None:
      env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
      env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()

    # items() (not the Python 2-only iteritems()) so that this does not
    # depend on the interpreter major version.
    env = dict((str(key), str(val)) for key, val in env.items())

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase):
    """Run all the scripts for a phase.

    This is the main function of the HooksMaster.

    Args:
      phase: the hooks phase to run

    Returns:
      the result of the hooks multi-node rpc call, or None if the node list
      for this phase is empty

    Raises:
      errors.HooksFailure: in the pre phase, if the rpc call returned an
                           empty result
      errors.HooksAbort: in the pre phase, if any script failed; the
                         argument is a list of (node, script, output)

    """
    if not self.node_list[phase]:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return
    hpath = self.lu.HPATH
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
    # Only pre-phase results are inspected; post-phase results are
    # returned to the caller as they are.
    if phase == constants.HOOKS_PHASE_PRE:
      errs = []
      if not results:
        raise errors.HooksFailure("Communication failure")
      for node_name in results:
        res = results[node_name]
        # Anything which is not a list (including False) is treated as a
        # failure to talk to the node; it is reported but does not abort.
        if not isinstance(res, list):
          self.proc.LogWarning("Communication failure to node %s" % node_name)
          continue
        for script, hkr, output in res:
          if hkr == constants.HKR_FAIL:
            # NOTE: the "string_escape" codec exists only on Python 2
            output = output.strip().encode("string_escape")
            errs.append((node_name, script, output))
      if errs:
        raise errors.HooksAbort(errs)
    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated. The result of the
    hook run is not examined.

    Raises:
      errors.ProgrammerError: if the LU has no sstore

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    if self.lu.sstore is None:
      raise errors.ProgrammerError("Null sstore on config update hook")
    nodes = [self.lu.sstore.GetMasterNode()]
    self._RunWrapper(nodes, hpath, phase)