Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 441e7cfd

History | View | Annotate | Download (12.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31

    
32
from ganeti import opcodes
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import rpc
36
from ganeti import cmdlib
37
from ganeti import ssconf
38
from ganeti import logger
39
from ganeti import locking
40

    
41

    
42
class Processor(object):
43
  """Object which runs OpCodes"""
44
  DISPATCH_TABLE = {
45
    # Cluster
46
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
47
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
48
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
49
    opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
50
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
51
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
52
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
53
    # node lu
54
    opcodes.OpAddNode: cmdlib.LUAddNode,
55
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
56
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
57
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
58
    # instance lu
59
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
60
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
61
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
62
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
63
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
64
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
65
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
66
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
67
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
68
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
69
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
70
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
71
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
72
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
73
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
74
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
75
    # os lu
76
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
77
    # exports lu
78
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
79
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
80
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
81
    # tags lu
82
    opcodes.OpGetTags: cmdlib.LUGetTags,
83
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
84
    opcodes.OpAddTags: cmdlib.LUAddTags,
85
    opcodes.OpDelTags: cmdlib.LUDelTags,
86
    # test lu
87
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
88
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
89
    }
90

    
91
  def __init__(self, context):
92
    """Constructor for Processor
93

94
    Args:
95
     - feedback_fn: the feedback function (taking one string) to be run when
96
                    interesting events are happening
97
    """
98
    self.context = context
99
    self._feedback_fn = None
100
    self.exclusive_BGL = False
101

    
102
  def _ExecLU(self, lu):
103
    """Logical Unit execution sequence.
104

105
    """
106
    write_count = self.context.cfg.write_count
107
    lu.CheckPrereq()
108
    hm = HooksMaster(rpc.call_hooks_runner, self, lu)
109
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
110
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
111
                     self._feedback_fn, None)
112
    try:
113
      result = lu.Exec(self._feedback_fn)
114
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
115
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
116
                                self._feedback_fn, result)
117
    finally:
118
      # FIXME: This needs locks if not lu_class.REQ_BGL
119
      if write_count != self.context.cfg.write_count:
120
        hm.RunConfigUpdate()
121

    
122
    return result
123

    
124
  def _LockAndExecLU(self, lu, level):
125
    """Execute a Logical Unit, with the needed locks.
126

127
    This is a recursive function that starts locking the given level, and
128
    proceeds up, till there are no more locks to acquire. Then it executes the
129
    given LU and its opcodes.
130

131
    """
132
    if level in lu.needed_locks:
133
      # This gives a chance to LUs to make last-minute changes after acquiring
134
      # locks at any preceding level.
135
      lu.DeclareLocks(level)
136
      needed_locks = lu.needed_locks[level]
137
      share = lu.share_locks[level]
138
      # This is always safe to do, as we can't acquire more/less locks than
139
      # what was requested.
140
      lu.needed_locks[level] = self.context.glm.acquire(level,
141
                                                        needed_locks,
142
                                                        shared=share)
143
      try:
144
        result = self._LockAndExecLU(lu, level + 1)
145
      finally:
146
        if lu.needed_locks[level]:
147
          self.context.glm.release(level)
148
    else:
149
      result = self._ExecLU(lu)
150

    
151
    return result
152

    
153
  def ExecOpCode(self, op, feedback_fn):
154
    """Execute an opcode.
155

156
    Args:
157
      op: the opcode to be executed
158

159
    """
160
    if not isinstance(op, opcodes.OpCode):
161
      raise errors.ProgrammerError("Non-opcode instance passed"
162
                                   " to ExecOpcode")
163

    
164
    self._feedback_fn = feedback_fn
165
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
166
    if lu_class is None:
167
      raise errors.OpCodeUnknown("Unknown opcode")
168

    
169
    if lu_class.REQ_WSSTORE:
170
      sstore = ssconf.WritableSimpleStore()
171
    else:
172
      sstore = ssconf.SimpleStore()
173

    
174
    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
175
    # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
176
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
177
                             shared=not lu_class.REQ_BGL)
178
    try:
179
      self.exclusive_BGL = lu_class.REQ_BGL
180
      lu = lu_class(self, op, self.context, sstore)
181
      lu.ExpandNames()
182
      assert lu.needed_locks is not None, "needed_locks not set by LU"
183
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
184
    finally:
185
      self.context.glm.release(locking.LEVEL_CLUSTER)
186
      self.exclusive_BGL = False
187

    
188
    return result
189

    
190
  def ChainOpCode(self, op):
191
    """Chain and execute an opcode.
192

193
    This is used by LUs when they need to execute a child LU.
194

195
    Args:
196
     - opcode: the opcode to be executed
197

198
    """
199
    if not isinstance(op, opcodes.OpCode):
200
      raise errors.ProgrammerError("Non-opcode instance passed"
201
                                   " to ExecOpcode")
202

    
203
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
204
    if lu_class is None:
205
      raise errors.OpCodeUnknown("Unknown opcode")
206

    
207
    if lu_class.REQ_BGL and not self.exclusive_BGL:
208
      raise errors.ProgrammerError("LUs which require the BGL cannot"
209
                                   " be chained to granular ones.")
210

    
211
    assert lu_class.REQ_BGL, "ChainOpCode is still BGL-only"
