Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 20777413

History | View | Annotate | Download (13.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import logging
32

    
33
from ganeti import opcodes
34
from ganeti import constants
35
from ganeti import errors
36
from ganeti import rpc
37
from ganeti import cmdlib
38
from ganeti import locking
39

    
40

    
41
class Processor(object):
42
  """Object which runs OpCodes"""
43
  DISPATCH_TABLE = {
44
    # Cluster
45
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
46
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
47
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
48
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
49
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
50
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
51
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
52
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
53
    # node lu
54
    opcodes.OpAddNode: cmdlib.LUAddNode,
55
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
56
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
57
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
58
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
59
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
60
    # instance lu
61
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
62
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
63
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
64
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
65
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
66
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
67
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
68
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
69
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
70
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
71
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
72
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
73
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
74
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
75
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
76
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
77
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
78
    # os lu
79
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
80
    # exports lu
81
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
82
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
83
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
84
    # tags lu
85
    opcodes.OpGetTags: cmdlib.LUGetTags,
86
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
87
    opcodes.OpAddTags: cmdlib.LUAddTags,
88
    opcodes.OpDelTags: cmdlib.LUDelTags,
89
    # test lu
90
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
91
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
92
    }
93

    
94
  def __init__(self, context):
95
    """Constructor for Processor
96

97
    Args:
98
     - feedback_fn: the feedback function (taking one string) to be run when
99
                    interesting events are happening
100
    """
101
    self.context = context
102
    self._feedback_fn = None
103
    self.exclusive_BGL = False
104
    self.rpc = rpc.RpcRunner(context.cfg)
105

    
106
  def _ExecLU(self, lu):
107
    """Logical Unit execution sequence.
108

109
    """
110
    write_count = self.context.cfg.write_count
111
    lu.CheckPrereq()
112
    hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
113
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
114
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
115
                     self._feedback_fn, None)
116

    
117
    if getattr(lu.op, "dry_run", False):
118
      # in this mode, no post-hooks are run, and the config is not
119
      # written (as it might have been modified by another LU, and we
120
      # shouldn't do writeout on behalf of other threads
121
      self.LogInfo("dry-run mode requested, not actually executing"
122
                   " the operation")
123
      return lu.dry_run_result
124

    
125
    try:
126
      result = lu.Exec(self._feedback_fn)
127
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
128
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
129
                                self._feedback_fn, result)
130
    finally:
131
      # FIXME: This needs locks if not lu_class.REQ_BGL
132
      if write_count != self.context.cfg.write_count:
133
        hm.RunConfigUpdate()
134

    
135
    return result
136

    
137
  def _LockAndExecLU(self, lu, level):
138
    """Execute a Logical Unit, with the needed locks.
139

140
    This is a recursive function that starts locking the given level, and
141
    proceeds up, till there are no more locks to acquire. Then it executes the
142
    given LU and its opcodes.
143

144
    """
145
    adding_locks = level in lu.add_locks
146
    acquiring_locks = level in lu.needed_locks
147
    if level not in locking.LEVELS:
148
      if callable(self._run_notifier):
149
        self._run_notifier()
150
      result = self._ExecLU(lu)
151
    elif adding_locks and acquiring_locks:
152
      # We could both acquire and add locks at the same level, but for now we
153
      # don't need this, so we'll avoid the complicated code needed.
154
      raise NotImplementedError(
155
        "Can't declare locks to acquire when adding others")
156
    elif adding_locks or acquiring_locks:
157
      lu.DeclareLocks(level)
158
      share = lu.share_locks[level]
159
      if acquiring_locks:
160
        needed_locks = lu.needed_locks[level]
161
        lu.acquired_locks[level] = self.context.glm.acquire(level,
162
                                                            needed_locks,
163
                                                            shared=share)
164
      else: # adding_locks
165
        add_locks = lu.add_locks[level]
166
        lu.remove_locks[level] = add_locks
167
        try:
168
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
169
        except errors.LockError:
170
          raise errors.OpPrereqError(
171
            "Coudn't add locks (%s), probably because of a race condition"
172
            " with another job, who added them first" % add_locks)
173
      try:
174
        try:
175
          if adding_locks:
176
            lu.acquired_locks[level] = add_locks
177
          result = self._LockAndExecLU(lu, level + 1)
178
        finally:
179
          if level in lu.remove_locks:
180
            self.context.glm.remove(level, lu.remove_locks[level])
181
      finally:
182
        if self.context.glm.is_owned(level):
183
          self.context.glm.release(level)
184
    else:
185
      result = self._LockAndExecLU(lu, level + 1)
186

    
187
    return result
188

    
189
  def ExecOpCode(self, op, feedback_fn, run_notifier):
190
    """Execute an opcode.
191

192
    @type op: an OpCode instance
193
    @param op: the opcode to be executed
194
    @type feedback_fn: a function that takes a single argument
195
    @param feedback_fn: this function will be used as feedback from the LU
196
                        code to the end-user
197
    @type run_notifier: callable (no arguments) or None
198
    @param run_notifier:  this function (if callable) will be called when
199
                          we are about to call the lu's Exec() method, that
200
                          is, after we have aquired all locks
201

202
    """
203
    if not isinstance(op, opcodes.OpCode):
204
      raise errors.ProgrammerError("Non-opcode instance passed"
205
                                   " to ExecOpcode")
206

    
207
    self._feedback_fn = feedback_fn
