Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 0fa753ba

History | View | Annotate | Download (19.3 kB)

1
#
#

# Copyright (C) 2006, 2007, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations.

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""
30

    
31
import logging
32
import random
33
import time
34

    
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import rpc
39
from ganeti import cmdlib
40
from ganeti import locking
41
from ganeti import utils
42
from ganeti import compat
43

    
44

    
45
_OP_PREFIX = "Op"
46
_LU_PREFIX = "LU"
47

    
48

    
49
class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  Raised when a lock acquisition attempt does not succeed within the
  timeout given for that attempt.

  """
53

    
54

    
55
def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  The first timeout is L{constants.LOCK_ATTEMPTS_MINWAIT}; every following
  one is derived from its predecessor as C{(previous * 1.05) ** 1.25} and
  then clamped to the range between L{constants.LOCK_ATTEMPTS_MINWAIT} and
  L{constants.LOCK_ATTEMPTS_MAXWAIT}. Values are added until their sum
  reaches L{constants.LOCK_ATTEMPTS_TIMEOUT}.

  @rtype: list of numbers
  @return: timeouts to use for successive lock acquire attempts

  """
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
  running_sum = result[0]

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap max timeout. This gives other jobs a chance to run even if
    # we're still trying to get our locks, before finally moving to a
    # blocking acquire.
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
    # And also cap the lower boundary for safety
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)

    result.append(timeout)
    running_sum += timeout

  return result
