Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ b9bddb6b

History | View | Annotate | Download (12.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31

    
32
from ganeti import opcodes
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import rpc
36
from ganeti import cmdlib
37
from ganeti import logger
38
from ganeti import locking
39

    
40

    
41
class Processor(object):
42
  """Object which runs OpCodes"""
43
  DISPATCH_TABLE = {
44
    # Cluster
45
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
46
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
47
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
48
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
49
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
50
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
51
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
52
    # node lu
53
    opcodes.OpAddNode: cmdlib.LUAddNode,
54
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
55
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
56
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
57
    # instance lu
58
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
59
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
60
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
61
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
62
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
63
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
64
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
65
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
66
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
67
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
68
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
69
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
70
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
71
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
72
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
73
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
74
    # os lu
75
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
76
    # exports lu
77
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
78
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
79
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
80
    # tags lu
81
    opcodes.OpGetTags: cmdlib.LUGetTags,
82
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
83
    opcodes.OpAddTags: cmdlib.LUAddTags,
84
    opcodes.OpDelTags: cmdlib.LUDelTags,
85
    # test lu
86
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
87
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
88
    }
89

    
90
  def __init__(self, context):
91
    """Constructor for Processor
92

93
    Args:
94
     - feedback_fn: the feedback function (taking one string) to be run when
95
                    interesting events are happening
96
    """
97
    self.context = context
98
    self._feedback_fn = None
99
    self.exclusive_BGL = False
100

    
101
  def _ExecLU(self, lu):
102
    """Logical Unit execution sequence.
103

104
    """
105
    write_count = self.context.cfg.write_count
106
    lu.CheckPrereq()
107
    hm = HooksMaster(rpc.call_hooks_runner, self, lu)
108
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
109
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
110
                     self._feedback_fn, None)
111
    try:
112
      result = lu.Exec(self._feedback_fn)
113
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
114
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
115
                                self._feedback_fn, result)
116
    finally:
117
      # FIXME: This needs locks if not lu_class.REQ_BGL
118
      if write_count != self.context.cfg.write_count:
119
        hm.RunConfigUpdate()
120

    
121
    return result
122

    
123
  def _LockAndExecLU(self, lu, level):
124
    """Execute a Logical Unit, with the needed locks.
125

126
    This is a recursive function that starts locking the given level, and
127
    proceeds up, till there are no more locks to acquire. Then it executes the
128
    given LU and its opcodes.
129

130
    """
131
    adding_locks = level in lu.add_locks
132
    acquiring_locks = level in lu.needed_locks
133
    if level not in locking.LEVELS:
134
      if callable(self._run_notifier):
135
        self._run_notifier()
136
      result = self._ExecLU(lu)
137
    elif adding_locks and acquiring_locks:
138
      # We could both acquire and add locks at the same level, but for now we
139
      # don't need this, so we'll avoid the complicated code needed.
140
      raise NotImplementedError(
141
        "Can't declare locks to acquire when adding others")
142
    elif adding_locks or acquiring_locks:
143
      lu.DeclareLocks(level)
144
      share = lu.share_locks[level]
145
      if acquiring_locks:
146
        needed_locks = lu.needed_locks[level]
147
        lu.acquired_locks[level] = self.context.glm.acquire(level,
148
                                                            needed_locks,
149
                                                            shared=share)
150
      else: # adding_locks
151
        add_locks = lu.add_locks[level]
152
        lu.remove_locks[level] = add_locks
153
        try:
154
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
155
        except errors.LockError:
156
          raise errors.OpPrereqError(
157
            "Coudn't add locks (%s), probably because of a race condition"
158
            " with another job, who added them first" % add_locks)
159
      try:
160
        try:
161
          if adding_locks:
162
            lu.acquired_locks[level] = add_locks
163
          result = self._LockAndExecLU(lu, level + 1)
164
        finally:
165
          if level in lu.remove_locks:
166
            self.context.glm.remove(level, lu.remove_locks[level])
167
      finally:
168
        if self.context.glm.is_owned(level):
169
          self.context.glm.release(level)
170
    else:
171
      result = self._LockAndExecLU(lu, level + 1)
172

    
173
    return result
174

    
175
  def ExecOpCode(self, op, feedback_fn, run_notifier):
176
    """Execute an opcode.
177

178
    @type op: an OpCode instance
179
    @param op: the opcode to be executed
180
    @type feedback_fn: a function that takes a single argument
181
    @param feedback_fn: this function will be used as feedback from the LU
182
                        code to the end-user
183
    @type run_notifier: callable (no arguments) or None
184
    @param run_notifier:  this function (if callable) will be called when
185
                          we are about to call the lu's Exec() method, that
186
                          is, after we have aquired all locks
187

188
    """
189
    if not isinstance(op, opcodes.OpCode):
190
      raise errors.ProgrammerError("Non-opcode instance passed"
191
                                   " to ExecOpcode")
192

    
193
    self._feedback_fn = feedback_fn
