Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 3fb4f740

History | View | Annotate | Download (13.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import logging
32

    
33
from ganeti import opcodes
34
from ganeti import constants
35
from ganeti import errors
36
from ganeti import rpc
37
from ganeti import cmdlib
38
from ganeti import locking
39

    
40

    
41
class Processor(object):
42
  """Object which runs OpCodes"""
43
  DISPATCH_TABLE = {
44
    # Cluster
45
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
46
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
47
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
48
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
49
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
50
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
51
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
52
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
53
    # node lu
54
    opcodes.OpAddNode: cmdlib.LUAddNode,
55
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
56
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
57
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
58
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
59
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
60
    # instance lu
61
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
62
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
63
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
64
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
65
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
66
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
67
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
68
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
69
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
70
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
71
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
72
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
73
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
74
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
75
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
76
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
77
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
78
    # os lu
79
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
80
    # exports lu
81
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
82
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
83
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
84
    # tags lu
85
    opcodes.OpGetTags: cmdlib.LUGetTags,
86
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
87
    opcodes.OpAddTags: cmdlib.LUAddTags,
88
    opcodes.OpDelTags: cmdlib.LUDelTags,
89
    # test lu
90
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
91
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
92
    }
93

    
94
  def __init__(self, context):
95
    """Constructor for Processor
96

97
    Args:
98
     - feedback_fn: the feedback function (taking one string) to be run when
99
                    interesting events are happening
100
    """
101
    self.context = context
102
    self._feedback_fn = None
103
    self.exclusive_BGL = False
104
    self.rpc = rpc.RpcRunner(context.cfg)
105

    
106
  def _ExecLU(self, lu):
107
    """Logical Unit execution sequence.
108

109
    """
110
    write_count = self.context.cfg.write_count
111
    lu.CheckPrereq()
112
    hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
113
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
114
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
115
                     self._feedback_fn, None)
116
    try:
117
      result = lu.Exec(self._feedback_fn)
118
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
119
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
120
                                self._feedback_fn, result)
121
    finally:
122
      # FIXME: This needs locks if not lu_class.REQ_BGL
123
      if write_count != self.context.cfg.write_count:
124
        hm.RunConfigUpdate()
125

    
126
    return result
127

    
128
  def _LockAndExecLU(self, lu, level):
129
    """Execute a Logical Unit, with the needed locks.
130

131
    This is a recursive function that starts locking the given level, and
132
    proceeds up, till there are no more locks to acquire. Then it executes the
133
    given LU and its opcodes.
134

135
    """
136
    adding_locks = level in lu.add_locks
137
    acquiring_locks = level in lu.needed_locks
138
    if level not in locking.LEVELS:
139
      if callable(self._run_notifier):
140
        self._run_notifier()
141
      result = self._ExecLU(lu)
142
    elif adding_locks and acquiring_locks:
143
      # We could both acquire and add locks at the same level, but for now we
144
      # don't need this, so we'll avoid the complicated code needed.
145
      raise NotImplementedError(
146
        "Can't declare locks to acquire when adding others")
147
    elif adding_locks or acquiring_locks:
148
      lu.DeclareLocks(level)
149
      share = lu.share_locks[level]
150
      if acquiring_locks:
151
        needed_locks = lu.needed_locks[level]
152
        lu.acquired_locks[level] = self.context.glm.acquire(level,
153
                                                            needed_locks,
154
                                                            shared=share)
155
      else: # adding_locks
156
        add_locks = lu.add_locks[level]
157
        lu.remove_locks[level] = add_locks
158
        try:
159
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
160
        except errors.LockError:
161
          raise errors.OpPrereqError(
162
            "Coudn't add locks (%s), probably because of a race condition"
163
            " with another job, who added them first" % add_locks)
164
      try:
165
        try:
166
          if adding_locks:
167
            lu.acquired_locks[level] = add_locks
168
          result = self._LockAndExecLU(lu, level + 1)
169
        finally:
170
          if level in lu.remove_locks:
171
            self.context.glm.remove(level, lu.remove_locks[level])
172
      finally:
173
        if self.context.glm.is_owned(level):
174
          self.context.glm.release(level)
175
    else:
176
      result = self._LockAndExecLU(lu, level + 1)
177

    
178
    return result
179

    
180
  def ExecOpCode(self, op, feedback_fn, run_notifier):
181
    """Execute an opcode.
182

183
    @type op: an OpCode instance
184
    @param op: the opcode to be executed
185
    @type feedback_fn: a function that takes a single argument
186
    @param feedback_fn: this function will be used as feedback from the LU
187
                        code to the end-user
188
    @type run_notifier: callable (no arguments) or None
189
    @param run_notifier:  this function (if callable) will be called when
190
                          we are about to call the lu's Exec() method, that
191
                          is, after we have aquired all locks
192

193
    """
194
    if not isinstance(op, opcodes.OpCode):
195
      raise errors.ProgrammerError("Non-opcode instance passed"
196
                                   " to ExecOpcode")
197

    
198
    self._feedback_fn = feedback_fn
199
    self._run_notifier = run_notifier
200
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
201
    if lu_class is None:
202
      raise errors.OpCodeUnknown("Unknown opcode")
203

    
204
    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
205
    # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