78

    
79

    
80
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Each instance walks once through the precomputed list of per-attempt
  timeouts; once the list is exhausted, L{NextAttempt} returns C{None}
  to request a blocking acquire.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  # Computed once at class creation time and shared by all instances
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    # Every instance gets its own iterator over the shared timeout list
    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @rtype: None or float
    @return: timeout for the next attempt, or None once all precomputed
        timeouts were used and a blocking acquire should be done

    """
    # No more timeouts means doing a blocking acquire, hence the default.
    # Taking a single item via "for ... break" avoids the Python 2-only
    # iterator.next() method as well as the next() built-in, which does not
    # exist in older Python 2 versions.
    timeout = None
    for timeout in self._timeouts:
      break

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
123

    
124

    
125
class OpExecCbBase: # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  All callbacks except L{SubmitManyJobs} default to doing nothing, so
  subclasses only need to override the ones they are interested in.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    @param args: feedback data, passed on unchanged by L{Processor.Log}

    """

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    @raise NotImplementedError: always, unless overridden by a subclass

    """
    raise NotImplementedError
154

    
155

    
156
def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  The leading L{_OP_PREFIX} of the opcode name is replaced by L{_LU_PREFIX}.

  @type opname: string
  @param opname: opcode class name, must start with L{_OP_PREFIX}
  @rtype: string
  @return: name of the corresponding logical unit class

  """
  assert opname.startswith(_OP_PREFIX), \
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  return _LU_PREFIX + opname[len(_OP_PREFIX):]
164

    
165

    
166
def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  Only opcodes whose C{WITH_LU} attribute is true are included; for each of
  them the logical unit class is looked up by name in L{cmdlib}.

  @rtype: dict
  @return: mapping from opcode class to logical unit class

  """
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
              for op in opcodes.OP_MAPPING.values()
              if op.WITH_LU)
173

    
174

    
175
class Processor(object):
  """Object which runs OpCodes"""
  # Maps opcode classes to their logical unit classes; built once at class
  # creation time
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: whether this processor is allowed to use locks; if
        not, trying to acquire or add locks raises
        L{errors.ProgrammerError}

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.rpc = rpc.RpcRunner(context.cfg)
    self.hmclass = HooksMaster
    self._enable_locks = enable_locks

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, timeout, priority):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @type priority: number or None
    @param priority: Priority for acquiring lock(s), passed on to the lock
        manager
    @return: the value returned by the lock manager's acquire call
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time
    @raise errors.ProgrammerError: In case locking is not enabled

    """
    self._CheckLocksEnabled()

    # Give the callbacks a chance to check for cancellation before waiting
    # for locks
    if self._cbs:
      self._cbs.CheckCancel()

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority)

    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ProcessResult(self, result):
    """Examines opcode result.

    If necessary, additional processing on the result is done.

    @param result: value returned by the LU's Exec method
    @return: the unmodified result, or, for L{cmdlib.ResultWithJobs}, its
        additional return values with the job submission result stored
        under L{constants.JOB_IDS_KEY}

    """
    if isinstance(result, cmdlib.ResultWithJobs):
      # Submit jobs
      job_submission = self._cbs.SubmitManyJobs(result.jobs)

      # Build dictionary
      result = result.other

      assert constants.JOB_IDS_KEY not in result, \
        "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

      result[constants.JOB_IDS_KEY] = job_submission

    return result

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Checks the prerequisites, runs the pre-phase hooks, executes the LU and
    then runs the post-phase hooks. If the configuration's write count
    changed while executing, the configuration update hook is run as well.

    @param lu: the logical unit to execute
    @return: the LU's result after the post-phase hooks callback, or the
        LU's C{dry_run_result} in dry-run mode

    """
    # Remembered to detect configuration changes made during execution
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = self._ProcessResult(lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: the logical unit to execute
    @type level: int
    @param level: lock level to handle in this call
    @type calc_timeout: callable
    @param calc_timeout: function returning the timeout to use for the next
        lock acquisition (None for no timeout)
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @return: the result of executing the LU
    @raise NotImplementedError: if the LU wants to both add and acquire
        locks at the same level
    @raise errors.OpPrereqError: if locks to be added couldn't be added

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # All lock levels were handled, the LU can be executed
      if self._cbs:
        self._cbs.NotifyStart()

      result = self._ExecLU(lu)

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          self._AcquireLocks(level, needed_locks, share,
                             calc_timeout(), priority)
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          lu.remove_locks[level] = add_locks

          try:
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), probably because of a race condition"
              " with another job, who added them first" % add_locks,
              errors.ECODE_FAULT)

        try:
          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
        finally:
          # Locks scheduled for removal are removed even if execution failed
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        # Release whatever is still owned at this level
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)

    else:
      # Nothing to do at this level, continue with the next one
      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)

    return result

  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @return: the result of the opcode's logical unit
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time
    @raise errors.ProgrammerError: if C{op} is not an opcode instance, or if
        the opcode requires the BGL while locks are disabled
    @raise errors.OpCodeUnknown: if no logical unit is known for the opcode
    @raise errors.OpResultError: if the result doesn't pass the opcode's
        result check

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU).
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                            not lu_class.REQ_BGL, calc_timeout(),
                            priority)
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
                                       priority)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        # Release BGL if owned
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
          assert self._enable_locks
          self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      # Callbacks are only valid while the opcode is being executed
      self._cbs = None

    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      raise errors.OpResultError("Opcode result does not match %s" %
                                 resultcheck_fn)

    return result

  def Log(self, *args):
    """Forward call to feedback callback function.

    Nothing is done if no callbacks are set.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @type current: int
    @param current: number of the current step
    @type total: int
    @param total: total number of steps
    @type message: string
    @param message: description of the step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    @type message: string
    @param message: warning text; if C{args} are given, it is used as a
        format string for them

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @type message: string
    @param message: informational text; if C{args} are given, it is used as
        a format string for them

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: if no execution context ID is set

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id
467

    
468

    
469
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, lu):
    """Initializes this class.

    The pre-phase environment and the node lists for both phases are
    computed right away.

    @type callfn: callable
    @param callfn: function doing the remote call; it is called with the
        node list, the hooks path, the phase and the environment
    @param lu: the logical unit on whose behalf the hooks are run

    """
    self.callfn = callfn
    self.lu = lu
    self.op = lu.op
    self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)

    if self.lu.HPATH is None:
      # This LU doesn't run any hooks
      nodes = (None, None)
    else:
      nodes = map(frozenset, self.lu.BuildHooksNodes())

    (self.pre_nodes, self.post_nodes) = nodes

  def _BuildEnv(self, phase):
    """Compute the environment for a hooks phase.

    Based on the opcode, this builds the environment for the hooks.
    Variables provided by the LU are prefixed with "GANETI_" in the
    pre-phase and with "GANETI_POST_" in the post-phase; the post-phase
    environment additionally contains all pre-phase variables.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}
    @rtype: dict
    @return: the environment for the given phase

    """
    if phase == constants.HOOKS_PHASE_PRE:
      prefix = "GANETI_"
    elif phase == constants.HOOKS_PHASE_POST:
      prefix = "GANETI_POST_"
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    env = {}

    if self.lu.HPATH is not None:
      lu_env = self.lu.BuildHooksEnv()
      if lu_env:
        # The LU must provide its variables without prefix
        assert not compat.any(key.upper().startswith(prefix) for key in lu_env)
        env.update(("%s%s" % (prefix, key), value)
                   for (key, value) in lu_env.items())

    if phase == constants.HOOKS_PHASE_PRE:
      assert compat.all((key.startswith("GANETI_") and
                         not key.startswith("GANETI_POST_"))
                        for key in env)

    elif phase == constants.HOOKS_PHASE_POST:
      assert compat.all(key.startswith("GANETI_POST_") for key in env)
      assert isinstance(self.pre_env, dict)

      # Merge with pre-phase environment
      assert not compat.any(key.startswith("GANETI_POST_")
                            for key in self.pre_env)
      env.update(self.pre_env)
    else:
      # Defensive only; unknown phases were already rejected above
      raise AssertionError("Unknown phase '%s'" % phase)

    return env

  def _RunWrapper(self, node_list, hpath, phase, phase_env):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    @param node_list: nodes on which the hooks should be run
    @param hpath: hooks path
    @param phase: hooks phase
    @type phase_env: dict or None
    @param phase_env: phase-specific variables, added to the common
        environment built by this method
    @return: whatever C{self.callfn} returns

    """
    cfg = self.lu.cfg

    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      "GANETI_HOOKS_PHASE": phase,
      "GANETI_HOOKS_PATH": hpath,
      }

    if self.lu.HTYPE:
      env["GANETI_OBJECT_TYPE"] = self.lu.HTYPE

    if cfg is not None:
      env["GANETI_CLUSTER"] = cfg.GetClusterName()
      env["GANETI_MASTER"] = cfg.GetMasterNode()

    if phase_env:
      assert not (set(env) & set(phase_env)), "Environment variables conflict"
      env.update(phase_env)

    # Convert everything to strings; items() instead of the Python 2-only
    # iteritems() keeps this working on all Python versions
    env = dict((str(key), str(val)) for (key, val) in env.items())

    assert compat.all(key == "PATH" or key.startswith("GANETI_")
                      for key in env)

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call, or None
        if there are no nodes to run the hooks on
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if phase == constants.HOOKS_PHASE_PRE:
      if nodes is None:
        nodes = self.pre_nodes
      env = self.pre_env
    elif phase == constants.HOOKS_PHASE_POST:
      if nodes is None:
        nodes = self.post_nodes
      env = self._BuildEnv(phase)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    if not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return

    results = self._RunWrapper(nodes, self.lu.HPATH, phase, env)
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        # Failures in the post-phase only lead to a warning
        self.lu.LogWarning(msg)
        return results

    errs = []
    for node_name in results:
      res = results[node_name]
      if res.offline:
        continue

      msg = res.fail_msg
      if msg:
        self.lu.LogWarning("Communication failure to node %s: %s",
                           node_name, msg)
        continue

      for script, hkr, output in res.payload:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            # Collected to abort the operation after checking all nodes
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.lu.LogWarning("On %s script %s failed, output: %s",
                               node_name, script, output)

    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)

    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.lu.cfg.GetMasterNode()]
    self._RunWrapper(nodes, hpath, phase, self.pre_env)