194
    self._run_notifier = run_notifier
195
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
196
    if lu_class is None:
197
      raise errors.OpCodeUnknown("Unknown opcode")
198

    
199
    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
200
    # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
201
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
202
                             shared=not lu_class.REQ_BGL)
203
    try:
204
      self.exclusive_BGL = lu_class.REQ_BGL
205
      lu = lu_class(self, op, self.context)
206
      lu.ExpandNames()
207
      assert lu.needed_locks is not None, "needed_locks not set by LU"
208
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
209
    finally:
210
      self.context.glm.release(locking.LEVEL_CLUSTER)
211
      self.exclusive_BGL = False
212

    
213
    return result
214

    
215
  def LogStep(self, current, total, message):
216
    """Log a change in LU execution progress.
217

218
    """
219
    logger.Debug("Step %d/%d %s" % (current, total, message))
220
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))
221

    
222
  def LogWarning(self, message, hint=None):
223
    """Log a warning to the logs and the user.
224

225
    """
226
    logger.Error(message)
227
    self._feedback_fn(" - WARNING: %s" % message)
228
    if hint:
229
      self._feedback_fn("      Hint: %s" % hint)
230

    
231
  def LogInfo(self, message):
232
    """Log an informational message to the logs and the user.
233

234
    """
235
    logger.Info(message)
236
    self._feedback_fn(" - INFO: %s" % message)
237

    
238

    
239
class HooksMaster(object):
240
  """Hooks master.
241

242
  This class distributes the run commands to the nodes based on the
243
  specific LU class.
244

245
  In order to remove the direct dependency on the rpc module, the
246
  constructor needs a function which actually does the remote
247
  call. This will usually be rpc.call_hooks_runner, but any function
248
  which behaves the same works.
249

250
  """
251
  def __init__(self, callfn, proc, lu):
252
    self.callfn = callfn
253
    self.proc = proc
254
    self.lu = lu
255
    self.op = lu.op
256
    self.env, node_list_pre, node_list_post = self._BuildEnv()
257
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
258
                      constants.HOOKS_PHASE_POST: node_list_post}
259

    
260
  def _BuildEnv(self):
261
    """Compute the environment and the target nodes.
262

263
    Based on the opcode and the current node list, this builds the
264
    environment for the hooks and the target node list for the run.
265

266
    """
267
    env = {
268
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
269
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
270
      "GANETI_OP_CODE": self.op.OP_ID,
271
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
272
      "GANETI_DATA_DIR": constants.DATA_DIR,
273
      }
274

    
275
    if self.lu.HPATH is not None:
276
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
277
      if lu_env:
278
        for key in lu_env:
279
          env["GANETI_" + key] = lu_env[key]
280
    else:
281
      lu_nodes_pre = lu_nodes_post = []
282

    
283
    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
284

    
285
  def _RunWrapper(self, node_list, hpath, phase):
286
    """Simple wrapper over self.callfn.
287

288
    This method fixes the environment before doing the rpc call.
289

290
    """
291
    env = self.env.copy()
292
    env["GANETI_HOOKS_PHASE"] = phase
293
    env["GANETI_HOOKS_PATH"] = hpath
294
    if self.lu.cfg is not None:
295
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
296
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
297

    
298
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
299

    
300
    return self.callfn(node_list, hpath, phase, env)
301

    
302
  def RunPhase(self, phase):
303
    """Run all the scripts for a phase.
304

305
    This is the main function of the HookMaster.
306

307
    Args:
308
      phase: the hooks phase to run
309

310
    Returns:
311
      the result of the hooks multi-node rpc call
312

313
    """
314
    if not self.node_list[phase]:
315
      # empty node list, we should not attempt to run this as either
316
      # we're in the cluster init phase and the rpc client part can't
317
      # even attempt to run, or this LU doesn't do hooks at all
318
      return
319
    hpath = self.lu.HPATH
320
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
321
    if phase == constants.HOOKS_PHASE_PRE:
322
      errs = []
323
      if not results:
324
        raise errors.HooksFailure("Communication failure")
325
      for node_name in results:
326
        res = results[node_name]
327
        if res is False or not isinstance(res, list):
328
          self.proc.LogWarning("Communication failure to node %s" % node_name)
329
          continue
330
        for script, hkr, output in res:
331
          if hkr == constants.HKR_FAIL:
332
            output = output.strip().encode("string_escape")
333
            errs.append((node_name, script, output))
334
      if errs:
335
        raise errors.HooksAbort(errs)
336
    return results
337

    
338
  def RunConfigUpdate(self):
339
    """Run the special configuration update hook
340

341
    This is a special hook that runs only on the master after each
342
    top-level LI if the configuration has been updated.
343

344
    """
345
    phase = constants.HOOKS_PHASE_POST
346
    hpath = constants.HOOKS_NAME_CFGUPDATE
347
    nodes = [self.lu.cfg.GetMasterNode()]
348
    results = self._RunWrapper(nodes, hpath, phase)