Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ e873317a

History | View | Annotate | Download (12.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31

    
32
from ganeti import opcodes
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import rpc
36
from ganeti import cmdlib
37
from ganeti import config
38
from ganeti import ssconf
39
from ganeti import logger
40
from ganeti import locking
41

    
42

    
43
class Processor(object):
  """Object which runs OpCodes.

  The processor looks up the logical unit (LU) class registered for an
  opcode in DISPATCH_TABLE, acquires the locks the LU declares and then
  drives the LU through its prerequisite check, hooks and execution.

  """
  # Maps each opcode class to the logical unit class implementing it.
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

  def __init__(self, context):
    """Constructor for Processor

    Args:
     - context: the object holding the shared state used while running
                opcodes; this class uses its "cfg" attribute (to detect
                configuration writes) and its "glm" attribute (to acquire
                and release locks)

    """
    self.context = context
    # The feedback function is only known once ExecOpCode is called.
    self._feedback_fn = None
    # True only while an opcode holding the BGL exclusively is running.
    self.exclusive_BGL = False

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Checks the LU prerequisites, runs the pre-phase hooks, executes the LU
    and runs the post-phase hooks. The hook results of each phase are
    handed to the LU via HooksCallBack; for the post phase the value
    returned by that callback becomes the result.

    Args:
      lu: the logical unit to execute

    Returns:
      the value returned by the LU's post-phase HooksCallBack

    """
    # Remembered so that a configuration change made by the LU can be
    # detected once it has finished (or failed).
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(rpc.call_hooks_runner, self, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self._feedback_fn, None)
    try:
      result = lu.Exec(self._feedback_fn)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self._feedback_fn, result)
    finally:
      # The config update hook must run even if the LU failed half-way,
      # as long as the configuration was written in the meantime.
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    Args:
      lu: the logical unit to execute
      level: the locking level to start acquiring locks at

    Returns:
      the result of executing the LU

    """
    if level in lu.needed_locks:
      # This gives a chance to LUs to make last-minute changes after acquiring
      # locks at any preceding level.
      lu.DeclareLocks(level)
      needed_locks = lu.needed_locks[level]
      share = lu.share_locks[level]
      # This is always safe to do, as we can't acquire more/less locks than
      # what was requested.
      lu.needed_locks[level] = self.context.glm.acquire(level,
                                                        needed_locks,
                                                        shared=share)
      try:
        result = self._LockAndExecLU(lu, level + 1)
      finally:
        # Only release if something was recorded as acquired at this level.
        if lu.needed_locks[level]:
          self.context.glm.release(level)
    else:
      # No locks declared for this level: run the LU itself.
      result = self._ExecLU(lu)

    return result

  def ExecOpCode(self, op, feedback_fn):
    """Execute an opcode.

    Args:
      op: the opcode to be executed
      feedback_fn: the feedback function (taking one string) used to report
                   progress; it is also used by LogStep, LogWarning and
                   LogInfo

    Returns:
      the result of the logical unit run for the opcode

    Raises:
      errors.ProgrammerError: if op is not an opcodes.OpCode instance
      errors.OpCodeUnknown: if no logical unit is registered for the opcode

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    self._feedback_fn = feedback_fn
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if lu_class.REQ_WSSTORE:
      sstore = ssconf.WritableSimpleStore()
    else:
      sstore = ssconf.SimpleStore()

    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
    # shared fashion otherwise (to prevent concurrent run with an exclusive
    # LU).
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                             shared=not lu_class.REQ_BGL)
    try:
      self.exclusive_BGL = lu_class.REQ_BGL
      lu = lu_class(self, op, self.context, sstore)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
    finally:
      self.context.glm.release(locking.LEVEL_CLUSTER)
      self.exclusive_BGL = False

    return result

  def ChainOpCode(self, op):
    """Chain and execute an opcode.

    This is used by LUs when they need to execute a child LU. Unlike
    ExecOpCode, this method does not call ExpandNames on the child LU, does
    not acquire any locks and does not run any hooks for it.

    Args:
     - op: the opcode to be executed

    Returns:
      the value returned by the child LU's Exec method

    Raises:
      errors.ProgrammerError: if op is not an opcodes.OpCode instance, or if
                              the child LU requires the BGL while the
                              currently running opcode does not hold it
                              exclusively
      errors.OpCodeUnknown: if no logical unit is registered for the opcode

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ChainOpCode")

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if lu_class.REQ_BGL and not self.exclusive_BGL:
      raise errors.ProgrammerError("LUs which require the BGL cannot"
                                   " be chained to granular ones.")

    if lu_class.REQ_WSSTORE:
      sstore = ssconf.WritableSimpleStore()
    else:
      sstore = ssconf.SimpleStore()

    # NOTE: hooks are currently not run for chained LUs.
    lu = lu_class(self, op, self.context, sstore)
    lu.CheckPrereq()
    result = lu.Exec(self._feedback_fn)
    return result

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    The step is written to the debug log and sent to the feedback function
    set by ExecOpCode.

    Args:
      current: the number of the current step
      total: the total number of steps
      message: the description of the step

    """
    logger.Debug("Step %d/%d %s" % (current, total, message))
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, hint=None):
    """Log a warning to the logs and the user.

    Args:
      message: the warning text; it is logged through logger.Error
      hint: optional extra text shown to the user on a separate line

    """
    logger.Error(message)
    self._feedback_fn(" - WARNING: %s" % message)
    if hint:
      self._feedback_fn("      Hint: %s" % hint)

  def LogInfo(self, message):
    """Log an informational message to the logs and the user.

    Args:
      message: the text to log and to send to the feedback function

    """
    logger.Info(message)
    self._feedback_fn(" - INFO: %s" % message)
254

    
255

    
256
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, proc, lu):
    """Remember the collaborators and precompute environment and nodes.

    Args:
      callfn: the function doing the remote call; it is invoked as
              callfn(node_list, hpath, phase, env)
      proc: the processor running the LU; its LogWarning method is used to
            report communication failures
      lu: the logical unit whose hooks are to be run

    """
    self.callfn = callfn
    self.proc = proc
    self.lu = lu
    self.op = lu.op
    self.env, node_list_pre, node_list_post = self._BuildEnv()
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                      constants.HOOKS_PHASE_POST: node_list_post}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    Returns:
      a tuple (env, pre_nodes, post_nodes): the environment dictionary and
      the frozensets of nodes for the pre and the post phase; both sets are
      empty when the LU has no hooks path

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is not None:
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        # LU-provided variables are namespaced with the GANETI_ prefix.
        for key in lu_env:
          env["GANETI_" + key] = lu_env[key]
    else:
      lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call: it adds
    the phase, the hooks path and (when the LU has a simple store) the
    cluster and master names, then converts all keys and values to strings.
    The precomputed self.env is left unmodified.

    Args:
      node_list: the nodes on which to run the hooks
      hpath: the hooks path to run
      phase: the hooks phase to run

    Returns:
      whatever self.callfn returns

    """
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    sstore = self.lu.sstore
    if sstore is not None:
      env["GANETI_CLUSTER"] = sstore.GetClusterName()
      env["GANETI_MASTER"] = sstore.GetMasterNode()

    # items() instead of iteritems(): same pairs, but also available on
    # Python 3, where iteritems() does not exist.
    env = dict((str(key), str(val)) for key, val in env.items())

    return self.callfn(node_list, hpath, phase, env)

  @staticmethod
  def _EscapeOutput(output):
    """Strip a hook's output and escape its special characters.

    The "string_escape" codec only exists on Python 2; where it is missing
    the closest available codec, "unicode_escape", is used instead.

    """
    text = output.strip()
    try:
      return text.encode("string_escape")
    except LookupError:
      return text.encode("unicode_escape").decode("ascii")

  def RunPhase(self, phase):
    """Run all the scripts for a phase.

    This is the main function of the HooksMaster.

    Args:
      phase: the hooks phase to run

    Returns:
      the result of the hooks multi-node rpc call, or None if there are no
      nodes to run the hooks on

    Raises:
      errors.HooksFailure: in the pre phase, if the rpc call returned an
                           empty result
      errors.HooksAbort: in the pre phase, if at least one script failed;
                         the exception carries a list of
                         (node name, script, output) tuples

    """
    if not self.node_list[phase]:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return
    hpath = self.lu.HPATH
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
    # Only the pre phase can abort the operation; post-phase results are
    # returned without being inspected.
    if phase == constants.HOOKS_PHASE_PRE:
      if not results:
        raise errors.HooksFailure("Communication failure")
      errs = []
      for node_name in results:
        res = results[node_name]
        # Anything which is not a list of script results (e.g. False) is
        # treated as a failure to talk to the node; it is reported but does
        # not abort the operation.
        if not isinstance(res, list):
          self.proc.LogWarning("Communication failure to node %s" % node_name)
          continue
        for script, hkr, output in res:
          if hkr == constants.HKR_FAIL:
            errs.append((node_name, script, self._EscapeOutput(output)))
      if errs:
        raise errors.HooksAbort(errs)
    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    Raises:
      errors.ProgrammerError: if the LU has no simple store, as the master
                              node cannot be determined then

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    if self.lu.sstore is None:
      raise errors.ProgrammerError("Null sstore on config update hook")
    nodes = [self.lu.sstore.GetMasterNode()]
    # The results of this hook are not inspected.
    self._RunWrapper(nodes, hpath, phase)