206
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
207
                             shared=not lu_class.REQ_BGL)
208
    try:
209
      self.exclusive_BGL = lu_class.REQ_BGL
210
      lu = lu_class(self, op, self.context, self.rpc)
211
      lu.ExpandNames()
212
      assert lu.needed_locks is not None, "needed_locks not set by LU"
213
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
214
    finally:
215
      self.context.glm.release(locking.LEVEL_CLUSTER)
216
      self.exclusive_BGL = False
217

    
218
    return result
219

    
220
  def LogStep(self, current, total, message):
221
    """Log a change in LU execution progress.
222

223
    """
224
    logging.debug("Step %d/%d %s", current, total, message)
225
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))
226

    
227
  def LogWarning(self, message, *args, **kwargs):
228
    """Log a warning to the logs and the user.
229

230
    The optional keyword argument is 'hint' and can be used to show a
231
    hint to the user (presumably related to the warning). If the
232
    message is empty, it will not be printed at all, allowing one to
233
    show only a hint.
234

235
    """
236
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
237
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
238
    if args:
239
      message = message % tuple(args)
240
    if message:
241
      logging.warning(message)
242
      self._feedback_fn(" - WARNING: %s" % message)
243
    if "hint" in kwargs:
244
      self._feedback_fn("      Hint: %s" % kwargs["hint"])
245

    
246
  def LogInfo(self, message, *args):
247
    """Log an informational message to the logs and the user.
248

249
    """
250
    if args:
251
      message = message % tuple(args)
252
    logging.info(message)
253
    self._feedback_fn(" - INFO: %s" % message)
254

    
255

    
256
class HooksMaster(object):
257
  """Hooks master.
258

259
  This class distributes the run commands to the nodes based on the
260
  specific LU class.
261

262
  In order to remove the direct dependency on the rpc module, the
263
  constructor needs a function which actually does the remote
264
  call. This will usually be rpc.call_hooks_runner, but any function
265
  which behaves the same works.
266

267
  """
268
  def __init__(self, callfn, proc, lu):
269
    self.callfn = callfn
270
    self.proc = proc
271
    self.lu = lu
272
    self.op = lu.op
273
    self.env, node_list_pre, node_list_post = self._BuildEnv()
274
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
275
                      constants.HOOKS_PHASE_POST: node_list_post}
276

    
277
  def _BuildEnv(self):
278
    """Compute the environment and the target nodes.
279

280
    Based on the opcode and the current node list, this builds the
281
    environment for the hooks and the target node list for the run.
282

283
    """
284
    env = {
285
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
286
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
287
      "GANETI_OP_CODE": self.op.OP_ID,
288
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
289
      "GANETI_DATA_DIR": constants.DATA_DIR,
290
      }
291

    
292
    if self.lu.HPATH is not None:
293
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
294
      if lu_env:
295
        for key in lu_env:
296
          env["GANETI_" + key] = lu_env[key]
297
    else:
298
      lu_nodes_pre = lu_nodes_post = []
299

    
300
    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
301

    
302
  def _RunWrapper(self, node_list, hpath, phase):
303
    """Simple wrapper over self.callfn.
304

305
    This method fixes the environment before doing the rpc call.
306

307
    """
308
    env = self.env.copy()
309
    env["GANETI_HOOKS_PHASE"] = phase
310
    env["GANETI_HOOKS_PATH"] = hpath
311
    if self.lu.cfg is not None:
312
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
313
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
314

    
315
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
316

    
317
    return self.callfn(node_list, hpath, phase, env)
318

    
319
  def RunPhase(self, phase):
320
    """Run all the scripts for a phase.
321

322
    This is the main function of the HookMaster.
323

324
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
325
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
326
    @return: the processed results of the hooks multi-node rpc call
327
    @raise errors.HooksFailure: on communication failure to the nodes
328

329
    """
330
    if not self.node_list[phase]:
331
      # empty node list, we should not attempt to run this as either
332
      # we're in the cluster init phase and the rpc client part can't
333
      # even attempt to run, or this LU doesn't do hooks at all
334
      return
335
    hpath = self.lu.HPATH
336
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
337
    if phase == constants.HOOKS_PHASE_PRE:
338
      errs = []
339
      if not results:
340
        raise errors.HooksFailure("Communication failure")
341
      for node_name in results:
342
        res = results[node_name]
343
        if res.offline:
344
          continue
345
        msg = res.RemoteFailMsg()
346
        if msg:
347
          self.proc.LogWarning("Communication failure to node %s: %s",
348
                               node_name, msg)
349
          continue
350
        for script, hkr, output in res.payload:
351
          if hkr == constants.HKR_FAIL:
352
            errs.append((node_name, script, output))
353
      if errs:
354
        raise errors.HooksAbort(errs)
355
    return results
356

    
357
  def RunConfigUpdate(self):
358
    """Run the special configuration update hook
359

360
    This is a special hook that runs only on the master after each
361
    top-level LI if the configuration has been updated.
362

363
    """
364
    phase = constants.HOOKS_PHASE_POST
365
    hpath = constants.HOOKS_NAME_CFGUPDATE
366
    nodes = [self.lu.cfg.GetMasterNode()]
367
    results = self._RunWrapper(nodes, hpath, phase)