Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ b31c8676

History | View | Annotate | Download (12.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations.

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""
30

    
31
import logging
32

    
33
from ganeti import opcodes
34
from ganeti import constants
35
from ganeti import errors
36
from ganeti import rpc
37
from ganeti import cmdlib
38
from ganeti import locking
39

    
40

    
41
class Processor(object):
  """Object which runs OpCodes.

  The processor looks up the logical unit (LU) class registered for an
  opcode in L{DISPATCH_TABLE}, acquires the locks the LU declares and then
  drives the LU through its prerequisite check, hooks and execution.

  """
  # Maps each opcode class to the logical unit class implementing it
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

  def __init__(self, context):
    """Constructor for Processor

    @param context: the execution context; this class uses its C{cfg}
        attribute (handed to the RPC runner and polled for C{write_count})
        and its C{glm} attribute (used for all lock operations)

    """
    self.context = context
    # Both callbacks are only known once ExecOpCode() is called; they are
    # initialised here so that the attributes always exist
    self._feedback_fn = None
    self._run_notifier = None
    self.exclusive_BGL = False
    self.rpc = rpc.RpcRunner(context.cfg)

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Checks the LU prerequisites, runs the pre-phase hooks, executes the LU
    and runs the post-phase hooks. The hook results of each phase are
    handed to the LU via C{HooksCallBack}.

    @param lu: the logical unit to execute
    @return: the value returned by the post-phase C{HooksCallBack} of the LU

    """
    # Remembered so we can detect below whether the configuration changed
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self._feedback_fn, None)
    try:
      result = lu.Exec(self._feedback_fn)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self._feedback_fn, result)
    finally:
      # The config update hook runs even when the LU failed, as long as the
      # configuration was written in the meantime
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: the logical unit to execute
    @param level: the locking level to handle in this call
    @return: the result of executing the LU
    @raise NotImplementedError: if the LU wants to both add and acquire
        locks at the same level
    @raise errors.OpPrereqError: if the locks to be added can't be added

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # We went past the known levels: notify the caller (if it asked for
      # it) and run the LU itself
      if callable(self._run_notifier):
        self._run_notifier()
      result = self._ExecLU(lu)
    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError(
        "Can't declare locks to acquire when adding others")
    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      if acquiring_locks:
        needed_locks = lu.needed_locks[level]
        lu.acquired_locks[level] = self.context.glm.acquire(level,
                                                            needed_locks,
                                                            shared=share)
      else: # adding_locks
        add_locks = lu.add_locks[level]
        lu.remove_locks[level] = add_locks
        try:
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
        except errors.LockError:
          raise errors.OpPrereqError(
            "Coudn't add locks (%s), probably because of a race condition"
            " with another job, who added them first" % add_locks)
      # The two nested try/finally blocks make sure the release below is
      # attempted even if removing the locks fails
      try:
        try:
          if adding_locks:
            lu.acquired_locks[level] = add_locks
          result = self._LockAndExecLU(lu, level + 1)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)
    else:
      # Nothing to lock at this level, go on with the next one
      result = self._LockAndExecLU(lu, level + 1)

    return result

  def ExecOpCode(self, op, feedback_fn, run_notifier):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type feedback_fn: a function that takes a single argument
    @param feedback_fn: this function will be used as feedback from the LU
                        code to the end-user
    @type run_notifier: callable (no arguments) or None
    @param run_notifier:  this function (if callable) will be called when
                          we are about to call the lu's Exec() method, that
                          is, after we have acquired all locks
    @return: the result of executing the logical unit
    @raise errors.ProgrammerError: if C{op} is not an opcode instance
    @raise errors.OpCodeUnknown: if no logical unit is registered for the
        class of C{op}

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    self._feedback_fn = feedback_fn
    self._run_notifier = run_notifier
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
    # shared fashion otherwise (to prevent concurrent run with an exclusive
    # LU).
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                             shared=not lu_class.REQ_BGL)
    try:
      self.exclusive_BGL = lu_class.REQ_BGL
      lu = lu_class(self, op, self.context, self.rpc)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
    finally:
      self.context.glm.release(locking.LEVEL_CLUSTER)
      self.exclusive_BGL = False

    return result

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @param current: the number of the current step
    @param total: the total number of steps
    @param message: the description of the current step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    @param message: the warning text; if positional arguments are given,
        it is used as a format string for them

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self._feedback_fn(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self._feedback_fn("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @param message: the text to log; if positional arguments are given,
        it is used as a format string for them

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self._feedback_fn(" - INFO: %s" % message)
251

    
252

    
253
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, proc, lu):
    """Store the collaborators and precompute environment and node lists.

    @param callfn: function doing the actual remote call; it is invoked
        as C{callfn(node_list, hpath, phase, env)}
    @param proc: object whose C{LogWarning} method is used to report
        per-node communication failures
    @param lu: the logical unit the hooks are run for

    """
    self.callfn = callfn
    self.proc = proc
    self.lu = lu
    self.op = lu.op
    self.env, node_list_pre, node_list_post = self._BuildEnv()
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                      constants.HOOKS_PHASE_POST: node_list_post}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    @return: a tuple of (environment dict, frozenset of nodes for the pre
        phase, frozenset of nodes for the post phase)

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is not None:
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        # The variables provided by the LU are exported with a prefix
        for key in lu_env:
          env["GANETI_" + key] = lu_env[key]
    else:
      # No hooks path, hence no nodes to run hooks on
      lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    @param node_list: the nodes on which the hooks should be run
    @param hpath: the hooks path to run
    @param phase: the hooks phase
    @return: whatever C{self.callfn} returns

    """
    # Work on a copy, the base environment is reused for every phase
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    if self.lu.cfg is not None:
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()

    # All keys and values are passed on as strings; items() is used instead
    # of iteritems() as the latter only exists in Python 2
    env = dict((str(key), str(val)) for key, val in env.items())

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @return: the processed results of the hooks multi-node rpc call, or
        None if there are no nodes to run the hooks on
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: if any script failed in the pre phase

    """
    if not self.node_list[phase]:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return
    hpath = self.lu.HPATH
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
    # Only the results of the pre phase are checked for failures
    if phase == constants.HOOKS_PHASE_PRE:
      errs = []
      if not results:
        raise errors.HooksFailure("Communication failure")
      for node_name in results:
        res = results[node_name]
        # Anything which is not a list (e.g. False) is treated as a failure
        # to talk to the node; this is only warned about
        if not isinstance(res, list):
          self.proc.LogWarning("Communication failure to node %s" % node_name)
          continue
        for script, hkr, output in res:
          if hkr == constants.HKR_FAIL:
            # NOTE: "string_escape" is a codec available in Python 2 only
            output = output.strip().encode("string_escape")
            errs.append((node_name, script, output))
      if errs:
        raise errors.HooksAbort(errs)
    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.lu.cfg.GetMasterNode()]
    # The result of the hook run is not used
    self._RunWrapper(nodes, hpath, phase)