Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ b1b6ea87

History | View | Annotate | Download (12.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31

    
32
from ganeti import opcodes
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import rpc
36
from ganeti import cmdlib
37
from ganeti import config
38
from ganeti import ssconf
39
from ganeti import logger
40
from ganeti import locking
41

    
42

    
43
class Processor(object):
44
  """Object which runs OpCodes"""
45
  DISPATCH_TABLE = {
46
    # Cluster
47
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
48
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
49
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
50
    opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
51
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
52
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
53
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
54
    # node lu
55
    opcodes.OpAddNode: cmdlib.LUAddNode,
56
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
57
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
58
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
59
    # instance lu
60
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
61
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
62
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
63
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
64
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
65
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
66
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
67
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
68
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
69
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
70
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
71
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
72
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
73
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
74
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
75
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
76
    # os lu
77
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
78
    # exports lu
79
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
80
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
81
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
82
    # tags lu
83
    opcodes.OpGetTags: cmdlib.LUGetTags,
84
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
85
    opcodes.OpAddTags: cmdlib.LUAddTags,
86
    opcodes.OpDelTags: cmdlib.LUDelTags,
87
    # test lu
88
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
89
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
90
    }
91

    
92
  def __init__(self, context):
93
    """Constructor for Processor
94

95
    Args:
96
     - feedback_fn: the feedback function (taking one string) to be run when
97
                    interesting events are happening
98
    """
99
    self.context = context
100
    self._feedback_fn = None
101
    self.exclusive_BGL = False
102

    
103
  def _ExecLU(self, lu):
104
    """Logical Unit execution sequence.
105

106
    """
107
    write_count = self.context.cfg.write_count
108
    lu.CheckPrereq()
109
    hm = HooksMaster(rpc.call_hooks_runner, self, lu)
110
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
111
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
112
                     self._feedback_fn, None)
113
    try:
114
      result = lu.Exec(self._feedback_fn)
115
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
116
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
117
                                self._feedback_fn, result)
118
    finally:
119
      # FIXME: This needs locks if not lu_class.REQ_BGL
120
      if write_count != self.context.cfg.write_count:
121
        hm.RunConfigUpdate()
122

    
123
    return result
124

    
125
  def _LockAndExecLU(self, lu, level):
126
    """Execute a Logical Unit, with the needed locks.
127

128
    This is a recursive function that starts locking the given level, and
129
    proceeds up, till there are no more locks to acquire. Then it executes the
130
    given LU and its opcodes.
131

132
    """
133
    if level in lu.needed_locks:
134
      # This is always safe to do, as we can't acquire more/less locks than
135
      # what was requested.
136
      lu.needed_locks[level] = self.context.glm.acquire(level,
137
                                                        lu.needed_locks[level])
138
      try:
139
        result = self._LockAndExecLU(lu, level + 1)
140
      finally:
141
        if lu.needed_locks[level]:
142
          self.context.glm.release(level)
143
    else:
144
      result = self._ExecLU(lu)
145

    
146
    return result
147

    
148
  def ExecOpCode(self, op, feedback_fn):
149
    """Execute an opcode.
150

151
    Args:
152
      op: the opcode to be executed
153

154
    """
155
    if not isinstance(op, opcodes.OpCode):
156
      raise errors.ProgrammerError("Non-opcode instance passed"
157
                                   " to ExecOpcode")
158

    
159
    self._feedback_fn = feedback_fn
160
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
161
    if lu_class is None:
162
      raise errors.OpCodeUnknown("Unknown opcode")
163

    
164
    if lu_class.REQ_WSSTORE:
165
      sstore = ssconf.WritableSimpleStore()
166
    else:
167
      sstore = ssconf.SimpleStore()
168

    
169
    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
170
    # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
171
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
172
                             shared=not lu_class.REQ_BGL)
173
    try:
174
      self.exclusive_BGL = lu_class.REQ_BGL
175
      lu = lu_class(self, op, self.context, sstore)
176
      lu.ExpandNames()
177
      assert lu.needed_locks is not None, "needed_locks not set by LU"
178
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
179
    finally:
180
      self.context.glm.release(locking.LEVEL_CLUSTER)
181
      self.exclusive_BGL = False
182

    
183
    return result
184

    
185
  def ChainOpCode(self, op):
186
    """Chain and execute an opcode.
187

188
    This is used by LUs when they need to execute a child LU.
189

190
    Args:
191
     - opcode: the opcode to be executed
192

193
    """
194
    if not isinstance(op, opcodes.OpCode):
195
      raise errors.ProgrammerError("Non-opcode instance passed"
196
                                   " to ExecOpcode")
197

    
198
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
199
    if lu_class is None:
200
      raise errors.OpCodeUnknown("Unknown opcode")
201

    
202
    if lu_class.REQ_BGL and not self.exclusive_BGL:
203
      raise errors.ProgrammerError("LUs which require the BGL cannot"
204
                                   " be chained to granular ones.")
205

    
206
    if lu_class.REQ_WSSTORE:
207
      sstore = ssconf.WritableSimpleStore()
