Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 8c4b9364

History | View | Annotate | Download (14.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import logging
32

    
33
from ganeti import opcodes
34
from ganeti import constants
35
from ganeti import errors
36
from ganeti import rpc
37
from ganeti import cmdlib
38
from ganeti import locking
39

    
40

    
41
class Processor(object):
  """Object which runs OpCodes"""
  # Maps each opcode class to the logical unit (LU) class implementing it;
  # ExecOpCode uses this table to dispatch incoming opcodes.
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
    opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
    opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
    opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
    opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
    opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
    opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

  def __init__(self, context):
    """Constructor for Processor

    Args:
     - context: the object holding the shared cluster state; the code below
                reads its ``cfg`` (configuration, used to build the RPC
                runner and to detect config writes) and ``glm`` (the global
                lock manager) attributes

    """
    self.context = context
    # Set per-opcode by ExecOpCode; LUs report progress through it.
    self._feedback_fn = None
    # True only while an LU that requires the Big Ganeti Lock is running.
    self.exclusive_BGL = False
    self.rpc = rpc.RpcRunner(context.cfg)
    self.hmclass = HooksMaster

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs prerequisite checks, the pre-phase hooks, the LU itself, then the
    post-phase hooks, giving the LU's HooksCallBack a chance to process the
    hook results at each phase.  Returns the LU's (possibly post-processed)
    result.

    """
    # Snapshot the config write counter so we can tell afterwards whether
    # this LU modified (and wrote out) the configuration.
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self._feedback_fn, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = lu.Exec(self._feedback_fn)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self._feedback_fn, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      # Run the config-update hook even if Exec raised, so other nodes
      # learn about any configuration change that already happened.
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # Recursion went past the last lock level: all locks are held, so
      # notify the caller (if requested) and actually run the LU.
      if callable(self._run_notifier):
        self._run_notifier()
      result = self._ExecLU(lu)
    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError(
        "Can't declare locks to acquire when adding others")
    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      if acquiring_locks:
        needed_locks = lu.needed_locks[level]
        lu.acquired_locks[level] = self.context.glm.acquire(level,
                                                            needed_locks,
                                                            shared=share)
      else: # adding_locks
        add_locks = lu.add_locks[level]
        # Record the added locks for removal in the inner finally below.
        lu.remove_locks[level] = add_locks
        try:
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
        except errors.LockError:
          raise errors.OpPrereqError(
            "Couldn't add locks (%s), probably because of a race condition"
            " with another job, who added them first" % add_locks)
      try:
        try:
          if adding_locks:
            lu.acquired_locks[level] = add_locks
          # Recurse into the next (finer-grained) lock level.
          result = self._LockAndExecLU(lu, level + 1)
        finally:
          # Locks added at this level are removed again on the way out.
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        # Release anything still owned at this level, whether it was
        # acquired or added above.
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)
    else:
      # Nothing to do at this level; try the next one.
      result = self._LockAndExecLU(lu, level + 1)

    return result

  def ExecOpCode(self, op, feedback_fn, run_notifier):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type feedback_fn: a function that takes a single argument
    @param feedback_fn: this function will be used as feedback from the LU
                        code to the end-user
    @type run_notifier: callable (no arguments) or None
    @param run_notifier:  this function (if callable) will be called when
                          we are about to call the lu's Exec() method, that
                          is, after we have acquired all locks
    @raise errors.ProgrammerError: if C{op} is not an opcode instance
    @raise errors.OpCodeUnknown: if the opcode has no entry in
        L{DISPATCH_TABLE}

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    self._feedback_fn = feedback_fn
    self._run_notifier = run_notifier
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
    # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                             shared=not lu_class.REQ_BGL)
    try:
      self.exclusive_BGL = lu_class.REQ_BGL
      lu = lu_class(self, op, self.context, self.rpc)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"
      # Start the per-level lock acquisition at the first non-cluster level.
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
    finally:
      self.context.glm.release(locking.LEVEL_CLUSTER)
      self.exclusive_BGL = False

    return result

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @param current: the step number we are at (1-based)
    @param total: the total number of steps
    @param message: description of the current step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      # Positional args are interpolated printf-style into the message.
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self._feedback_fn(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self._feedback_fn("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @param message: the message (possibly a printf-style format string)
    @param args: optional arguments interpolated into the message

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self._feedback_fn(" - INFO: %s" % message)
273

    
274

    
275
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, lu):
    # callfn(node_list, hpath, phase, env) performs the actual remote call.
    self.callfn = callfn
    self.lu = lu
    self.op = lu.op
    # Environment is built once; the per-phase target node lists are kept
    # in a dict keyed by hooks phase.
    self.env, node_list_pre, node_list_post = self._BuildEnv()
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                      constants.HOOKS_PHASE_POST: node_list_post}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    @return: a tuple of (environment dict, frozenset of pre-phase nodes,
        frozenset of post-phase nodes)

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is not None:
      # The LU contributes its own environment and node lists; its keys
      # are exported to the hook scripts with a GANETI_ prefix.
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        for key in lu_env:
          env["GANETI_" + key] = lu_env[key]
    else:
      # LU declares no hooks path, so there is nothing to run anywhere.
      lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    @param node_list: the list of nodes to run the hooks on
    @param hpath: the hooks path (subdirectory) to run
    @param phase: the hooks phase (pre or post)
    @return: whatever self.callfn returns

    """
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    if self.lu.cfg is not None:
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()

    # Hook scripts get a plain string environment, so stringify everything.
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if not self.node_list[phase] and not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return
    hpath = self.lu.HPATH
    if nodes is not None:
      results = self._RunWrapper(nodes, hpath, phase)
    else:
      results = self._RunWrapper(self.node_list[phase], hpath, phase)
    errs = []
    if not results:
      # A pre-phase communication failure aborts the operation; a
      # post-phase one is only warned about, since the LU already ran.
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        self.lu.LogWarning(msg)
    for node_name in results:
      res = results[node_name]
      if res.offline:
        # Offline nodes are skipped silently.
        continue
      msg = res.RemoteFailMsg()
      if msg:
        self.lu.LogWarning("Communication failure to node %s: %s",
                           node_name, msg)
        continue
      # res.payload is presumably a list of (script, status, output)
      # triples from the hooks runner -- verify against the rpc layer.
      for script, hkr, output in res.payload:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            # Pre-phase script failures are collected and abort the LU below.
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.lu.LogWarning("On %s script %s failed, output: %s" %
                               (node_name, script, output))
    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)
    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.lu.cfg.GetMasterNode()]
    self._RunWrapper(nodes, hpath, phase)