212

    
213
    if lu_class.REQ_WSSTORE:
214
      sstore = ssconf.WritableSimpleStore()
215
    else:
216
      sstore = ssconf.SimpleStore()
217

    
218
    #do_hooks = lu_class.HPATH is not None
219
    lu = lu_class(self, op, self.context, sstore)
220
    lu.CheckPrereq()
221
    #if do_hooks:
222
    #  hm = HooksMaster(rpc.call_hooks_runner, self, lu)
223
    #  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
224
    #  lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
225
    #                   h_results, self._feedback_fn, None)
226
    result = lu.Exec(self._feedback_fn)
227
    #if do_hooks:
228
    #  h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
229
    #  result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
230
    #                   h_results, self._feedback_fn, result)
231
    return result
232

    
233
  def LogStep(self, current, total, message):
234
    """Log a change in LU execution progress.
235

236
    """
237
    logger.Debug("Step %d/%d %s" % (current, total, message))
238
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))
239

    
240
  def LogWarning(self, message, hint=None):
241
    """Log a warning to the logs and the user.
242

243
    """
244
    logger.Error(message)
245
    self._feedback_fn(" - WARNING: %s" % message)
246
    if hint:
247
      self._feedback_fn("      Hint: %s" % hint)
248

    
249
  def LogInfo(self, message):
250
    """Log an informational message to the logs and the user.
251

252
    """
253
    logger.Info(message)
254
    self._feedback_fn(" - INFO: %s" % message)
255

    
256

    
257
class HooksMaster(object):
258
  """Hooks master.
259

260
  This class distributes the run commands to the nodes based on the
261
  specific LU class.
262

263
  In order to remove the direct dependency on the rpc module, the
264
  constructor needs a function which actually does the remote
265
  call. This will usually be rpc.call_hooks_runner, but any function
266
  which behaves the same works.
267

268
  """
269
  def __init__(self, callfn, proc, lu):
270
    self.callfn = callfn
271
    self.proc = proc
272
    self.lu = lu
273
    self.op = lu.op
274
    self.env, node_list_pre, node_list_post = self._BuildEnv()
275
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
276
                      constants.HOOKS_PHASE_POST: node_list_post}
277

    
278
  def _BuildEnv(self):
279
    """Compute the environment and the target nodes.
280

281
    Based on the opcode and the current node list, this builds the
282
    environment for the hooks and the target node list for the run.
283

284
    """
285
    env = {
286
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
287
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
288
      "GANETI_OP_CODE": self.op.OP_ID,
289
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
290
      "GANETI_DATA_DIR": constants.DATA_DIR,
291
      }
292

    
293
    if self.lu.HPATH is not None:
294
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
295
      if lu_env:
296
        for key in lu_env:
297
          env["GANETI_" + key] = lu_env[key]
298
    else:
299
      lu_nodes_pre = lu_nodes_post = []
300

    
301
    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
302

    
303
  def _RunWrapper(self, node_list, hpath, phase):
304
    """Simple wrapper over self.callfn.
305

306
    This method fixes the environment before doing the rpc call.
307

308
    """
309
    env = self.env.copy()
310
    env["GANETI_HOOKS_PHASE"] = phase
311
    env["GANETI_HOOKS_PATH"] = hpath
312
    if self.lu.sstore is not None:
313
      env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
314
      env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
315

    
316
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
317

    
318
    return self.callfn(node_list, hpath, phase, env)
319

    
320
  def RunPhase(self, phase):
321
    """Run all the scripts for a phase.
322

323
    This is the main function of the HookMaster.
324

325
    Args:
326
      phase: the hooks phase to run
327

328
    Returns:
329
      the result of the hooks multi-node rpc call
330

331
    """
332
    if not self.node_list[phase]:
333
      # empty node list, we should not attempt to run this as either
334
      # we're in the cluster init phase and the rpc client part can't
335
      # even attempt to run, or this LU doesn't do hooks at all
336
      return
337
    hpath = self.lu.HPATH
338
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
339
    if phase == constants.HOOKS_PHASE_PRE:
340
      errs = []
341
      if not results:
342
        raise errors.HooksFailure("Communication failure")
343
      for node_name in results:
344
        res = results[node_name]
345
        if res is False or not isinstance(res, list):
346
          self.proc.LogWarning("Communication failure to node %s" % node_name)
347
          continue
348
        for script, hkr, output in res:
349
          if hkr == constants.HKR_FAIL:
350
            output = output.strip().encode("string_escape")
351
            errs.append((node_name, script, output))
352
      if errs:
353
        raise errors.HooksAbort(errs)
354
    return results
355

    
356
  def RunConfigUpdate(self):
357
    """Run the special configuration update hook
358

359
    This is a special hook that runs only on the master after each
360
    top-level LI if the configuration has been updated.
361

362
    """
363
    phase = constants.HOOKS_PHASE_POST
364
    hpath = constants.HOOKS_NAME_CFGUPDATE
365
    if self.lu.sstore is None:
366
      raise errors.ProgrammerError("Null sstore on config update hook")
367
    nodes = [self.lu.sstore.GetMasterNode()]
368
    results = self._RunWrapper(nodes, hpath, phase)