208
    else:
209
      sstore = ssconf.SimpleStore()
210

    
211
    #do_hooks = lu_class.HPATH is not None
212
    lu = lu_class(self, op, self.context, sstore)
213
    lu.CheckPrereq()
214
    #if do_hooks:
215
    #  hm = HooksMaster(rpc.call_hooks_runner, self, lu)
216
    #  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
217
    #  lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
218
    #                   h_results, self._feedback_fn, None)
219
    result = lu.Exec(self._feedback_fn)
220
    #if do_hooks:
221
    #  h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
222
    #  result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
223
    #                   h_results, self._feedback_fn, result)
224
    return result
225

    
226
  def LogStep(self, current, total, message):
227
    """Log a change in LU execution progress.
228

229
    """
230
    logger.Debug("Step %d/%d %s" % (current, total, message))
231
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))
232

    
233
  def LogWarning(self, message, hint=None):
234
    """Log a warning to the logs and the user.
235

236
    """
237
    logger.Error(message)
238
    self._feedback_fn(" - WARNING: %s" % message)
239
    if hint:
240
      self._feedback_fn("      Hint: %s" % hint)
241

    
242
  def LogInfo(self, message):
243
    """Log an informational message to the logs and the user.
244

245
    """
246
    logger.Info(message)
247
    self._feedback_fn(" - INFO: %s" % message)
248

    
249

    
250
class HooksMaster(object):
251
  """Hooks master.
252

253
  This class distributes the run commands to the nodes based on the
254
  specific LU class.
255

256
  In order to remove the direct dependency on the rpc module, the
257
  constructor needs a function which actually does the remote
258
  call. This will usually be rpc.call_hooks_runner, but any function
259
  which behaves the same works.
260

261
  """
262
  def __init__(self, callfn, proc, lu):
263
    self.callfn = callfn
264
    self.proc = proc
265
    self.lu = lu
266
    self.op = lu.op
267
    self.env, node_list_pre, node_list_post = self._BuildEnv()
268
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
269
                      constants.HOOKS_PHASE_POST: node_list_post}
270

    
271
  def _BuildEnv(self):
272
    """Compute the environment and the target nodes.
273

274
    Based on the opcode and the current node list, this builds the
275
    environment for the hooks and the target node list for the run.
276

277
    """
278
    env = {
279
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
280
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
281
      "GANETI_OP_CODE": self.op.OP_ID,
282
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
283
      "GANETI_DATA_DIR": constants.DATA_DIR,
284
      }
285

    
286
    if self.lu.HPATH is not None:
287
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
288
      if lu_env:
289
        for key in lu_env:
290
          env["GANETI_" + key] = lu_env[key]
291
    else:
292
      lu_nodes_pre = lu_nodes_post = []
293

    
294
    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
295

    
296
  def _RunWrapper(self, node_list, hpath, phase):
297
    """Simple wrapper over self.callfn.
298

299
    This method fixes the environment before doing the rpc call.
300

301
    """
302
    env = self.env.copy()
303
    env["GANETI_HOOKS_PHASE"] = phase
304
    env["GANETI_HOOKS_PATH"] = hpath
305
    if self.lu.sstore is not None:
306
      env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
307
      env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
308

    
309
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
310

    
311
    return self.callfn(node_list, hpath, phase, env)
312

    
313
  def RunPhase(self, phase):
314
    """Run all the scripts for a phase.
315

316
    This is the main function of the HookMaster.
317

318
    Args:
319
      phase: the hooks phase to run
320

321
    Returns:
322
      the result of the hooks multi-node rpc call
323

324
    """
325
    if not self.node_list[phase]:
326
      # empty node list, we should not attempt to run this as either
327
      # we're in the cluster init phase and the rpc client part can't
328
      # even attempt to run, or this LU doesn't do hooks at all
329
      return
330
    hpath = self.lu.HPATH
331
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
332
    if phase == constants.HOOKS_PHASE_PRE:
333
      errs = []
334
      if not results:
335
        raise errors.HooksFailure("Communication failure")
336
      for node_name in results:
337
        res = results[node_name]
338
        if res is False or not isinstance(res, list):
339
          self.proc.LogWarning("Communication failure to node %s" % node_name)
340
          continue
341
        for script, hkr, output in res:
342
          if hkr == constants.HKR_FAIL:
343
            output = output.strip().encode("string_escape")
344
            errs.append((node_name, script, output))
345
      if errs:
346
        raise errors.HooksAbort(errs)
347
    return results
348

    
349
  def RunConfigUpdate(self):
350
    """Run the special configuration update hook
351

352
    This is a special hook that runs only on the master after each
353
    top-level LI if the configuration has been updated.
354

355
    """
356
    phase = constants.HOOKS_PHASE_POST
357
    hpath = constants.HOOKS_NAME_CFGUPDATE
358
    if self.lu.sstore is None:
359
      raise errors.ProgrammerError("Null sstore on config update hook")
360
    nodes = [self.lu.sstore.GetMasterNode()]
361
    results = self._RunWrapper(nodes, hpath, phase)