208
    self._run_notifier = run_notifier
209
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
210
    if lu_class is None:
211
      raise errors.OpCodeUnknown("Unknown opcode")
212

    
213
    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
214
    # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
215
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
216
                             shared=not lu_class.REQ_BGL)
217
    try:
218
      self.exclusive_BGL = lu_class.REQ_BGL
219
      lu = lu_class(self, op, self.context, self.rpc)
220
      lu.ExpandNames()
221
      assert lu.needed_locks is not None, "needed_locks not set by LU"
222
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
223
    finally:
224
      self.context.glm.release(locking.LEVEL_CLUSTER)
225
      self.exclusive_BGL = False
226

    
227
    return result
228

    
229
  def LogStep(self, current, total, message):
230
    """Log a change in LU execution progress.
231

232
    """
233
    logging.debug("Step %d/%d %s", current, total, message)
234
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))
235

    
236
  def LogWarning(self, message, *args, **kwargs):
237
    """Log a warning to the logs and the user.
238

239
    The optional keyword argument is 'hint' and can be used to show a
240
    hint to the user (presumably related to the warning). If the
241
    message is empty, it will not be printed at all, allowing one to
242
    show only a hint.
243

244
    """
245
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
246
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
247
    if args:
248
      message = message % tuple(args)
249
    if message:
250
      logging.warning(message)
251
      self._feedback_fn(" - WARNING: %s" % message)
252
    if "hint" in kwargs:
253
      self._feedback_fn("      Hint: %s" % kwargs["hint"])
254

    
255
  def LogInfo(self, message, *args):
256
    """Log an informational message to the logs and the user.
257

258
    """
259
    if args:
260
      message = message % tuple(args)
261
    logging.info(message)
262
    self._feedback_fn(" - INFO: %s" % message)
263

    
264

    
265
class HooksMaster(object):
266
  """Hooks master.
267

268
  This class distributes the run commands to the nodes based on the
269
  specific LU class.
270

271
  In order to remove the direct dependency on the rpc module, the
272
  constructor needs a function which actually does the remote
273
  call. This will usually be rpc.call_hooks_runner, but any function
274
  which behaves the same works.
275

276
  """
277
  def __init__(self, callfn, proc, lu):
278
    self.callfn = callfn
279
    self.proc = proc
280
    self.lu = lu
281
    self.op = lu.op
282
    self.env, node_list_pre, node_list_post = self._BuildEnv()
283
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
284
                      constants.HOOKS_PHASE_POST: node_list_post}
285

    
286
  def _BuildEnv(self):
287
    """Compute the environment and the target nodes.
288

289
    Based on the opcode and the current node list, this builds the
290
    environment for the hooks and the target node list for the run.
291

292
    """
293
    env = {
294
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
295
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
296
      "GANETI_OP_CODE": self.op.OP_ID,
297
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
298
      "GANETI_DATA_DIR": constants.DATA_DIR,
299
      }
300

    
301
    if self.lu.HPATH is not None:
302
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
303
      if lu_env:
304
        for key in lu_env:
305
          env["GANETI_" + key] = lu_env[key]
306
    else:
307
      lu_nodes_pre = lu_nodes_post = []
308

    
309
    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
310

    
311
  def _RunWrapper(self, node_list, hpath, phase):
312
    """Simple wrapper over self.callfn.
313

314
    This method fixes the environment before doing the rpc call.
315

316
    """
317
    env = self.env.copy()
318
    env["GANETI_HOOKS_PHASE"] = phase
319
    env["GANETI_HOOKS_PATH"] = hpath
320
    if self.lu.cfg is not None:
321
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
322
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
323

    
324
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
325

    
326
    return self.callfn(node_list, hpath, phase, env)
327

    
328
  def RunPhase(self, phase):
329
    """Run all the scripts for a phase.
330

331
    This is the main function of the HookMaster.
332

333
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
334
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
335
    @return: the processed results of the hooks multi-node rpc call
336
    @raise errors.HooksFailure: on communication failure to the nodes
337

338
    """
339
    if not self.node_list[phase]:
340
      # empty node list, we should not attempt to run this as either
341
      # we're in the cluster init phase and the rpc client part can't
342
      # even attempt to run, or this LU doesn't do hooks at all
343
      return
344
    hpath = self.lu.HPATH
345
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
346
    if phase == constants.HOOKS_PHASE_PRE:
347
      errs = []
348
      if not results:
349
        raise errors.HooksFailure("Communication failure")
350
      for node_name in results:
351
        res = results[node_name]
352
        if res.offline:
353
          continue
354
        msg = res.RemoteFailMsg()
355
        if msg:
356
          self.proc.LogWarning("Communication failure to node %s: %s",
357
                               node_name, msg)
358
          continue
359
        for script, hkr, output in res.payload:
360
          if hkr == constants.HKR_FAIL:
361
            errs.append((node_name, script, output))
362
      if errs:
363
        raise errors.HooksAbort(errs)
364
    return results
365

    
366
  def RunConfigUpdate(self):
367
    """Run the special configuration update hook
368

369
    This is a special hook that runs only on the master after each
370
    top-level LI if the configuration has been updated.
371

372
    """
373
    phase = constants.HOOKS_PHASE_POST
374
    hpath = constants.HOOKS_NAME_CFGUPDATE
375
    nodes = [self.lu.cfg.GetMasterNode()]
376
    self._RunWrapper(nodes, hpath, phase)