Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 8a2941c4

History | View | Annotate | Download (12.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31

    
32
from ganeti import opcodes
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import rpc
36
from ganeti import cmdlib
37
from ganeti import ssconf
38
from ganeti import logger
39
from ganeti import locking
40

    
41

    
42
class Processor(object):
  """Object which runs OpCodes.

  Every opcode class is mapped through DISPATCH_TABLE to the logical unit
  (LU) class that implements it. The processor acquires the locks the LU
  declares, runs the LU together with its hooks and hands the result back
  to the caller.

  """
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

  def __init__(self, context):
    """Constructor for Processor

    Args:
     - context: the object handed to every LU this processor creates; the
                processor itself reads context.cfg.write_count and uses
                context.glm to acquire and release locks

    """
    self.context = context
    # The feedback function is only known once ExecOpCode is called
    self._feedback_fn = None
    # True only while an LU holding the BGL exclusively is being executed
    self.exclusive_BGL = False

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs the prerequisite check, the pre-phase hooks, the LU itself and
    the post-phase hooks, in this order. If the configuration write
    counter changed while the LU and its post hooks were running, the
    config update hook is run as well, even when the LU failed.

    Args:
      lu: the logical unit to execute

    Returns:
      the value returned by the LU's post-phase HooksCallBack

    """
    # Snapshot of the write counter, to detect config changes afterwards
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(rpc.call_hooks_runner, self, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self._feedback_fn, None)
    try:
      result = lu.Exec(self._feedback_fn)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self._feedback_fn, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    Args:
      lu: the logical unit to execute
      level: the locking level to start from; any value not contained in
             locking.LEVELS ends the recursion and runs the LU

    Returns:
      the result of the LU execution

    """
    if level not in locking.LEVELS:
      # No more levels to lock, actually run the LU
      result = self._ExecLU(lu)
    elif level in lu.needed_locks:
      # This gives a chance to LUs to make last-minute changes after acquiring
      # locks at any preceding level.
      lu.DeclareLocks(level)
      needed_locks = lu.needed_locks[level]
      share = lu.share_locks[level]
      # This is always safe to do, as we can't acquire more/less locks than
      # what was requested.
      lu.needed_locks[level] = self.context.glm.acquire(level,
                                                        needed_locks,
                                                        shared=share)
      try:
        result = self._LockAndExecLU(lu, level + 1)
      finally:
        # Only release if something is recorded as held at this level
        if lu.needed_locks[level]:
          self.context.glm.release(level)
    else:
      # The LU does not want any locks at this level, go to the next one
      result = self._LockAndExecLU(lu, level + 1)

    return result

  def ExecOpCode(self, op, feedback_fn):
    """Execute an opcode.

    Args:
      op: the opcode to be executed
      feedback_fn: the feedback function; it is stored on the processor,
                   passed to the LU, and called with one string by
                   LogStep, LogWarning and LogInfo

    Returns:
      the result of the LU execution

    Raises:
      errors.ProgrammerError: if op is not an opcodes.OpCode instance
      errors.OpCodeUnknown: if there is no LU registered for the opcode

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    self._feedback_fn = feedback_fn
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if lu_class.REQ_WSSTORE:
      sstore = ssconf.WritableSimpleStore()
    else:
      sstore = ssconf.SimpleStore()

    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
    # shared fashion otherwise (to prevent concurrent run with an exclusive
    # LU).
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                             shared=not lu_class.REQ_BGL)
    try:
      self.exclusive_BGL = lu_class.REQ_BGL
      lu = lu_class(self, op, self.context, sstore)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
    finally:
      self.context.glm.release(locking.LEVEL_CLUSTER)
      self.exclusive_BGL = False

    return result

  def ChainOpCode(self, op):
    """Chain and execute an opcode.

    This is used by LUs when they need to execute a child LU. The child
    is run with the feedback function of the opcode currently executed
    via ExecOpCode; no locks are acquired and no hooks are run for it.

    Args:
     - op: the opcode to be executed

    Returns:
      the result of the child LU's Exec

    Raises:
      errors.ProgrammerError: if op is not an opcodes.OpCode instance, or
        if the child LU requires the BGL while the processor does not
        hold it exclusively
      errors.OpCodeUnknown: if there is no LU registered for the opcode

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ChainOpCode")

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if lu_class.REQ_BGL and not self.exclusive_BGL:
      raise errors.ProgrammerError("LUs which require the BGL cannot"
                                   " be chained to granular ones.")

    assert lu_class.REQ_BGL, "ChainOpCode is still BGL-only"

    if lu_class.REQ_WSSTORE:
      sstore = ssconf.WritableSimpleStore()
    else:
      sstore = ssconf.SimpleStore()

    # NOTE: hooks are not run for chained LUs; the pre/post hook calls that
    # used to surround Exec below were disabled in this code.
    lu = lu_class(self, op, self.context, sstore)
    lu.CheckPrereq()
    result = lu.Exec(self._feedback_fn)
    return result

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    Args:
      current: the number of the step being started
      total: the total number of steps
      message: the description of the step

    """
    logger.Debug("Step %d/%d %s" % (current, total, message))
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, hint=None):
    """Log a warning to the logs and the user.

    Args:
      message: the warning text
      hint: optional extra text, sent to the user only (not to the logs)

    """
    logger.Error(message)
    self._feedback_fn(" - WARNING: %s" % message)
    if hint:
      self._feedback_fn("      Hint: %s" % hint)

  def LogInfo(self, message):
    """Log an informational message to the logs and the user.

    Args:
      message: the informational text

    """
    logger.Info(message)
    self._feedback_fn(" - INFO: %s" % message)
257

    
258

    
259
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, proc, lu):
    """Constructor for HooksMaster

    Args:
      callfn: the function doing the remote call; it is invoked as
              callfn(node_list, hpath, phase, env)
      proc: the processor, used here to log warnings (LogWarning)
      lu: the logical unit whose hooks are to be run

    """
    self.callfn = callfn
    self.proc = proc
    self.lu = lu
    self.op = lu.op
    self.env, node_list_pre, node_list_post = self._BuildEnv()
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                      constants.HOOKS_PHASE_POST: node_list_post}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    Returns:
      a tuple (env, pre_nodes, post_nodes): the environment dictionary
      and the frozensets of nodes for the pre and post phases; both sets
      are empty when the LU has no hooks path

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is not None:
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        # Variables provided by the LU are exported with a GANETI_ prefix
        for key in lu_env:
          env["GANETI_" + key] = lu_env[key]
    else:
      lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call: it adds
    the phase, the hooks path and (when the LU has an sstore) the cluster
    name and master node, then converts all keys and values to strings.
    The environment stored on the object is not modified.

    Args:
      node_list: the nodes on which to run the hooks
      hpath: the hooks path to run
      phase: the hooks phase to run

    Returns:
      whatever self.callfn returns

    """
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    if self.lu.sstore is not None:
      env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
      env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()

    # items() instead of iteritems(): same result, and not Python 2 only
    env = dict((str(key), str(val)) for key, val in env.items())

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase):
    """Run all the scripts for a phase.

    This is the main function of the HooksMaster.

    Args:
      phase: the hooks phase to run

    Returns:
      the result of the hooks multi-node rpc call, or None if there are
      no nodes to run the hooks on for this phase

    Raises:
      errors.HooksFailure: in the pre phase, if the rpc call returned an
        empty result
      errors.HooksAbort: in the pre phase, if at least one script failed;
        the argument is a list of (node_name, script, output) tuples

    """
    if not self.node_list[phase]:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return
    hpath = self.lu.HPATH
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
    # Only the results of the pre phase are checked; a failed pre hook
    # aborts the operation
    if phase == constants.HOOKS_PHASE_PRE:
      errs = []
      if not results:
        raise errors.HooksFailure("Communication failure")
      for node_name in results:
        res = results[node_name]
        # Anything which is not a list (including False) is treated as a
        # failure to talk to that node; it is reported but not fatal
        if not isinstance(res, list):
          self.proc.LogWarning("Communication failure to node %s" % node_name)
          continue
        for script, hkr, output in res:
          if hkr == constants.HKR_FAIL:
            # NOTE(review): the "string_escape" codec only exists in
            # Python 2; this line needs attention if the module is ported
            output = output.strip().encode("string_escape")
            errs.append((node_name, script, output))
      if errs:
        raise errors.HooksAbort(errs)
    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    Raises:
      errors.ProgrammerError: if the LU has no sstore

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    if self.lu.sstore is None:
      raise errors.ProgrammerError("Null sstore on config update hook")
    nodes = [self.lu.sstore.GetMasterNode()]
    # The result of this hook is not checked
    self._RunWrapper(nodes, hpath, phase)