lib/mcpu.py @ 87ed6b79

#
#

# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""

import sys
import logging
import random
import time
import itertools
import traceback

from ganeti import opcodes
from ganeti import opcodes_base
from ganeti import constants
from ganeti import errors
from ganeti import hooksmaster
from ganeti import cmdlib
from ganeti import locking
from ganeti import utils
from ganeti import compat
from ganeti import wconfd


_OP_PREFIX = "Op"
_LU_PREFIX = "LU"

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) when they acquire all node or node resource locks
_NODE_ALLOC_WHITELIST = frozenset([])

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
#: or node resource locks
_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
  cmdlib.LUBackupExport,
  cmdlib.LUBackupRemove,
  cmdlib.LUOobCommand,
  ])


class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  """


def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  """
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
  running_sum = result[0]

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap max timeout. This gives other jobs a chance to run even if
    # we're still trying to get our locks, before finally moving to a
    # blocking acquire.
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
    # And also cap the lower boundary for safety
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)

    result.append(timeout)
    running_sum += timeout

  return result
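
# Illustrative note (not part of the upstream source): each timeout above is
# derived from the previous one as (t * 1.05) ** 1.25 and clamped to
# [LOCK_ATTEMPTS_MINWAIT, LOCK_ATTEMPTS_MAXWAIT]; assuming, for illustration,
# MINWAIT=1.0 and MAXWAIT=15.0 seconds, the sequence starts at roughly
# 1.0, 1.06, 1.15, 1.26, ... and stops once its sum reaches
# LOCK_ATTEMPTS_TIMEOUT.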


class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    """
    try:
      timeout = self._timeouts.next()
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
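
# Usage sketch (illustrative, not part of the upstream source): a caller
# typically creates one strategy instance per lock acquisition and retries
# with each returned timeout:
#
#   strategy = LockAttemptTimeoutStrategy()
#   timeout = strategy.NextAttempt()  # a float at first, None once exhausted
#
# where a None timeout signals that the next acquire should block.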


class OpExecCbBase: # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns current priority or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError


def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  """
  assert opname.startswith(_OP_PREFIX), \
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  return _LU_PREFIX + opname[len(_OP_PREFIX):]
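
# For example, _LUNameForOpName("OpClusterVerify") returns "LUClusterVerify".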


def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  """
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
              for op in opcodes.OP_MAPPING.values()
              if op.WITH_LU)
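
# Illustration: the table maps opcode classes to their LU classes, so an
# entry pairs e.g. opcodes.OpNodeAdd with cmdlib.LUNodeAdd; opcodes without
# WITH_LU set get no entry and are therefore rejected by the processor.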


def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
    dst.comment = defcomment

  # FIXME: extend reason trail, showing the derivedness
  if not getattr(dst, constants.OPCODE_REASON, None):
    dst.reason = getattr(src, constants.OPCODE_REASON, [])
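
# Note (summary, not upstream text): the attributes propagated above are the
# debug level, the priority, the comment and the reason trail; _ProcessResult
# below applies this to every opcode of every job an LU asks to submit.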


def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority)
    map(compat.partial(_SetBaseOpParams, op,
                       "Submitted by %s" % op.OP_ID),
        itertools.chain(*result.jobs))

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
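
# Illustrative example (hypothetical values): if an LU returns
# cmdlib.ResultWithJobs([[op_a], [op_b]], other_key=42), both jobs are
# submitted via submit_fn and the caller of _ProcessResult receives
# {"other_key": 42, constants.JOB_IDS_KEY: <submit_fn's return value>}.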


def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")


def _VerifyLocks(lu, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
  """Performs consistency checks on locks acquired by a logical unit.

  @type lu: L{cmdlib.LogicalUnit}
  @param lu: Logical unit instance

  """
  if not __debug__:
    return

  allocset = lu.owned_locks(locking.LEVEL_NODE_ALLOC)
  have_nal = locking.NAL in allocset

  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
    # TODO: Verify using actual lock mode, not using LU variables
    if level in lu.needed_locks:
      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
      share_level = lu.share_locks[level]

      if lu.__class__ in _mode_whitelist:
        assert share_node_alloc != share_level, \
          "LU is whitelisted to use different modes for node allocation lock"
      else:
        assert bool(share_node_alloc) == bool(share_level), \
          ("Node allocation lock must be acquired using the same mode as nodes"
           " and node resources")

      if lu.__class__ in _nal_whitelist:
        assert not have_nal, \
          "LU is whitelisted for not acquiring the node allocation lock"
      elif lu.needed_locks[level] == locking.ALL_SET:
        assert have_nal, \
          ("Node allocation lock must be used if an LU acquires all nodes"
           " or node resources")


class Processor(object):
  """Object which runs OpCodes"""
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.rpc = context.rpc
    self.hmclass = hooksmaster.HooksMaster
    self._enable_locks = enable_locks
    self.wconfd = wconfd # Indirection to allow testing

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    self._CheckLocksEnabled()

    # TODO: honor priority in lock allocation
    if self._cbs:
      priority = self._cbs.CurrentPriority() # pylint: disable=W0612
    else:
      priority = None

    if names == locking.ALL_SET:
      if opportunistic:
        expand_fns = {
          locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
          locking.LEVEL_INSTANCE: self.context.cfg.GetInstanceList,
          locking.LEVEL_NODE_ALLOC: (lambda: [locking.NAL]),
          locking.LEVEL_NODEGROUP: self.context.cfg.GetNodeGroupList,
          locking.LEVEL_NODE: self.context.cfg.GetNodeList,
          locking.LEVEL_NODE_RES: self.context.cfg.GetNodeList,
          locking.LEVEL_NETWORK: self.context.cfg.GetNetworkList,
          }
        names = expand_fns[level]()
      else:
        names = locking.LOCKSET_NAME

    if isinstance(names, str):
      names = [names]

    levelname = locking.LEVEL_NAMES[level]
    jid = int(self.GetECId())
    livelockfile = self.context.livelock.lockfile.name

    locks = ["%s/%s" % (levelname, lock) for lock in list(names)]

    if not names:
      logging.debug("Acquiring no locks for %d (%s) at level %s",
                    jid, livelockfile, levelname)
      return []

    if shared:
      request = [[lock, "shared"] for lock in locks]
    else:
      request = [[lock, "exclusive"] for lock in locks]

    if opportunistic:
      logging.debug("Opportunistically acquring some of %s for %d (%s).",
382
                    locks, jid, livelockfile)
      locks = self.wconfd.Client().OpportunisticLockUnion(jid, livelockfile,
                                                          request)
    elif timeout is None:
      while True:
        ## TODO: use asynchronous wait instead of polling
        blockedon = self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
                                                        request)
        logging.debug("Requesting %s for %d (%s) blocked on %s",
                      request, jid, livelockfile, blockedon)
        if not blockedon:
          break
        time.sleep(random.random())
    else:
      logging.debug("Trying %ss to request %s for %d (%s)",
                    timeout, request, jid, livelockfile)
      ## TODO: use blocking wait instead of polling
      blocked = utils.SimpleRetry([], self.wconfd.Client().TryUpdateLocks, 0.1,
                                  timeout, args=[jid, livelockfile, request])
      if blocked:
        raise LockAcquireTimeout()

    return locks
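
  # Illustration (hypothetical name, and assuming locking.LEVEL_NAMES maps
  # LEVEL_NODE to "node"): acquiring node "node1.example.com" in shared mode
  # sends the request [["node/node1.example.com", "shared"]] to WConfd.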

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result
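
  # Execution order (summary, not upstream text): CheckPrereq, pre-phase
  # hooks, Exec (replaced by dry_run_result in dry-run mode), post-phase
  # hooks, and finally the config-update hooks if the configuration was
  # written in the meantime.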

  def BuildHooksManager(self, lu):
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      _VerifyLocks(lu)

      if self._cbs:
        self._cbs.NotifyStart()

      try:
        result = self._ExecLU(lu)
      except AssertionError, err:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, _, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      opportunistic = lu.opportunistic_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          self._AcquireLocks(level, needed_locks, share, opportunistic,
                             calc_timeout())
          (jid, livelockfile) = lu.wconfdcontext
          lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, livelockfile)
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          if isinstance(add_locks, str):
            add_locks = [add_locks]
          lu.remove_locks[level] = add_locks

          try:
            jid = int(self.GetECId())
            livelockfile = self.context.livelock.lockfile.name
            levelname = locking.LEVEL_NAMES[level]

            if share:
              mode = "shared"
            else:
              mode = "exclusive"

            request = [["%s/%s" % (levelname, lock), mode]
                       for lock in add_locks]

            logging.debug("Requesting %s for %d (%s)",
                          request, jid, livelockfile)
            blocked = \
              self.wconfd.Client().TryUpdateLocks(jid, livelockfile, request)
            assert blocked == [], "Allocating newly 'created' locks failed"
            (jid, livelockfile) = lu.wconfdcontext
            lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, livelockfile)
          except errors.GenericError:
            # TODO: verify what actually caused the error
            logging.exception("Detected lock error in level %s for locks"
                              " %s, shared=%s", level, add_locks, share)
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), most likely because of another"
535
              " job who added them first" % add_locks,
              errors.ECODE_NOTUNIQUE)

        try:
          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
        finally:
          if level in lu.remove_locks:
            jid = int(self.GetECId())
            livelockfile = self.context.livelock.lockfile.name
            levelname = locking.LEVEL_NAMES[level]
            request = [["%s/%s" % (levelname, lock), "release"]
                       for lock in lu.remove_locks[level]]
            blocked = \
              self.wconfd.Client().TryUpdateLocks(jid, livelockfile, request)
            assert blocked == [], "Release may not fail"
      finally:
        jid = int(self.GetECId())
        livelockfile = self.context.livelock.lockfile.name
        levelname = locking.LEVEL_NAMES[level]
        logging.debug("Freeing locks at level %s for %d (%s)",
                      levelname, jid, livelockfile)
        self.wconfd.Client().FreeLocksLevel(jid, livelockfile, levelname)
    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)

    return result
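
  # Recursion sketch (summary, not upstream text): each call handles one
  # locking level and recurses into level + 1; once `level` is beyond
  # locking.LEVELS, all declared locks are held and _ExecLU runs; the locks
  # of each level are freed again on the way back out.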

  # pylint: disable=R0201
  def _CheckLUResult(self, op, result):
    """Check the LU result against the contract in the opcode.

    """
    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn, utils.Truncate(result, 80)))

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
593
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent a concurrent run with
        # an exclusive LU)
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                            not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      try:
        jid = int(self.GetECId())
        livelockfile = self.context.livelock.lockfile.name
        lu = lu_class(self, op, self.context, self.rpc, (jid, livelockfile),
                      self.wconfd)
        lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, livelockfile)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                       calc_timeout)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        # Release BGL if owned
        jid = int(self.GetECId())
        livelockfile = self.context.livelock.lockfile.name
        bglname = "%s/%s" % (locking.LEVEL_NAMES[locking.LEVEL_CLUSTER],
                             locking.BGL)
        self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
                                            [[bglname, "release"]])
    finally:
      self._cbs = None

    self._CheckLUResult(op, result)

    return result
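
  # Flow summary (not upstream text): the BGL is acquired shared unless the
  # LU sets REQ_BGL, the remaining levels are handled by _LockAndExecLU, and
  # the BGL is released before the result is checked against op.OP_RESULT.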

  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id