Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 17e82923

History | View | Annotate | Download (14.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import logging
32

    
33
from ganeti import opcodes
34
from ganeti import constants
35
from ganeti import errors
36
from ganeti import rpc
37
from ganeti import cmdlib
38
from ganeti import locking
39

    
40

    
41
class Processor(object):
  """Object which runs OpCodes"""
  # Maps each opcode class to the logical unit that implements it; used
  # by ExecOpCode to dispatch incoming opcodes.
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
    opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
    opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
    opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
    opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
    opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
    opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

  def __init__(self, context):
    """Constructor for Processor

    @param context: the context object providing the cluster
        configuration (C{context.cfg}, used to build the RPC runner)
        and the global lock manager (C{context.glm})

    """
    self.context = context
    # the per-opcode feedback function; set by ExecOpCode before
    # dispatching to the LU
    self._feedback_fn = None
    # whether we currently hold the Big Ganeti Lock exclusively
    self.exclusive_BGL = False
    self.rpc = rpc.RpcRunner(context.cfg)

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs the prerequisite check, the pre-phase hooks, the LU itself and
    the post-phase hooks, and triggers the config-update hook if the
    configuration was written during execution.

    @param lu: the (already expanded and locked) logical unit instance
    @return: the result of the LU's Exec method, possibly modified by
        its HooksCallBack, or the LU's dry_run_result in dry-run mode

    """
    # remember the config serial so we can detect writes in the finally
    # clause below
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self._feedback_fn, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = lu.Exec(self._feedback_fn)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self._feedback_fn, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: the logical unit to execute
    @param level: the locking level to process at this recursion depth
    @return: the result of _ExecLU, propagated back through the recursion

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # past the last locking level: notify the caller (if requested)
      # and actually run the LU
      if callable(self._run_notifier):
        self._run_notifier()
      result = self._ExecLU(lu)
    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError(
        "Can't declare locks to acquire when adding others")
    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      if acquiring_locks:
        needed_locks = lu.needed_locks[level]
        lu.acquired_locks[level] = self.context.glm.acquire(level,
                                                            needed_locks,
                                                            shared=share)
      else: # adding_locks
        add_locks = lu.add_locks[level]
        # locks we add are also removed when we're done with the LU
        lu.remove_locks[level] = add_locks
        try:
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
        except errors.LockError:
          raise errors.OpPrereqError(
            "Couldn't add locks (%s), probably because of a race condition"
            " with another job, who added them first" % add_locks)
      try:
        try:
          if adding_locks:
            lu.acquired_locks[level] = add_locks
          # recurse into the next locking level
          result = self._LockAndExecLU(lu, level + 1)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)
    else:
      # nothing to lock at this level, just recurse
      result = self._LockAndExecLU(lu, level + 1)

    return result

  def ExecOpCode(self, op, feedback_fn, run_notifier):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type feedback_fn: a function that takes a single argument
    @param feedback_fn: this function will be used as feedback from the LU
                        code to the end-user
    @type run_notifier: callable (no arguments) or None
    @param run_notifier:  this function (if callable) will be called when
                          we are about to call the lu's Exec() method, that
                          is, after we have acquired all locks
    @return: the result of the executed LU
    @raise errors.ProgrammerError: if op is not an OpCode instance
    @raise errors.OpCodeUnknown: if the opcode has no entry in the
        dispatch table

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    self._feedback_fn = feedback_fn
    self._run_notifier = run_notifier
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
    # shared fashion otherwise (to prevent concurrent run with an exclusive
    # LU)
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                             shared=not lu_class.REQ_BGL)
    try:
      self.exclusive_BGL = lu_class.REQ_BGL
      lu = lu_class(self, op, self.context, self.rpc)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
    finally:
      self.context.glm.release(locking.LEVEL_CLUSTER)
      self.exclusive_BGL = False

    return result

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @param current: the number of the step being started
    @param total: the total number of steps
    @param message: a description of the step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self._feedback_fn(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self._feedback_fn("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @param message: the message (possibly a format string) to show
    @param args: optional arguments interpolated into the message

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self._feedback_fn(" - INFO: %s" % message)
272

    
273

    
274
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, lu):
    """Initializes the hooks master.

    @param callfn: function doing the actual multi-node hooks rpc call
        (usually rpc.call_hooks_runner)
    @param lu: the logical unit on whose behalf the hooks are run

    """
    self.callfn = callfn
    self.lu = lu
    self.op = lu.op
    # precompute the environment and the per-phase node lists once, so
    # both phases of the same LU run with a consistent environment
    self.env, node_list_pre, node_list_post = self._BuildEnv()
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                      constants.HOOKS_PHASE_POST: node_list_post}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    @return: a tuple of (environment dict, frozenset of pre-phase
        nodes, frozenset of post-phase nodes)

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is not None:
      # the LU declares hooks; merge its environment under the GANETI_
      # prefix and take its node lists
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        for key in lu_env:
          env["GANETI_" + key] = lu_env[key]
    else:
      lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    @param node_list: the list of nodes to run the hooks on
    @param hpath: the hooks path (subdirectory) to run
    @param phase: one of the hooks phase constants
    @return: the result of self.callfn

    """
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    if self.lu.cfg is not None:
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()

    # hooks only accept string keys/values in the environment
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: if a pre-phase hook failed on any node

    """
    if not self.node_list[phase] and not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return
    hpath = self.lu.HPATH
    if nodes is not None:
      results = self._RunWrapper(nodes, hpath, phase)
    else:
      results = self._RunWrapper(self.node_list[phase], hpath, phase)
    if phase == constants.HOOKS_PHASE_PRE:
      # pre-phase hook failures abort the operation; collect them all
      # before raising so the user sees the full picture
      errs = []
      if not results:
        raise errors.HooksFailure("Communication failure")
      for node_name in results:
        res = results[node_name]
        if res.offline:
          # offline nodes are skipped, not treated as failures
          continue
        msg = res.RemoteFailMsg()
        if msg:
          self.lu.LogWarning("Communication failure to node %s: %s",
                             node_name, msg)
          continue
        for script, hkr, output in res.payload:
          if hkr == constants.HKR_FAIL:
            errs.append((node_name, script, output))
      if errs:
        raise errors.HooksAbort(errs)
    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.lu.cfg.GetMasterNode()]
    self._RunWrapper(nodes, hpath, phase)