#
#

# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""

import sys
import logging
import random
import time
import itertools
import traceback

from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import cmdlib
from ganeti import locking
from ganeti import utils
from ganeti import compat
from ganeti import pathutils


_OP_PREFIX = "Op"
_LU_PREFIX = "LU"

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) when they acquire all node or node resource locks
_NODE_ALLOC_WHITELIST = frozenset([])

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
#: or node resource locks
_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
  cmdlib.LUBackupExport,
  cmdlib.LUBackupRemove,
  cmdlib.LUOobCommand,
  ])


class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  """


def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  """
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
  running_sum = result[0]

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap max timeout. This gives other jobs a chance to run even if
    # we're still trying to get our locks, before finally moving to a
    # blocking acquire.
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
    # And also cap the lower boundary for safety
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)

    result.append(timeout)
    running_sum += timeout

  return result


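# Illustrative note on _CalculateLockAttemptTimeouts (values are approximate
# and depend on the LOCK_ATTEMPTS_* constants): each entry is derived from the
# previous one as (prev * 1.05) ** 1.25 and then clamped to the range
# [LOCK_ATTEMPTS_MINWAIT, LOCK_ATTEMPTS_MAXWAIT].  For example, a previous
# timeout of 1.0 gives a raw value of roughly 1.06, while a previous timeout
# of 10.0 gives roughly 18.9 and is therefore capped at LOCK_ATTEMPTS_MAXWAIT.
# The full series can be inspected interactively:
#
#   from ganeti import mcpu
#   print mcpu._CalculateLockAttemptTimeouts()

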
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    """
    try:
      timeout = self._timeouts.next()
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout


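# Illustrative sketch of how a caller might drive LockAttemptTimeoutStrategy
# (the "acquire_fn" and "names" below are hypothetical; the job queue uses a
# similar retry loop):
#
#   strategy = LockAttemptTimeoutStrategy()
#   while True:
#     timeout = strategy.NextAttempt()  # None means "block until acquired"
#     if acquire_fn(names, timeout=timeout) is not None:
#       break

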
class OpExecCbBase: # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns current priority or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError


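# Minimal sketch (hypothetical, not part of Ganeti) of a concrete callback
# implementation; the job queue provides a richer OpExecCbBase subclass that
# also persists feedback and can submit jobs:
#
#   class _SimpleCallbacks(OpExecCbBase):
#     def Feedback(self, *args):
#       logging.info("LU feedback: %r", (args, ))
#
#     def CurrentPriority(self):
#       return constants.OP_PRIO_DEFAULT

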
def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  """
  assert opname.startswith(_OP_PREFIX), \
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  return _LU_PREFIX + opname[len(_OP_PREFIX):]


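# For example, _LUNameForOpName("OpClusterVerify") returns "LUClusterVerify".

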
def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  """
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
              for op in opcodes.OP_MAPPING.values()
              if op.WITH_LU)


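# Conceptually, the dispatch table maps opcode classes to LU classes; an
# illustrative entry would be
# {opcodes.OpClusterVerify: cmdlib.LUClusterVerify}.

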
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  if not getattr(dst, opcodes.COMMENT_ATTR, None):
    dst.comment = defcomment


def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority)
    map(compat.partial(_SetBaseOpParams, op,
                       "Submitted by %s" % op.OP_ID),
        itertools.chain(*result.jobs))

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result


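# Illustrative flow for _ProcessResult (hypothetical values): if an LU returns
# cmdlib.ResultWithJobs([[opcodes.OpTestDelay(duration=1)]], other_key=123),
# the inner job definitions are submitted through submit_fn and the caller
# receives {"other_key": 123, constants.JOB_IDS_KEY: <submission result>}.

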
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")


def _RpcResultsToHooksResults(rpc_results):
  """Function to convert RPC results to the format expected by HooksMaster.

  @type rpc_results: dict(node: L{rpc.RpcResult})
  @param rpc_results: RPC results
  @rtype: dict(node: (fail_msg, offline, hooks_results))
  @return: RPC results unpacked according to the format expected by
    L{mcpu.HooksMaster}

  """
  return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
              for (node, rpc_res) in rpc_results.items())


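# Shape of the conversion performed by _RpcResultsToHooksResults (illustrative,
# with a hypothetical node name):
#
#   {"node1.example.com": rpc.RpcResult}
#     -> {"node1.example.com": (fail_msg, offline, payload)}
#
# where "payload" is the per-script hooks result list consumed by
# HooksMaster.RunPhase.

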
def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
  """Performs consistency checks on locks acquired by a logical unit.

  @type lu: L{cmdlib.LogicalUnit}
  @param lu: Logical unit instance
  @type glm: L{locking.GanetiLockManager}
  @param glm: Lock manager

  """
  if not __debug__:
    return

  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)

  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
    # TODO: Verify using actual lock mode, not using LU variables
    if level in lu.needed_locks:
      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
      share_level = lu.share_locks[level]

      if lu.__class__ in _mode_whitelist:
        assert share_node_alloc != share_level, \
          "LU is whitelisted to use different modes for node allocation lock"
      else:
        assert bool(share_node_alloc) == bool(share_level), \
          ("Node allocation lock must be acquired using the same mode as nodes"
           " and node resources")

      if lu.__class__ in _nal_whitelist:
        assert not have_nal, \
          "LU is whitelisted for not acquiring the node allocation lock"
      elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
        assert have_nal, \
          ("Node allocation lock must be used if an LU acquires all nodes"
           " or node resources")


class Processor(object):
  """Object which runs OpCodes"""
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: whether the processor may take locks; opcodes
        requiring the BGL are rejected when this is disabled

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.rpc = context.rpc
    self.hmclass = HooksMaster
    self._enable_locks = enable_locks

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    self._CheckLocksEnabled()

    if self._cbs:
      priority = self._cbs.CurrentPriority()
    else:
      priority = None

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority,
                                        opportunistic=opportunistic)

    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    glm = self.context.glm
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      _VerifyLocks(lu, glm)

      if self._cbs:
        self._cbs.NotifyStart()

      try:
        result = self._ExecLU(lu)
      except AssertionError, err:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, _, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      opportunistic = lu.opportunistic_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          self._AcquireLocks(level, needed_locks, share, opportunistic,
                             calc_timeout())
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          lu.remove_locks[level] = add_locks

          try:
            glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            logging.exception("Detected lock error in level %s for locks"
                              " %s, shared=%s", level, add_locks, share)
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), most likely because of another"
              " job that added them first" % add_locks,
              errors.ECODE_NOTUNIQUE)

        try:
          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
        finally:
          if level in lu.remove_locks:
            glm.remove(level, lu.remove_locks[level])
      finally:
        if glm.is_owned(level):
          glm.release(level)

    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)

    return result

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU)
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                            not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                       calc_timeout)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        # Release BGL if owned
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
          assert self._enable_locks
          self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn, utils.Truncate(result, 80)))

    return result

  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id


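# Minimal usage sketch for Processor (hypothetical; normally the job queue
# does this, and "context" stands for a fully initialised GanetiContext):
#
#   proc = Processor(context, ec_id="job-42")
#   result = proc.ExecOpCode(opcodes.OpClusterVerify(), cbs=None)
#
# Passing cbs=None is only suitable for opcodes that never submit jobs
# (e.g. queries); job-queue callers pass an OpExecCbBase implementation.

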
class HooksMaster(object):
  def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
               hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
               cluster_name=None, master_name=None):
    """Base class for hooks masters.

    This class invokes the execution of hooks according to the behaviour
    specified by its parameters.

    @type opcode: string
    @param opcode: opcode of the operation to which the hooks are tied
    @type hooks_path: string
    @param hooks_path: prefix of the hooks directories
    @type nodes: 2-tuple of lists
    @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
      run and nodes on which post-hooks must be run
    @type hooks_execution_fn: function that accepts the following parameters:
      (node_list, hooks_path, phase, environment)
    @param hooks_execution_fn: function that will execute the hooks
    @type hooks_results_adapt_fn: function
    @param hooks_results_adapt_fn: function that will adapt the return value of
      hooks_execution_fn to the format expected by RunPhase; can be None,
      indicating that no conversion is necessary
    @type build_env_fn: function that returns a dictionary having strings as
      keys
    @param build_env_fn: function that builds the environment for the hooks
    @type log_fn: function that accepts a string
    @param log_fn: logging function
    @type htype: string or None
    @param htype: None or one of L{constants.HTYPE_CLUSTER},
     L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
    @type cluster_name: string
    @param cluster_name: name of the cluster
    @type master_name: string
    @param master_name: name of the master

    """
    self.opcode = opcode
    self.hooks_path = hooks_path
    self.hooks_execution_fn = hooks_execution_fn
    self.hooks_results_adapt_fn = hooks_results_adapt_fn
    self.build_env_fn = build_env_fn
    self.log_fn = log_fn
    self.htype = htype
    self.cluster_name = cluster_name
    self.master_name = master_name

    self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
    (self.pre_nodes, self.post_nodes) = nodes

  def _BuildEnv(self, phase):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      prefix = "GANETI_"
    elif phase == constants.HOOKS_PHASE_POST:
      prefix = "GANETI_POST_"
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    env = {}

    if self.hooks_path is not None:
      phase_env = self.build_env_fn()
      if phase_env:
        assert not compat.any(key.upper().startswith(prefix)
                              for key in phase_env)
        env.update(("%s%s" % (prefix, key), value)
                   for (key, value) in phase_env.items())

    if phase == constants.HOOKS_PHASE_PRE:
      assert compat.all((key.startswith("GANETI_") and
                         not key.startswith("GANETI_POST_"))
                        for key in env)

    elif phase == constants.HOOKS_PHASE_POST:
      assert compat.all(key.startswith("GANETI_POST_") for key in env)
      assert isinstance(self.pre_env, dict)

      # Merge with pre-phase environment
      assert not compat.any(key.startswith("GANETI_POST_")
                            for key in self.pre_env)
      env.update(self.pre_env)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    return env

  def _RunWrapper(self, node_list, hpath, phase, phase_env):
    """Simple wrapper over self.hooks_execution_fn.

    This method fixes the environment before executing the hooks.

    """
    env = {
      "PATH": constants.HOOKS_PATH,
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.opcode,
      "GANETI_DATA_DIR": pathutils.DATA_DIR,
      "GANETI_HOOKS_PHASE": phase,
      "GANETI_HOOKS_PATH": hpath,
      }

    if self.htype:
      env["GANETI_OBJECT_TYPE"] = self.htype

    if self.cluster_name is not None:
      env["GANETI_CLUSTER"] = self.cluster_name

    if self.master_name is not None:
      env["GANETI_MASTER"] = self.master_name

    if phase_env:
      env = utils.algo.JoinDisjointDicts(env, phase_env)

    # Convert everything to strings
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])

    assert compat.all(key == "PATH" or key.startswith("GANETI_")
                      for key in env)

    return self.hooks_execution_fn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HooksMaster. It executes
    self.hooks_execution_fn, and after running self.hooks_results_adapt_fn on
    its results it expects them to be in the form
    {node_name: (fail_msg, [(script, result, output), ...])}.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if phase == constants.HOOKS_PHASE_PRE:
      if nodes is None:
        nodes = self.pre_nodes
      env = self.pre_env
    elif phase == constants.HOOKS_PHASE_POST:
      if nodes is None:
        nodes = self.post_nodes
      env = self._BuildEnv(phase)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    if not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return

    results = self._RunWrapper(nodes, self.hooks_path, phase, env)
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        self.log_fn(msg)
        return results

    converted_res = results
    if self.hooks_results_adapt_fn:
      converted_res = self.hooks_results_adapt_fn(results)

    errs = []
    for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
      if offline:
        continue

      if fail_msg:
        self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)
        continue

      for script, hkr, output in hooks_results:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.log_fn("On %s script %s failed, output: %s" %
                        (node_name, script, output))

    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)

    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.master_name]
    self._RunWrapper(nodes, hpath, phase, self.pre_env)

  @staticmethod
  def BuildFromLu(hooks_execution_fn, lu):
    if lu.HPATH is None:
      nodes = (None, None)
    else:
      nodes = map(frozenset, lu.BuildHooksNodes())

    master_name = cluster_name = None
    if lu.cfg:
      master_name = lu.cfg.GetMasterNode()
      cluster_name = lu.cfg.GetClusterName()

    return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
                       _RpcResultsToHooksResults, lu.BuildHooksEnv,
                       lu.LogWarning, lu.HTYPE, cluster_name, master_name)
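

# For reference (illustrative values, built by _RunWrapper and _BuildEnv): a
# hook script sees an environment of the form
#
#   PATH=<constants.HOOKS_PATH>
#   GANETI_HOOKS_VERSION=<constants.HOOKS_VERSION>
#   GANETI_HOOKS_PHASE=<phase, "pre" or "post">
#   GANETI_HOOKS_PATH=<hooks path, e.g. lu.HPATH>
#   GANETI_OP_CODE=<opcode id, e.g. OP_CLUSTER_VERIFY>
#   GANETI_DATA_DIR=<pathutils.DATA_DIR>
#   GANETI_OBJECT_TYPE=<htype, if set>
#   GANETI_CLUSTER=<cluster name>
#   GANETI_MASTER=<master name>
#
# plus the LU-specific "GANETI_*" (pre-phase) or "GANETI_POST_*" (post-phase)
# variables returned by the LU's BuildHooksEnv.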