Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 76aef8fc

History | View | Annotate | Download (13.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import logging
32

    
33
from ganeti import opcodes
34
from ganeti import constants
35
from ganeti import errors
36
from ganeti import rpc
37
from ganeti import cmdlib
38
from ganeti import locking
39

    
40

    
41
class Processor(object):
  """Object which runs OpCodes.

  The processor maps each opcode class to the logical unit (LU) class
  implementing it, takes care of acquiring the needed locks (including
  the Big Ganeti Lock) and runs the pre/post hooks around the LU
  execution.

  """
  # Maps each opcode class to the cmdlib LU class implementing it
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
    opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
    opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
    opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
    opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
    opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

  def __init__(self, context):
    """Constructor for Processor

    @param context: the global context object; it must provide the
        cluster configuration (C{context.cfg}) and the Ganeti lock
        manager (C{context.glm}), as used throughout this class

    """
    self.context = context
    # set per-opcode by ExecOpCode; used to send feedback to the end-user
    self._feedback_fn = None
    # True while a LU with REQ_BGL holds the Big Ganeti Lock exclusively
    self.exclusive_BGL = False
    self.rpc = rpc.RpcRunner(context.cfg)

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs the LU's prerequisite checks and the pre-phase hooks, then
    (unless dry-run was requested) the LU itself and the post-phase
    hooks; finally a config-update hook is triggered if the
    configuration was written out during the execution.

    @param lu: an initialized LogicalUnit instance (names expanded and
        all needed locks already acquired by the caller)
    @return: the LU's Exec result as filtered by its HooksCallBack
        method, or the LU's C{dry_run_result} in dry-run mode

    """
    # remember the config serial, so we can detect writes done by the LU
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self._feedback_fn, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = lu.Exec(self._feedback_fn)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self._feedback_fn, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      # run the config-update hook whenever the LU has written the config
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: the LogicalUnit to execute
    @param level: the current locking level; when it is past the known
        levels (not in C{locking.LEVELS}) the LU is actually executed
    @return: the result propagated from L{_ExecLU}

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # all lock levels processed: notify the caller (if requested) and
      # finally run the LU
      if callable(self._run_notifier):
        self._run_notifier()
      result = self._ExecLU(lu)
    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError(
        "Can't declare locks to acquire when adding others")
    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      if acquiring_locks:
        needed_locks = lu.needed_locks[level]
        lu.acquired_locks[level] = self.context.glm.acquire(level,
                                                            needed_locks,
                                                            shared=share)
      else: # adding_locks
        add_locks = lu.add_locks[level]
        # locks added at this level will be removed again in the inner
        # finally clause below
        lu.remove_locks[level] = add_locks
        try:
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
        except errors.LockError:
          raise errors.OpPrereqError(
            "Couldn't add locks (%s), probably because of a race condition"
            " with another job, who added them first" % add_locks)
      try:
        try:
          if adding_locks:
            lu.acquired_locks[level] = add_locks
          # recurse into the next locking level
          result = self._LockAndExecLU(lu, level + 1)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        # release whatever is still owned at this level
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)
    else:
      # nothing to lock at this level; try the next one
      result = self._LockAndExecLU(lu, level + 1)

    return result

  def ExecOpCode(self, op, feedback_fn, run_notifier):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type feedback_fn: a function that takes a single argument
    @param feedback_fn: this function will be used as feedback from the LU
                        code to the end-user
    @type run_notifier: callable (no arguments) or None
    @param run_notifier:  this function (if callable) will be called when
                          we are about to call the lu's Exec() method, that
                          is, after we have acquired all locks
    @return: the result of the dispatched LU's execution
    @raise errors.ProgrammerError: if op is not an OpCode instance
    @raise errors.OpCodeUnknown: if the opcode has no entry in
        L{DISPATCH_TABLE}

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    self._feedback_fn = feedback_fn
    self._run_notifier = run_notifier
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
    # shared fashion otherwise (to prevent concurrent run with an exclusive LU)
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                             shared=not lu_class.REQ_BGL)
    try:
      self.exclusive_BGL = lu_class.REQ_BGL
      lu = lu_class(self, op, self.context, self.rpc)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
    finally:
      self.context.glm.release(locking.LEVEL_CLUSTER)
      self.exclusive_BGL = False

    return result

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @param current: the number of the step just started
    @param total: the total number of steps
    @param message: a description of the step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    @param message: the warning text; %-formatted with C{args} if any
        are given
    @keyword hint: extra text shown to the user after the warning

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self._feedback_fn(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self._feedback_fn("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @param message: the message text; %-formatted with C{args} if any
        are given

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self._feedback_fn(" - INFO: %s" % message)
271

    
272

    
273
class HooksMaster(object):
  """Hooks master.

  Drives the execution of the hook scripts for one logical unit: the
  hooks environment and the per-phase target node lists are computed
  once at construction time, while the actual runs are dispatched later
  for the pre/post phases and for the config-update hook.

  To remove the direct dependency on the rpc module, the constructor
  takes the function which actually performs the remote call; usually
  this is rpc.call_hooks_runner, but any function behaving the same way
  works.

  """
  def __init__(self, callfn, proc, lu):
    self.callfn = callfn
    self.proc = proc
    self.lu = lu
    self.op = lu.op
    # precompute the static environment and the per-phase node sets
    self.env, pre_nodes, post_nodes = self._BuildEnv()
    self.node_list = {
      constants.HOOKS_PHASE_PRE: pre_nodes,
      constants.HOOKS_PHASE_POST: post_nodes,
      }

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    @return: a (env, pre_nodes, post_nodes) tuple, where env is the
        hooks environment dict and the node lists are frozensets

    """
    env = {}
    env["PATH"] = "/sbin:/bin:/usr/sbin:/usr/bin"
    env["GANETI_HOOKS_VERSION"] = constants.HOOKS_VERSION
    env["GANETI_OP_CODE"] = self.op.OP_ID
    env["GANETI_OBJECT_TYPE"] = self.lu.HTYPE
    env["GANETI_DATA_DIR"] = constants.DATA_DIR

    if self.lu.HPATH is None:
      # this LU doesn't run hooks at all
      lu_nodes_pre = lu_nodes_post = []
    else:
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        # LU-provided variables are namespaced under the GANETI_ prefix
        for name in lu_env:
          env["GANETI_" + name] = lu_env[name]

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    @param node_list: the nodes on which the hooks should run
    @param hpath: the hooks path (directory name) to execute
    @param phase: the hooks phase being run
    @return: whatever self.callfn returns

    """
    run_env = self.env.copy()
    run_env["GANETI_HOOKS_PHASE"] = phase
    run_env["GANETI_HOOKS_PATH"] = hpath
    cfg = self.lu.cfg
    if cfg is not None:
      run_env["GANETI_CLUSTER"] = cfg.GetClusterName()
      run_env["GANETI_MASTER"] = cfg.GetMasterNode()

    # the rpc layer needs plain strings on both sides of the mapping
    str_env = dict((str(name), str(value))
                   for name, value in run_env.iteritems())

    return self.callfn(node_list, hpath, phase, str_env)

  def RunPhase(self, phase):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes

    """
    if not self.node_list[phase]:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return
    results = self._RunWrapper(self.node_list[phase], self.lu.HPATH, phase)
    if phase != constants.HOOKS_PHASE_PRE:
      # only pre-phase failures abort the operation
      return results
    if not results:
      raise errors.HooksFailure("Communication failure")
    failures = []
    for node_name in results:
      node_res = results[node_name]
      if node_res.offline:
        # offline nodes are silently skipped
        continue
      fail_msg = node_res.RemoteFailMsg()
      if fail_msg:
        self.proc.LogWarning("Communication failure to node %s: %s",
                             node_name, fail_msg)
        continue
      failures.extend((node_name, script, output)
                      for script, hkr, output in node_res.payload
                      if hkr == constants.HKR_FAIL)
    if failures:
      raise errors.HooksAbort(failures)
    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    master = self.lu.cfg.GetMasterNode()
    self._RunWrapper([master], constants.HOOKS_NAME_CFGUPDATE,
                     constants.HOOKS_PHASE_POST)