Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 25a8792c

History | View | Annotate | Download (327.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0201
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
import os
30
import os.path
31
import time
32
import re
33
import platform
34
import logging
35
import copy
36
import OpenSSL
37

    
38
from ganeti import ssh
39
from ganeti import utils
40
from ganeti import errors
41
from ganeti import hypervisor
42
from ganeti import locking
43
from ganeti import constants
44
from ganeti import objects
45
from ganeti import serializer
46
from ganeti import ssconf
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement ExpandNames
54
    - implement CheckPrereq (except when tasklets are used)
55
    - implement Exec (except when tasklets are used)
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  @ivar dry_run_result: the value (if any) that will be returned to the caller
64
      in dry-run mode (signalled by opcode dry_run parameter)
65

66
  """
67
  HPATH = None
68
  HTYPE = None
69
  _OP_REQP = []
70
  REQ_BGL = True
71

    
72
  def __init__(self, processor, op, context, rpc):
73
    """Constructor for LogicalUnit.
74

75
    This needs to be overridden in derived classes in order to check op
76
    validity.
77

78
    """
79
    self.proc = processor
80
    self.op = op
81
    self.cfg = context.cfg
82
    self.context = context
83
    self.rpc = rpc
84
    # Dicts used to declare locking needs to mcpu
85
    self.needed_locks = None
86
    self.acquired_locks = {}
87
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
88
    self.add_locks = {}
89
    self.remove_locks = {}
90
    # Used to force good behavior when calling helper functions
91
    self.recalculate_locks = {}
92
    self.__ssh = None
93
    # logging
94
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
95
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
96
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
97
    # support for dry-run
98
    self.dry_run_result = None
99
    # support for generic debug attribute
100
    if (not hasattr(self.op, "debug_level") or
101
        not isinstance(self.op.debug_level, int)):
102
      self.op.debug_level = 0
103

    
104
    # Tasklets
105
    self.tasklets = None
106

    
107
    for attr_name in self._OP_REQP:
108
      attr_val = getattr(op, attr_name, None)
109
      if attr_val is None:
110
        raise errors.OpPrereqError("Required parameter '%s' missing" %
111
                                   attr_name, errors.ECODE_INVAL)
112

    
113
    self.CheckArguments()
114

    
115
  def __GetSSH(self):
116
    """Returns the SshRunner object
117

118
    """
119
    if not self.__ssh:
120
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
121
    return self.__ssh
122

    
123
  ssh = property(fget=__GetSSH)
124

    
125
  def CheckArguments(self):
126
    """Check syntactic validity for the opcode arguments.
127

128
    This method is for doing a simple syntactic check and ensure
129
    validity of opcode parameters, without any cluster-related
130
    checks. While the same can be accomplished in ExpandNames and/or
131
    CheckPrereq, doing these separate is better because:
132

133
      - ExpandNames is left as as purely a lock-related function
134
      - CheckPrereq is run after we have acquired locks (and possible
135
        waited for them)
136

137
    The function is allowed to change the self.op attribute so that
138
    later methods can no longer worry about missing parameters.
139

140
    """
141
    pass
142

    
143
  def ExpandNames(self):
144
    """Expand names for this LU.
145

146
    This method is called before starting to execute the opcode, and it should
147
    update all the parameters of the opcode to their canonical form (e.g. a
148
    short node name must be fully expanded after this method has successfully
149
    completed). This way locking, hooks, logging, ecc. can work correctly.
150

151
    LUs which implement this method must also populate the self.needed_locks
152
    member, as a dict with lock levels as keys, and a list of needed lock names
153
    as values. Rules:
154

155
      - use an empty dict if you don't need any lock
156
      - if you don't need any lock at a particular level omit that level
157
      - don't put anything for the BGL level
158
      - if you want all locks at a level use locking.ALL_SET as a value
159

160
    If you need to share locks (rather than acquire them exclusively) at one
161
    level you can modify self.share_locks, setting a true value (usually 1) for
162
    that level. By default locks are not shared.
163

164
    This function can also define a list of tasklets, which then will be
165
    executed in order instead of the usual LU-level CheckPrereq and Exec
166
    functions, if those are not defined by the LU.
167

168
    Examples::
169

170
      # Acquire all nodes and one instance
171
      self.needed_locks = {
172
        locking.LEVEL_NODE: locking.ALL_SET,
173
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
174
      }
175
      # Acquire just two nodes
176
      self.needed_locks = {
177
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
178
      }
179
      # Acquire no locks
180
      self.needed_locks = {} # No, you can't leave it to the default value None
181

182
    """
183
    # The implementation of this method is mandatory only if the new LU is
184
    # concurrent, so that old LUs don't need to be changed all at the same
185
    # time.
186
    if self.REQ_BGL:
187
      self.needed_locks = {} # Exclusive LUs don't need locks.
188
    else:
189
      raise NotImplementedError
190

    
191
  def DeclareLocks(self, level):
192
    """Declare LU locking needs for a level
193

194
    While most LUs can just declare their locking needs at ExpandNames time,
195
    sometimes there's the need to calculate some locks after having acquired
196
    the ones before. This function is called just before acquiring locks at a
197
    particular level, but after acquiring the ones at lower levels, and permits
198
    such calculations. It can be used to modify self.needed_locks, and by
199
    default it does nothing.
200

201
    This function is only called if you have something already set in
202
    self.needed_locks for the level.
203

204
    @param level: Locking level which is going to be locked
205
    @type level: member of ganeti.locking.LEVELS
206

207
    """
208

    
209
  def CheckPrereq(self):
210
    """Check prerequisites for this LU.
211

212
    This method should check that the prerequisites for the execution
213
    of this LU are fulfilled. It can do internode communication, but
214
    it should be idempotent - no cluster or system changes are
215
    allowed.
216

217
    The method should raise errors.OpPrereqError in case something is
218
    not fulfilled. Its return value is ignored.
219

220
    This method should also update all the parameters of the opcode to
221
    their canonical form if it hasn't been done by ExpandNames before.
222

223
    """
224
    if self.tasklets is not None:
225
      for (idx, tl) in enumerate(self.tasklets):
226
        logging.debug("Checking prerequisites for tasklet %s/%s",
227
                      idx + 1, len(self.tasklets))
228
        tl.CheckPrereq()
229
    else:
230
      raise NotImplementedError
231

    
232
  def Exec(self, feedback_fn):
233
    """Execute the LU.
234

235
    This method should implement the actual work. It should raise
236
    errors.OpExecError for failures that are somewhat dealt with in
237
    code, or expected.
238

239
    """
240
    if self.tasklets is not None:
241
      for (idx, tl) in enumerate(self.tasklets):
242
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
243
        tl.Exec(feedback_fn)
244
    else:
245
      raise NotImplementedError
246

    
247
  def BuildHooksEnv(self):
248
    """Build hooks environment for this LU.
249

250
    This method should return a three-node tuple consisting of: a dict
251
    containing the environment that will be used for running the
252
    specific hook for this LU, a list of node names on which the hook
253
    should run before the execution, and a list of node names on which
254
    the hook should run after the execution.
255

256
    The keys of the dict must not have 'GANETI_' prefixed as this will
257
    be handled in the hooks runner. Also note additional keys will be
258
    added by the hooks runner. If the LU doesn't define any
259
    environment, an empty dict (and not None) should be returned.
260

261
    No nodes should be returned as an empty list (and not None).
262

263
    Note that if the HPATH for a LU class is None, this function will
264
    not be called.
265

266
    """
267
    raise NotImplementedError
268

    
269
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
270
    """Notify the LU about the results of its hooks.
271

272
    This method is called every time a hooks phase is executed, and notifies
273
    the Logical Unit about the hooks' result. The LU can then use it to alter
274
    its result based on the hooks.  By default the method does nothing and the
275
    previous result is passed back unchanged but any LU can define it if it
276
    wants to use the local cluster hook-scripts somehow.
277

278
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
279
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
280
    @param hook_results: the results of the multi-node hooks rpc call
281
    @param feedback_fn: function used send feedback back to the caller
282
    @param lu_result: the previous Exec result this LU had, or None
283
        in the PRE phase
284
    @return: the new Exec result, based on the previous result
285
        and hook results
286

287
    """
288
    # API must be kept, thus we ignore the unused argument and could
289
    # be a function warnings
290
    # pylint: disable-msg=W0613,R0201
291
    return lu_result
292

    
293
  def _ExpandAndLockInstance(self):
294
    """Helper function to expand and lock an instance.
295

296
    Many LUs that work on an instance take its name in self.op.instance_name
297
    and need to expand it and then declare the expanded name for locking. This
298
    function does it, and then updates self.op.instance_name to the expanded
299
    name. It also initializes needed_locks as a dict, if this hasn't been done
300
    before.
301

302
    """
303
    if self.needed_locks is None:
304
      self.needed_locks = {}
305
    else:
306
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
307
        "_ExpandAndLockInstance called with instance-level locks set"
308
    self.op.instance_name = _ExpandInstanceName(self.cfg,
309
                                                self.op.instance_name)
310
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
311

    
312
  def _LockInstancesNodes(self, primary_only=False):
313
    """Helper function to declare instances' nodes for locking.
314

315
    This function should be called after locking one or more instances to lock
316
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
317
    with all primary or secondary nodes for instances already locked and
318
    present in self.needed_locks[locking.LEVEL_INSTANCE].
319

320
    It should be called from DeclareLocks, and for safety only works if
321
    self.recalculate_locks[locking.LEVEL_NODE] is set.
322

323
    In the future it may grow parameters to just lock some instance's nodes, or
324
    to just lock primaries or secondary nodes, if needed.
325

326
    If should be called in DeclareLocks in a way similar to::
327

328
      if level == locking.LEVEL_NODE:
329
        self._LockInstancesNodes()
330

331
    @type primary_only: boolean
332
    @param primary_only: only lock primary nodes of locked instances
333

334
    """
335
    assert locking.LEVEL_NODE in self.recalculate_locks, \
336
      "_LockInstancesNodes helper function called with no nodes to recalculate"
337

    
338
    # TODO: check if we're really been called with the instance locks held
339

    
340
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
341
    # future we might want to have different behaviors depending on the value
342
    # of self.recalculate_locks[locking.LEVEL_NODE]
343
    wanted_nodes = []
344
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
345
      instance = self.context.cfg.GetInstanceInfo(instance_name)
346
      wanted_nodes.append(instance.primary_node)
347
      if not primary_only:
348
        wanted_nodes.extend(instance.secondary_nodes)
349

    
350
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
351
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
352
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
353
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
354

    
355
    del self.recalculate_locks[locking.LEVEL_NODE]
356

    
357

    
358
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
359
  """Simple LU which runs no hooks.
360

361
  This LU is intended as a parent for other LogicalUnits which will
362
  run no hooks, in order to reduce duplicate code.
363

364
  """
365
  HPATH = None
366
  HTYPE = None
367

    
368
  def BuildHooksEnv(self):
369
    """Empty BuildHooksEnv for NoHooksLu.
370

371
    This just raises an error.
372

373
    """
374
    assert False, "BuildHooksEnv called for NoHooksLUs"
375

    
376

    
377
class Tasklet:
378
  """Tasklet base class.
379

380
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
381
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
382
  tasklets know nothing about locks.
383

384
  Subclasses must follow these rules:
385
    - Implement CheckPrereq
386
    - Implement Exec
387

388
  """
389
  def __init__(self, lu):
390
    self.lu = lu
391

    
392
    # Shortcuts
393
    self.cfg = lu.cfg
394
    self.rpc = lu.rpc
395

    
396
  def CheckPrereq(self):
397
    """Check prerequisites for this tasklets.
398

399
    This method should check whether the prerequisites for the execution of
400
    this tasklet are fulfilled. It can do internode communication, but it
401
    should be idempotent - no cluster or system changes are allowed.
402

403
    The method should raise errors.OpPrereqError in case something is not
404
    fulfilled. Its return value is ignored.
405

406
    This method should also update all parameters to their canonical form if it
407
    hasn't been done before.
408

409
    """
410
    raise NotImplementedError
411

    
412
  def Exec(self, feedback_fn):
413
    """Execute the tasklet.
414

415
    This method should implement the actual work. It should raise
416
    errors.OpExecError for failures that are somewhat dealt with in code, or
417
    expected.
418

419
    """
420
    raise NotImplementedError
421

    
422

    
423
def _GetWantedNodes(lu, nodes):
424
  """Returns list of checked and expanded node names.
425

426
  @type lu: L{LogicalUnit}
427
  @param lu: the logical unit on whose behalf we execute
428
  @type nodes: list
429
  @param nodes: list of node names or None for all nodes
430
  @rtype: list
431
  @return: the list of nodes, sorted
432
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
433

434
  """
435
  if not isinstance(nodes, list):
436
    raise errors.OpPrereqError("Invalid argument type 'nodes'",
437
                               errors.ECODE_INVAL)
438

    
439
  if not nodes:
440
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
441
      " non-empty list of nodes whose name is to be expanded.")
442

    
443
  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
444
  return utils.NiceSort(wanted)
445

    
446

    
447
def _GetWantedInstances(lu, instances):
448
  """Returns list of checked and expanded instance names.
449

450
  @type lu: L{LogicalUnit}
451
  @param lu: the logical unit on whose behalf we execute
452
  @type instances: list
453
  @param instances: list of instance names or None for all instances
454
  @rtype: list
455
  @return: the list of instances, sorted
456
  @raise errors.OpPrereqError: if the instances parameter is wrong type
457
  @raise errors.OpPrereqError: if any of the passed instances is not found
458

459
  """
460
  if not isinstance(instances, list):
461
    raise errors.OpPrereqError("Invalid argument type 'instances'",
462
                               errors.ECODE_INVAL)
463

    
464
  if instances:
465
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
466
  else:
467
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
468
  return wanted
469

    
470

    
471
def _CheckOutputFields(static, dynamic, selected):
472
  """Checks whether all selected fields are valid.
473

474
  @type static: L{utils.FieldSet}
475
  @param static: static fields set
476
  @type dynamic: L{utils.FieldSet}
477
  @param dynamic: dynamic fields set
478

479
  """
480
  f = utils.FieldSet()
481
  f.Extend(static)
482
  f.Extend(dynamic)
483

    
484
  delta = f.NonMatching(selected)
485
  if delta:
486
    raise errors.OpPrereqError("Unknown output fields selected: %s"
487
                               % ",".join(delta), errors.ECODE_INVAL)
488

    
489

    
490
def _CheckBooleanOpField(op, name):
491
  """Validates boolean opcode parameters.
492

493
  This will ensure that an opcode parameter is either a boolean value,
494
  or None (but that it always exists).
495

496
  """
497
  val = getattr(op, name, None)
498
  if not (val is None or isinstance(val, bool)):
499
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
500
                               (name, str(val)), errors.ECODE_INVAL)
501
  setattr(op, name, val)
502

    
503

    
504
def _CheckGlobalHvParams(params):
505
  """Validates that given hypervisor params are not global ones.
506

507
  This will ensure that instances don't get customised versions of
508
  global params.
509

510
  """
511
  used_globals = constants.HVC_GLOBALS.intersection(params)
512
  if used_globals:
513
    msg = ("The following hypervisor parameters are global and cannot"
514
           " be customized at instance level, please modify them at"
515
           " cluster level: %s" % utils.CommaJoin(used_globals))
516
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
517

    
518

    
519
def _CheckNodeOnline(lu, node):
520
  """Ensure that a given node is online.
521

522
  @param lu: the LU on behalf of which we make the check
523
  @param node: the node to check
524
  @raise errors.OpPrereqError: if the node is offline
525

526
  """
527
  if lu.cfg.GetNodeInfo(node).offline:
528
    raise errors.OpPrereqError("Can't use offline node %s" % node,
529
                               errors.ECODE_INVAL)
530

    
531

    
532
def _CheckNodeNotDrained(lu, node):
533
  """Ensure that a given node is not drained.
534

535
  @param lu: the LU on behalf of which we make the check
536
  @param node: the node to check
537
  @raise errors.OpPrereqError: if the node is drained
538

539
  """
540
  if lu.cfg.GetNodeInfo(node).drained:
541
    raise errors.OpPrereqError("Can't use drained node %s" % node,
542
                               errors.ECODE_INVAL)
543

    
544

    
545
def _CheckNodeHasOS(lu, node, os_name, force_variant):
546
  """Ensure that a node supports a given OS.
547

548
  @param lu: the LU on behalf of which we make the check
549
  @param node: the node to check
550
  @param os_name: the OS to query about
551
  @param force_variant: whether to ignore variant errors
552
  @raise errors.OpPrereqError: if the node is not supporting the OS
553

554
  """
555
  result = lu.rpc.call_os_get(node, os_name)
556
  result.Raise("OS '%s' not in supported OS list for node %s" %
557
               (os_name, node),
558
               prereq=True, ecode=errors.ECODE_INVAL)
559
  if not force_variant:
560
    _CheckOSVariant(result.payload, os_name)
561

    
562

    
563
def _CheckDiskTemplate(template):
564
  """Ensure a given disk template is valid.
565

566
  """
567
  if template not in constants.DISK_TEMPLATES:
568
    msg = ("Invalid disk template name '%s', valid templates are: %s" %
569
           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
570
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
571

    
572

    
573
def _CheckInstanceDown(lu, instance, reason):
574
  """Ensure that an instance is not running."""
575
  if instance.admin_up:
576
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
577
                               (instance.name, reason), errors.ECODE_STATE)
578

    
579
  pnode = instance.primary_node
580
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
581
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
582
              prereq=True, ecode=errors.ECODE_ENVIRON)
583

    
584
  if instance.name in ins_l.payload:
585
    raise errors.OpPrereqError("Instance %s is running, %s" %
586
                               (instance.name, reason), errors.ECODE_STATE)
587

    
588

    
589
def _ExpandItemName(fn, name, kind):
590
  """Expand an item name.
591

592
  @param fn: the function to use for expansion
593
  @param name: requested item name
594
  @param kind: text description ('Node' or 'Instance')
595
  @return: the resolved (full) name
596
  @raise errors.OpPrereqError: if the item is not found
597

598
  """
599
  full_name = fn(name)
600
  if full_name is None:
601
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
602
                               errors.ECODE_NOENT)
603
  return full_name
604

    
605

    
606
def _ExpandNodeName(cfg, name):
607
  """Wrapper over L{_ExpandItemName} for nodes."""
608
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
609

    
610

    
611
def _ExpandInstanceName(cfg, name):
612
  """Wrapper over L{_ExpandItemName} for instance."""
613
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
614

    
615

    
616
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
617
                          memory, vcpus, nics, disk_template, disks,
618
                          bep, hvp, hypervisor_name):
619
  """Builds instance related env variables for hooks
620

621
  This builds the hook environment from individual variables.
622

623
  @type name: string
624
  @param name: the name of the instance
625
  @type primary_node: string
626
  @param primary_node: the name of the instance's primary node
627
  @type secondary_nodes: list
628
  @param secondary_nodes: list of secondary nodes as strings
629
  @type os_type: string
630
  @param os_type: the name of the instance's OS
631
  @type status: boolean
632
  @param status: the should_run status of the instance
633
  @type memory: string
634
  @param memory: the memory size of the instance
635
  @type vcpus: string
636
  @param vcpus: the count of VCPUs the instance has
637
  @type nics: list
638
  @param nics: list of tuples (ip, mac, mode, link) representing
639
      the NICs the instance has
640
  @type disk_template: string
641
  @param disk_template: the disk template of the instance
642
  @type disks: list
643
  @param disks: the list of (size, mode) pairs
644
  @type bep: dict
645
  @param bep: the backend parameters for the instance
646
  @type hvp: dict
647
  @param hvp: the hypervisor parameters for the instance
648
  @type hypervisor_name: string
649
  @param hypervisor_name: the hypervisor for the instance
650
  @rtype: dict
651
  @return: the hook environment for this instance
652

653
  """
654
  if status:
655
    str_status = "up"
656
  else:
657
    str_status = "down"
658
  env = {
659
    "OP_TARGET": name,
660
    "INSTANCE_NAME": name,
661
    "INSTANCE_PRIMARY": primary_node,
662
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
663
    "INSTANCE_OS_TYPE": os_type,
664
    "INSTANCE_STATUS": str_status,
665
    "INSTANCE_MEMORY": memory,
666
    "INSTANCE_VCPUS": vcpus,
667
    "INSTANCE_DISK_TEMPLATE": disk_template,
668
    "INSTANCE_HYPERVISOR": hypervisor_name,
669
  }
670

    
671
  if nics:
672
    nic_count = len(nics)
673
    for idx, (ip, mac, mode, link) in enumerate(nics):
674
      if ip is None:
675
        ip = ""
676
      env["INSTANCE_NIC%d_IP" % idx] = ip
677
      env["INSTANCE_NIC%d_MAC" % idx] = mac
678
      env["INSTANCE_NIC%d_MODE" % idx] = mode
679
      env["INSTANCE_NIC%d_LINK" % idx] = link
680
      if mode == constants.NIC_MODE_BRIDGED:
681
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
682
  else:
683
    nic_count = 0
684

    
685
  env["INSTANCE_NIC_COUNT"] = nic_count
686

    
687
  if disks:
688
    disk_count = len(disks)
689
    for idx, (size, mode) in enumerate(disks):
690
      env["INSTANCE_DISK%d_SIZE" % idx] = size
691
      env["INSTANCE_DISK%d_MODE" % idx] = mode
692
  else:
693
    disk_count = 0
694

    
695
  env["INSTANCE_DISK_COUNT"] = disk_count
696

    
697
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
698
    for key, value in source.items():
699
      env["INSTANCE_%s_%s" % (kind, key)] = value
700

    
701
  return env
702

    
703

    
704
def _NICListToTuple(lu, nics):
705
  """Build a list of nic information tuples.
706

707
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
708
  value in LUQueryInstanceData.
709

710
  @type lu:  L{LogicalUnit}
711
  @param lu: the logical unit on whose behalf we execute
712
  @type nics: list of L{objects.NIC}
713
  @param nics: list of nics to convert to hooks tuples
714

715
  """
716
  hooks_nics = []
717
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
718
  for nic in nics:
719
    ip = nic.ip
720
    mac = nic.mac
721
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
722
    mode = filled_params[constants.NIC_MODE]
723
    link = filled_params[constants.NIC_LINK]
724
    hooks_nics.append((ip, mac, mode, link))
725
  return hooks_nics
726

    
727

    
728
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
729
  """Builds instance related env variables for hooks from an object.
730

731
  @type lu: L{LogicalUnit}
732
  @param lu: the logical unit on whose behalf we execute
733
  @type instance: L{objects.Instance}
734
  @param instance: the instance for which we should build the
735
      environment
736
  @type override: dict
737
  @param override: dictionary with key/values that will override
738
      our values
739
  @rtype: dict
740
  @return: the hook environment dictionary
741

742
  """
743
  cluster = lu.cfg.GetClusterInfo()
744
  bep = cluster.FillBE(instance)
745
  hvp = cluster.FillHV(instance)
746
  args = {
747
    'name': instance.name,
748
    'primary_node': instance.primary_node,
749
    'secondary_nodes': instance.secondary_nodes,
750
    'os_type': instance.os,
751
    'status': instance.admin_up,
752
    'memory': bep[constants.BE_MEMORY],
753
    'vcpus': bep[constants.BE_VCPUS],
754
    'nics': _NICListToTuple(lu, instance.nics),
755
    'disk_template': instance.disk_template,
756
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
757
    'bep': bep,
758
    'hvp': hvp,
759
    'hypervisor_name': instance.hypervisor,
760
  }
761
  if override:
762
    args.update(override)
763
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
764

    
765

    
766
def _AdjustCandidatePool(lu, exceptions):
767
  """Adjust the candidate pool after node operations.
768

769
  """
770
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
771
  if mod_list:
772
    lu.LogInfo("Promoted nodes to master candidate role: %s",
773
               utils.CommaJoin(node.name for node in mod_list))
774
    for name in mod_list:
775
      lu.context.ReaddNode(name)
776
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
777
  if mc_now > mc_max:
778
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
779
               (mc_now, mc_max))
780

    
781

    
782
def _DecideSelfPromotion(lu, exceptions=None):
783
  """Decide whether I should promote myself as a master candidate.
784

785
  """
786
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
787
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
788
  # the new node will increase mc_max with one, so:
789
  mc_should = min(mc_should + 1, cp_size)
790
  return mc_now < mc_should
791

    
792

    
793
def _CheckNicsBridgesExist(lu, target_nics, target_node,
794
                               profile=constants.PP_DEFAULT):
795
  """Check that the brigdes needed by a list of nics exist.
796

797
  """
798
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
799
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
800
                for nic in target_nics]
801
  brlist = [params[constants.NIC_LINK] for params in paramslist
802
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
803
  if brlist:
804
    result = lu.rpc.call_bridges_exist(target_node, brlist)
805
    result.Raise("Error checking bridges on destination node '%s'" %
806
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
807

    
808

    
809
def _CheckInstanceBridgesExist(lu, instance, node=None):
810
  """Check that the brigdes needed by an instance exist.
811

812
  """
813
  if node is None:
814
    node = instance.primary_node
815
  _CheckNicsBridgesExist(lu, instance.nics, node)
816

    
817

    
818
def _CheckOSVariant(os_obj, name):
819
  """Check whether an OS name conforms to the os variants specification.
820

821
  @type os_obj: L{objects.OS}
822
  @param os_obj: OS object to check
823
  @type name: string
824
  @param name: OS name passed by the user, to check for validity
825

826
  """
827
  if not os_obj.supported_variants:
828
    return
829
  try:
830
    variant = name.split("+", 1)[1]
831
  except IndexError:
832
    raise errors.OpPrereqError("OS name must include a variant",
833
                               errors.ECODE_INVAL)
834

    
835
  if variant not in os_obj.supported_variants:
836
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
837

    
838

    
839
def _GetNodeInstancesInner(cfg, fn):
840
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
841

    
842

    
843
def _GetNodeInstances(cfg, node_name):
844
  """Returns a list of all primary and secondary instances on a node.
845

846
  """
847

    
848
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
849

    
850

    
851
def _GetNodePrimaryInstances(cfg, node_name):
852
  """Returns primary instances on a node.
853

854
  """
855
  return _GetNodeInstancesInner(cfg,
856
                                lambda inst: node_name == inst.primary_node)
857

    
858

    
859
def _GetNodeSecondaryInstances(cfg, node_name):
860
  """Returns secondary instances on a node.
861

862
  """
863
  return _GetNodeInstancesInner(cfg,
864
                                lambda inst: node_name in inst.secondary_nodes)
865

    
866

    
867
def _GetStorageTypeArgs(cfg, storage_type):
868
  """Returns the arguments for a storage type.
869

870
  """
871
  # Special case for file storage
872
  if storage_type == constants.ST_FILE:
873
    # storage.FileStorage wants a list of storage directories
874
    return [[cfg.GetFileStorageDir()]]
875

    
876
  return []
877

    
878

    
879
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
880
  faulty = []
881

    
882
  for dev in instance.disks:
883
    cfg.SetDiskID(dev, node_name)
884

    
885
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
886
  result.Raise("Failed to get disk status from node %s" % node_name,
887
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
888

    
889
  for idx, bdev_status in enumerate(result.payload):
890
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
891
      faulty.append(idx)
892

    
893
  return faulty
894

    
895

    
896
def _FormatTimestamp(secs):
897
  """Formats a Unix timestamp with the local timezone.
898

899
  """
900
  return time.strftime("%F %T %Z", time.gmtime(secs))
901

    
902

    
903
class LUPostInitCluster(LogicalUnit):
904
  """Logical unit for running hooks after cluster initialization.
905

906
  """
907
  HPATH = "cluster-init"
908
  HTYPE = constants.HTYPE_CLUSTER
909
  _OP_REQP = []
910

    
911
  def BuildHooksEnv(self):
912
    """Build hooks env.
913

914
    """
915
    env = {"OP_TARGET": self.cfg.GetClusterName()}
916
    mn = self.cfg.GetMasterNode()
917
    return env, [], [mn]
918

    
919
  def CheckPrereq(self):
920
    """No prerequisites to check.
921

922
    """
923
    return True
924

    
925
  def Exec(self, feedback_fn):
926
    """Nothing to do.
927

928
    """
929
    return True
930

    
931

    
932
class LUDestroyCluster(LogicalUnit):
933
  """Logical unit for destroying the cluster.
934

935
  """
936
  HPATH = "cluster-destroy"
937
  HTYPE = constants.HTYPE_CLUSTER
938
  _OP_REQP = []
939

    
940
  def BuildHooksEnv(self):
941
    """Build hooks env.
942

943
    """
944
    env = {"OP_TARGET": self.cfg.GetClusterName()}
945
    return env, [], []
946

    
947
  def CheckPrereq(self):
948
    """Check prerequisites.
949

950
    This checks whether the cluster is empty.
951

952
    Any errors are signaled by raising errors.OpPrereqError.
953

954
    """
955
    master = self.cfg.GetMasterNode()
956

    
957
    nodelist = self.cfg.GetNodeList()
958
    if len(nodelist) != 1 or nodelist[0] != master:
959
      raise errors.OpPrereqError("There are still %d node(s) in"
960
                                 " this cluster." % (len(nodelist) - 1),
961
                                 errors.ECODE_INVAL)
962
    instancelist = self.cfg.GetInstanceList()
963
    if instancelist:
964
      raise errors.OpPrereqError("There are still %d instance(s) in"
965
                                 " this cluster." % len(instancelist),
966
                                 errors.ECODE_INVAL)
967

    
968
  def Exec(self, feedback_fn):
969
    """Destroys the cluster.
970

971
    """
972
    master = self.cfg.GetMasterNode()
973
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
974

    
975
    # Run post hooks on master node before it's removed
976
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
977
    try:
978
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
979
    except:
980
      # pylint: disable-msg=W0702
981
      self.LogWarning("Errors occurred running hooks on %s" % master)
982

    
983
    result = self.rpc.call_node_stop_master(master, False)
984
    result.Raise("Could not disable the master role")
985

    
986
    if modify_ssh_setup:
987
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
988
      utils.CreateBackup(priv_key)
989
      utils.CreateBackup(pub_key)
990

    
991
    return master
992

    
993

    
994
def _VerifyCertificateInner(filename, expired, not_before, not_after, now,
995
                            warn_days=constants.SSL_CERT_EXPIRATION_WARN,
996
                            error_days=constants.SSL_CERT_EXPIRATION_ERROR):
997
  """Verifies certificate details for LUVerifyCluster.
998

999
  """
1000
  if expired:
1001
    msg = "Certificate %s is expired" % filename
1002

    
1003
    if not_before is not None and not_after is not None:
1004
      msg += (" (valid from %s to %s)" %
1005
              (_FormatTimestamp(not_before),
1006
               _FormatTimestamp(not_after)))
1007
    elif not_before is not None:
1008
      msg += " (valid from %s)" % _FormatTimestamp(not_before)
1009
    elif not_after is not None:
1010
      msg += " (valid until %s)" % _FormatTimestamp(not_after)
1011

    
1012
    return (LUVerifyCluster.ETYPE_ERROR, msg)
1013

    
1014
  elif not_before is not None and not_before > now:
1015
    return (LUVerifyCluster.ETYPE_WARNING,
1016
            "Certificate %s not yet valid (valid from %s)" %
1017
            (filename, _FormatTimestamp(not_before)))
1018

    
1019
  elif not_after is not None:
1020
    remaining_days = int((not_after - now) / (24 * 3600))
1021

    
1022
    msg = ("Certificate %s expires in %d days" % (filename, remaining_days))
1023

    
1024
    if remaining_days <= error_days:
1025
      return (LUVerifyCluster.ETYPE_ERROR, msg)
1026

    
1027
    if remaining_days <= warn_days:
1028
      return (LUVerifyCluster.ETYPE_WARNING, msg)
1029

    
1030
  return (None, None)
1031

    
1032

    
1033
def _VerifyCertificate(filename):
1034
  """Verifies a certificate for LUVerifyCluster.
1035

1036
  @type filename: string
1037
  @param filename: Path to PEM file
1038

1039
  """
1040
  try:
1041
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1042
                                           utils.ReadFile(filename))
1043
  except Exception, err: # pylint: disable-msg=W0703
1044
    return (LUVerifyCluster.ETYPE_ERROR,
1045
            "Failed to load X509 certificate %s: %s" % (filename, err))
1046

    
1047
  # Depending on the pyOpenSSL version, this can just return (None, None)
1048
  (not_before, not_after) = utils.GetX509CertValidity(cert)
1049

    
1050
  return _VerifyCertificateInner(filename, cert.has_expired(),
1051
                                 not_before, not_after, time.time())
1052

    
1053

    
1054
class LUVerifyCluster(LogicalUnit):
1055
  """Verifies the cluster status.
1056

1057
  """
1058
  HPATH = "cluster-verify"
1059
  HTYPE = constants.HTYPE_CLUSTER
1060
  _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
1061
  REQ_BGL = False
1062

    
1063
  TCLUSTER = "cluster"
1064
  TNODE = "node"
1065
  TINSTANCE = "instance"
1066

    
1067
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1068
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1069
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1070
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1071
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1072
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1073
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1074
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1075
  ENODEDRBD = (TNODE, "ENODEDRBD")
1076
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1077
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1078
  ENODEHV = (TNODE, "ENODEHV")
1079
  ENODELVM = (TNODE, "ENODELVM")
1080
  ENODEN1 = (TNODE, "ENODEN1")
1081
  ENODENET = (TNODE, "ENODENET")
1082
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1083
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1084
  ENODERPC = (TNODE, "ENODERPC")
1085
  ENODESSH = (TNODE, "ENODESSH")
1086
  ENODEVERSION = (TNODE, "ENODEVERSION")
1087
  ENODESETUP = (TNODE, "ENODESETUP")
1088
  ENODETIME = (TNODE, "ENODETIME")
1089

    
1090
  ETYPE_FIELD = "code"
1091
  ETYPE_ERROR = "ERROR"
1092
  ETYPE_WARNING = "WARNING"
1093

    
1094
  def ExpandNames(self):
1095
    self.needed_locks = {
1096
      locking.LEVEL_NODE: locking.ALL_SET,
1097
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1098
    }
1099
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1100

    
1101
  def _Error(self, ecode, item, msg, *args, **kwargs):
1102
    """Format an error message.
1103

1104
    Based on the opcode's error_codes parameter, either format a
1105
    parseable error code, or a simpler error string.
1106

1107
    This must be called only from Exec and functions called from Exec.
1108

1109
    """
1110
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1111
    itype, etxt = ecode
1112
    # first complete the msg
1113
    if args:
1114
      msg = msg % args
1115
    # then format the whole message
1116
    if self.op.error_codes:
1117
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1118
    else:
1119
      if item:
1120
        item = " " + item
1121
      else:
1122
        item = ""
1123
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1124
    # and finally report it via the feedback_fn
1125
    self._feedback_fn("  - %s" % msg)
1126

    
1127
  def _ErrorIf(self, cond, *args, **kwargs):
1128
    """Log an error message if the passed condition is True.
1129

1130
    """
1131
    cond = bool(cond) or self.op.debug_simulate_errors
1132
    if cond:
1133
      self._Error(*args, **kwargs)
1134
    # do not mark the operation as failed for WARN cases only
1135
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1136
      self.bad = self.bad or cond
1137

    
1138
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
1139
                  node_result, master_files, drbd_map, vg_name):
1140
    """Run multiple tests against a node.
1141

1142
    Test list:
1143

1144
      - compares ganeti version
1145
      - checks vg existence and size > 20G
1146
      - checks config file checksum
1147
      - checks ssh to other nodes
1148

1149
    @type nodeinfo: L{objects.Node}
1150
    @param nodeinfo: the node to check
1151
    @param file_list: required list of files
1152
    @param local_cksum: dictionary of local files and their checksums
1153
    @param node_result: the results from the node
1154
    @param master_files: list of files that only masters should have
1155
    @param drbd_map: the useddrbd minors for this node, in
1156
        form of minor: (instance, must_exist) which correspond to instances
1157
        and their running status
1158
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
1159

1160
    """
1161
    node = nodeinfo.name
1162
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1163

    
1164
    # main result, node_result should be a non-empty dict
1165
    test = not node_result or not isinstance(node_result, dict)
1166
    _ErrorIf(test, self.ENODERPC, node,
1167
                  "unable to verify node: no data returned")
1168
    if test:
1169
      return
1170

    
1171
    # compares ganeti version
1172
    local_version = constants.PROTOCOL_VERSION
1173
    remote_version = node_result.get('version', None)
1174
    test = not (remote_version and
1175
                isinstance(remote_version, (list, tuple)) and
1176
                len(remote_version) == 2)
1177
    _ErrorIf(test, self.ENODERPC, node,
1178
             "connection to node returned invalid data")
1179
    if test:
1180
      return
1181

    
1182
    test = local_version != remote_version[0]
1183
    _ErrorIf(test, self.ENODEVERSION, node,
1184
             "incompatible protocol versions: master %s,"
1185
             " node %s", local_version, remote_version[0])
1186
    if test:
1187
      return
1188

    
1189
    # node seems compatible, we can actually try to look into its results
1190

    
1191
    # full package version
1192
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1193
                  self.ENODEVERSION, node,
1194
                  "software version mismatch: master %s, node %s",
1195
                  constants.RELEASE_VERSION, remote_version[1],
1196
                  code=self.ETYPE_WARNING)
1197

    
1198
    # checks vg existence and size > 20G
1199
    if vg_name is not None:
1200
      vglist = node_result.get(constants.NV_VGLIST, None)
1201
      test = not vglist
1202
      _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1203
      if not test:
1204
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1205
                                              constants.MIN_VG_SIZE)
1206
        _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1207

    
1208
    # checks config file checksum
1209

    
1210
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
1211
    test = not isinstance(remote_cksum, dict)
1212
    _ErrorIf(test, self.ENODEFILECHECK, node,
1213
             "node hasn't returned file checksum data")
1214
    if not test:
1215
      for file_name in file_list:
1216
        node_is_mc = nodeinfo.master_candidate
1217
        must_have = (file_name not in master_files) or node_is_mc
1218
        # missing
1219
        test1 = file_name not in remote_cksum
1220
        # invalid checksum
1221
        test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1222
        # existing and good
1223
        test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1224
        _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1225
                 "file '%s' missing", file_name)
1226
        _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1227
                 "file '%s' has wrong checksum", file_name)
1228
        # not candidate and this is not a must-have file
1229
        _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1230
                 "file '%s' should not exist on non master"
1231
                 " candidates (and the file is outdated)", file_name)
1232
        # all good, except non-master/non-must have combination
1233
        _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1234
                 "file '%s' should not exist"
1235
                 " on non master candidates", file_name)
1236

    
1237
    # checks ssh to any
1238

    
1239
    test = constants.NV_NODELIST not in node_result
1240
    _ErrorIf(test, self.ENODESSH, node,
1241
             "node hasn't returned node ssh connectivity data")
1242
    if not test:
1243
      if node_result[constants.NV_NODELIST]:
1244
        for a_node, a_msg in node_result[constants.NV_NODELIST].items():
1245
          _ErrorIf(True, self.ENODESSH, node,
1246
                   "ssh communication with node '%s': %s", a_node, a_msg)
1247

    
1248
    test = constants.NV_NODENETTEST not in node_result
1249
    _ErrorIf(test, self.ENODENET, node,
1250
             "node hasn't returned node tcp connectivity data")
1251
    if not test:
1252
      if node_result[constants.NV_NODENETTEST]:
1253
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
1254
        for anode in nlist:
1255
          _ErrorIf(True, self.ENODENET, node,
1256
                   "tcp communication with node '%s': %s",
1257
                   anode, node_result[constants.NV_NODENETTEST][anode])
1258

    
1259
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
1260
    if isinstance(hyp_result, dict):
1261
      for hv_name, hv_result in hyp_result.iteritems():
1262
        test = hv_result is not None
1263
        _ErrorIf(test, self.ENODEHV, node,
1264
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1265

    
1266
    # check used drbd list
1267
    if vg_name is not None:
1268
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
1269
      test = not isinstance(used_minors, (tuple, list))
1270
      _ErrorIf(test, self.ENODEDRBD, node,
1271
               "cannot parse drbd status file: %s", str(used_minors))
1272
      if not test:
1273
        for minor, (iname, must_exist) in drbd_map.items():
1274
          test = minor not in used_minors and must_exist
1275
          _ErrorIf(test, self.ENODEDRBD, node,
1276
                   "drbd minor %d of instance %s is not active",
1277
                   minor, iname)
1278
        for minor in used_minors:
1279
          test = minor not in drbd_map
1280
          _ErrorIf(test, self.ENODEDRBD, node,
1281
                   "unallocated drbd minor %d is in use", minor)
1282
    test = node_result.get(constants.NV_NODESETUP,
1283
                           ["Missing NODESETUP results"])
1284
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1285
             "; ".join(test))
1286

    
1287
    # check pv names
1288
    if vg_name is not None:
1289
      pvlist = node_result.get(constants.NV_PVLIST, None)
1290
      test = pvlist is None
1291
      _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1292
      if not test:
1293
        # check that ':' is not present in PV names, since it's a
1294
        # special character for lvcreate (denotes the range of PEs to
1295
        # use on the PV)
1296
        for _, pvname, owner_vg in pvlist:
1297
          test = ":" in pvname
1298
          _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1299
                   " '%s' of VG '%s'", pvname, owner_vg)
1300

    
1301
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1302
                      node_instance, n_offline):
1303
    """Verify an instance.
1304

1305
    This function checks to see if the required block devices are
1306
    available on the instance's node.
1307

1308
    """
1309
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1310
    node_current = instanceconfig.primary_node
1311

    
1312
    node_vol_should = {}
1313
    instanceconfig.MapLVsByNode(node_vol_should)
1314

    
1315
    for node in node_vol_should:
1316
      if node in n_offline:
1317
        # ignore missing volumes on offline nodes
1318
        continue
1319
      for volume in node_vol_should[node]:
1320
        test = node not in node_vol_is or volume not in node_vol_is[node]
1321
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1322
                 "volume %s missing on node %s", volume, node)
1323

    
1324
    if instanceconfig.admin_up:
1325
      test = ((node_current not in node_instance or
1326
               not instance in node_instance[node_current]) and
1327
              node_current not in n_offline)
1328
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
1329
               "instance not running on its primary node %s",
1330
               node_current)
1331

    
1332
    for node in node_instance:
1333
      if (not node == node_current):
1334
        test = instance in node_instance[node]
1335
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1336
                 "instance should not run on node %s", node)
1337

    
1338
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
1339
    """Verify if there are any unknown volumes in the cluster.
1340

1341
    The .os, .swap and backup volumes are ignored. All other volumes are
1342
    reported as unknown.
1343

1344
    """
1345
    for node in node_vol_is:
1346
      for volume in node_vol_is[node]:
1347
        test = (node not in node_vol_should or
1348
                volume not in node_vol_should[node])
1349
        self._ErrorIf(test, self.ENODEORPHANLV, node,
1350
                      "volume %s is unknown", volume)
1351

    
1352
  def _VerifyOrphanInstances(self, instancelist, node_instance):
1353
    """Verify the list of running instances.
1354

1355
    This checks what instances are running but unknown to the cluster.
1356

1357
    """
1358
    for node in node_instance:
1359
      for o_inst in node_instance[node]:
1360
        test = o_inst not in instancelist
1361
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1362
                      "instance %s on node %s should not exist", o_inst, node)
1363

    
1364
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
1365
    """Verify N+1 Memory Resilience.
1366

1367
    Check that if one single node dies we can still start all the instances it
1368
    was primary for.
1369

1370
    """
1371
    for node, nodeinfo in node_info.iteritems():
1372
      # This code checks that every node which is now listed as secondary has
1373
      # enough memory to host all instances it is supposed to should a single
1374
      # other node in the cluster fail.
1375
      # FIXME: not ready for failover to an arbitrary node
1376
      # FIXME: does not support file-backed instances
1377
      # WARNING: we currently take into account down instances as well as up
1378
      # ones, considering that even if they're down someone might want to start
1379
      # them even in the event of a node failure.
1380
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1381
        needed_mem = 0
1382
        for instance in instances:
1383
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1384
          if bep[constants.BE_AUTO_BALANCE]:
1385
            needed_mem += bep[constants.BE_MEMORY]
1386
        test = nodeinfo['mfree'] < needed_mem
1387
        self._ErrorIf(test, self.ENODEN1, node,
1388
                      "not enough memory on to accommodate"
1389
                      " failovers should peer node %s fail", prinode)
1390

    
1391
  def CheckPrereq(self):
1392
    """Check prerequisites.
1393

1394
    Transform the list of checks we're going to skip into a set and check that
1395
    all its members are valid.
1396

1397
    """
1398
    self.skip_set = frozenset(self.op.skip_checks)
1399
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1400
      raise errors.OpPrereqError("Invalid checks to be skipped specified",
1401
                                 errors.ECODE_INVAL)
1402

    
1403
  def BuildHooksEnv(self):
1404
    """Build hooks env.
1405

1406
    Cluster-Verify hooks just ran in the post phase and their failure makes
1407
    the output be logged in the verify output and the verification to fail.
1408

1409
    """
1410
    all_nodes = self.cfg.GetNodeList()
1411
    env = {
1412
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1413
      }
1414
    for node in self.cfg.GetAllNodesInfo().values():
1415
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1416

    
1417
    return env, [], all_nodes
1418

    
1419
  def Exec(self, feedback_fn):
1420
    """Verify integrity of cluster, performing various test on nodes.
1421

1422
    """
1423
    self.bad = False
1424
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1425
    verbose = self.op.verbose
1426
    self._feedback_fn = feedback_fn
1427
    feedback_fn("* Verifying global settings")
1428
    for msg in self.cfg.VerifyConfig():
1429
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1430

    
1431
    # Check the cluster certificates
1432
    for cert_filename in constants.ALL_CERT_FILES:
1433
      (errcode, msg) = _VerifyCertificate(cert_filename)
1434
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
1435

    
1436
    vg_name = self.cfg.GetVGName()
1437
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1438
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
1439
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1440
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1441
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1442
                        for iname in instancelist)
1443
    i_non_redundant = [] # Non redundant instances
1444
    i_non_a_balanced = [] # Non auto-balanced instances
1445
    n_offline = [] # List of offline nodes
1446
    n_drained = [] # List of nodes being drained
1447
    node_volume = {}
1448
    node_instance = {}
1449
    node_info = {}
1450
    instance_cfg = {}
1451

    
1452
    # FIXME: verify OS list
1453
    # do local checksums
1454
    master_files = [constants.CLUSTER_CONF_FILE]
1455

    
1456
    file_names = ssconf.SimpleStore().GetFileList()
1457
    file_names.extend(constants.ALL_CERT_FILES)
1458
    file_names.extend(master_files)
1459

    
1460
    local_checksums = utils.FingerprintFiles(file_names)
1461

    
1462
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1463
    node_verify_param = {
1464
      constants.NV_FILELIST: file_names,
1465
      constants.NV_NODELIST: [node.name for node in nodeinfo
1466
                              if not node.offline],
1467
      constants.NV_HYPERVISOR: hypervisors,
1468
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1469
                                  node.secondary_ip) for node in nodeinfo
1470
                                 if not node.offline],
1471
      constants.NV_INSTANCELIST: hypervisors,
1472
      constants.NV_VERSION: None,
1473
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1474
      constants.NV_NODESETUP: None,
1475
      constants.NV_TIME: None,
1476
      }
1477

    
1478
    if vg_name is not None:
1479
      node_verify_param[constants.NV_VGLIST] = None
1480
      node_verify_param[constants.NV_LVLIST] = vg_name
1481
      node_verify_param[constants.NV_PVLIST] = [vg_name]
1482
      node_verify_param[constants.NV_DRBDLIST] = None
1483

    
1484
    # Due to the way our RPC system works, exact response times cannot be
1485
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
1486
    # time before and after executing the request, we can at least have a time
1487
    # window.
1488
    nvinfo_starttime = time.time()
1489
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1490
                                           self.cfg.GetClusterName())
1491
    nvinfo_endtime = time.time()
1492

    
1493
    cluster = self.cfg.GetClusterInfo()
1494
    master_node = self.cfg.GetMasterNode()
1495
    all_drbd_map = self.cfg.ComputeDRBDMap()
1496

    
1497
    feedback_fn("* Verifying node status")
1498
    for node_i in nodeinfo:
1499
      node = node_i.name
1500

    
1501
      if node_i.offline:
1502
        if verbose:
1503
          feedback_fn("* Skipping offline node %s" % (node,))
1504
        n_offline.append(node)
1505
        continue
1506

    
1507
      if node == master_node:
1508
        ntype = "master"
1509
      elif node_i.master_candidate:
1510
        ntype = "master candidate"
1511
      elif node_i.drained:
1512
        ntype = "drained"
1513
        n_drained.append(node)
1514
      else:
1515
        ntype = "regular"
1516
      if verbose:
1517
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1518

    
1519
      msg = all_nvinfo[node].fail_msg
1520
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
1521
      if msg:
1522
        continue
1523

    
1524
      nresult = all_nvinfo[node].payload
1525
      node_drbd = {}
1526
      for minor, instance in all_drbd_map[node].items():
1527
        test = instance not in instanceinfo
1528
        _ErrorIf(test, self.ECLUSTERCFG, None,
1529
                 "ghost instance '%s' in temporary DRBD map", instance)
1530
          # ghost instance should not be running, but otherwise we
1531
          # don't give double warnings (both ghost instance and
1532
          # unallocated minor in use)
1533
        if test:
1534
          node_drbd[minor] = (instance, False)
1535
        else:
1536
          instance = instanceinfo[instance]
1537
          node_drbd[minor] = (instance.name, instance.admin_up)
1538

    
1539
      self._VerifyNode(node_i, file_names, local_checksums,
1540
                       nresult, master_files, node_drbd, vg_name)
1541

    
1542
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1543
      if vg_name is None:
1544
        node_volume[node] = {}
1545
      elif isinstance(lvdata, basestring):
1546
        _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1547
                 utils.SafeEncode(lvdata))
1548
        node_volume[node] = {}
1549
      elif not isinstance(lvdata, dict):
1550
        _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1551
        continue
1552
      else:
1553
        node_volume[node] = lvdata
1554

    
1555
      # node_instance
1556
      idata = nresult.get(constants.NV_INSTANCELIST, None)
1557
      test = not isinstance(idata, list)
1558
      _ErrorIf(test, self.ENODEHV, node,
1559
               "rpc call to node failed (instancelist): %s",
1560
               utils.SafeEncode(str(idata)))
1561
      if test:
1562
        continue
1563

    
1564
      node_instance[node] = idata
1565

    
1566
      # node_info
1567
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
1568
      test = not isinstance(nodeinfo, dict)
1569
      _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1570
      if test:
1571
        continue
1572

    
1573
      # Node time
1574
      ntime = nresult.get(constants.NV_TIME, None)
1575
      try:
1576
        ntime_merged = utils.MergeTime(ntime)
1577
      except (ValueError, TypeError):
1578
        _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1579

    
1580
      if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1581
        ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1582
      elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1583
        ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1584
      else:
1585
        ntime_diff = None
1586

    
1587
      _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1588
               "Node time diverges by at least %s from master node time",
1589
               ntime_diff)
1590

    
1591
      if ntime_diff is not None:
1592
        continue
1593

    
1594
      try:
1595
        node_info[node] = {
1596
          "mfree": int(nodeinfo['memory_free']),
1597
          "pinst": [],
1598
          "sinst": [],
1599
          # dictionary holding all instances this node is secondary for,
1600
          # grouped by their primary node. Each key is a cluster node, and each
1601
          # value is a list of instances which have the key as primary and the
1602
          # current node as secondary.  this is handy to calculate N+1 memory
1603
          # availability if you can only failover from a primary to its
1604
          # secondary.
1605
          "sinst-by-pnode": {},
1606
        }
1607
        # FIXME: devise a free space model for file based instances as well
1608
        if vg_name is not None:
1609
          test = (constants.NV_VGLIST not in nresult or
1610
                  vg_name not in nresult[constants.NV_VGLIST])
1611
          _ErrorIf(test, self.ENODELVM, node,
1612
                   "node didn't return data for the volume group '%s'"
1613
                   " - it is either missing or broken", vg_name)
1614
          if test:
1615
            continue
1616
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1617
      except (ValueError, KeyError):
1618
        _ErrorIf(True, self.ENODERPC, node,
1619
                 "node returned invalid nodeinfo, check lvm/hypervisor")
1620
        continue
1621

    
1622
    node_vol_should = {}
1623

    
1624
    feedback_fn("* Verifying instance status")
1625
    for instance in instancelist:
1626
      if verbose:
1627
        feedback_fn("* Verifying instance %s" % instance)
1628
      inst_config = instanceinfo[instance]
1629
      self._VerifyInstance(instance, inst_config, node_volume,
1630
                           node_instance, n_offline)
1631
      inst_nodes_offline = []
1632

    
1633
      inst_config.MapLVsByNode(node_vol_should)
1634

    
1635
      instance_cfg[instance] = inst_config
1636

    
1637
      pnode = inst_config.primary_node
1638
      _ErrorIf(pnode not in node_info and pnode not in n_offline,
1639
               self.ENODERPC, pnode, "instance %s, connection to"
1640
               " primary node failed", instance)
1641
      if pnode in node_info:
1642
        node_info[pnode]['pinst'].append(instance)
1643

    
1644
      if pnode in n_offline:
1645
        inst_nodes_offline.append(pnode)
1646

    
1647
      # If the instance is non-redundant we cannot survive losing its primary
1648
      # node, so we are not N+1 compliant. On the other hand we have no disk
1649
      # templates with more than one secondary so that situation is not well
1650
      # supported either.
1651
      # FIXME: does not support file-backed instances
1652
      if len(inst_config.secondary_nodes) == 0:
1653
        i_non_redundant.append(instance)
1654
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
1655
               self.EINSTANCELAYOUT, instance,
1656
               "instance has multiple secondary nodes", code="WARNING")
1657

    
1658
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1659
        i_non_a_balanced.append(instance)
1660

    
1661
      for snode in inst_config.secondary_nodes:
1662
        _ErrorIf(snode not in node_info and snode not in n_offline,
1663
                 self.ENODERPC, snode,
1664
                 "instance %s, connection to secondary node"
1665
                 " failed", instance)
1666

    
1667
        if snode in node_info:
1668
          node_info[snode]['sinst'].append(instance)
1669
          if pnode not in node_info[snode]['sinst-by-pnode']:
1670
            node_info[snode]['sinst-by-pnode'][pnode] = []
1671
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1672

    
1673
        if snode in n_offline:
1674
          inst_nodes_offline.append(snode)
1675

    
1676
      # warn that the instance lives on offline nodes
1677
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
1678
               "instance lives on offline node(s) %s",
1679
               utils.CommaJoin(inst_nodes_offline))
1680

    
1681
    feedback_fn("* Verifying orphan volumes")
1682
    self._VerifyOrphanVolumes(node_vol_should, node_volume)
1683

    
1684
    feedback_fn("* Verifying remaining instances")
1685
    self._VerifyOrphanInstances(instancelist, node_instance)
1686

    
1687
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1688
      feedback_fn("* Verifying N+1 Memory redundancy")
1689
      self._VerifyNPlusOneMemory(node_info, instance_cfg)
1690

    
1691
    feedback_fn("* Other Notes")
1692
    if i_non_redundant:
1693
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1694
                  % len(i_non_redundant))
1695

    
1696
    if i_non_a_balanced:
1697
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1698
                  % len(i_non_a_balanced))
1699

    
1700
    if n_offline:
1701
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1702

    
1703
    if n_drained:
1704
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1705

    
1706
    return not self.bad
1707

    
1708
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave an error.
          # override manually lu_result here as _ErrorIf only
          # overrides self.bad
          lu_result = 1
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = indent_re.sub('      ', output)
            feedback_fn("%s" % output)
            lu_result = 0

      return lu_result


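# Note on the hook handling above: the output of a failed hook script is
# re-indented by six extra spaces (via indent_re) so that it nests under the
# "Script ... failed" message in the verify report.
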
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, _, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


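# Example of the tuple returned by LUVerifyDisks.Exec above (all node,
# instance and volume names are illustrative placeholders):
#   ({"node3.example.com": "rpc failure"},                 # per-node errors
#    ["instance7"],                                        # need activate-disks
#    {"instance9": [("node2.example.com", "disk0_data")]}) # missing volumes
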
class LURepairDiskSizes(NoHooksLU):
1839
  """Verifies the cluster disks sizes.
1840

1841
  """
1842
  _OP_REQP = ["instances"]
1843
  REQ_BGL = False
1844

    
1845
  def ExpandNames(self):
1846
    if not isinstance(self.op.instances, list):
1847
      raise errors.OpPrereqError("Invalid argument type 'instances'",
1848
                                 errors.ECODE_INVAL)
1849

    
1850
    if self.op.instances:
1851
      self.wanted_names = []
1852
      for name in self.op.instances:
1853
        full_name = _ExpandInstanceName(self.cfg, name)
1854
        self.wanted_names.append(full_name)
1855
      self.needed_locks = {
1856
        locking.LEVEL_NODE: [],
1857
        locking.LEVEL_INSTANCE: self.wanted_names,
1858
        }
1859
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1860
    else:
1861
      self.wanted_names = None
1862
      self.needed_locks = {
1863
        locking.LEVEL_NODE: locking.ALL_SET,
1864
        locking.LEVEL_INSTANCE: locking.ALL_SET,
1865
        }
1866
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1867

    
1868
  def DeclareLocks(self, level):
1869
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
1870
      self._LockInstancesNodes(primary_only=True)
1871

    
1872
  def CheckPrereq(self):
1873
    """Check prerequisites.
1874

1875
    This only checks the optional instance list against the existing names.
1876

1877
    """
1878
    if self.wanted_names is None:
1879
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1880

    
1881
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1882
                             in self.wanted_names]
1883

    
1884
  def _EnsureChildSizes(self, disk):
1885
    """Ensure children of the disk have the needed disk size.
1886

1887
    This is valid mainly for DRBD8 and fixes an issue where the
1888
    children have smaller disk size.
1889

1890
    @param disk: an L{ganeti.objects.Disk} object
1891

1892
    """
1893
    if disk.dev_type == constants.LD_DRBD8:
1894
      assert disk.children, "Empty children for DRBD8?"
1895
      fchild = disk.children[0]
1896
      mismatch = fchild.size < disk.size
1897
      if mismatch:
1898
        self.LogInfo("Child disk has size %d, parent %d, fixing",
1899
                     fchild.size, disk.size)
1900
        fchild.size = disk.size
1901

    
1902
      # and we recurse on this child only, not on the metadev
1903
      return self._EnsureChildSizes(fchild) or mismatch
1904
    else:
1905
      return False
1906

    
1907
  def Exec(self, feedback_fn):
1908
    """Verify the size of cluster disks.
1909

1910
    """
1911
    # TODO: check child disks too
1912
    # TODO: check differences in size between primary/secondary nodes
1913
    per_node_disks = {}
1914
    for instance in self.wanted_instances:
1915
      pnode = instance.primary_node
1916
      if pnode not in per_node_disks:
1917
        per_node_disks[pnode] = []
1918
      for idx, disk in enumerate(instance.disks):
1919
        per_node_disks[pnode].append((instance, idx, disk))
1920

    
1921
    changed = []
1922
    for node, dskl in per_node_disks.items():
1923
      newl = [v[2].Copy() for v in dskl]
1924
      for dsk in newl:
1925
        self.cfg.SetDiskID(dsk, node)
1926
      result = self.rpc.call_blockdev_getsizes(node, newl)
1927
      if result.fail_msg:
1928
        self.LogWarning("Failure in blockdev_getsizes call to node"
1929
                        " %s, ignoring", node)
1930
        continue
1931
      if len(result.data) != len(dskl):
1932
        self.LogWarning("Invalid result from node %s, ignoring node results",
1933
                        node)
1934
        continue
1935
      for ((instance, idx, disk), size) in zip(dskl, result.data):
1936
        if size is None:
1937
          self.LogWarning("Disk %d of instance %s did not return size"
1938
                          " information, ignoring", idx, instance.name)
1939
          continue
1940
        if not isinstance(size, (int, long)):
1941
          self.LogWarning("Disk %d of instance %s did not return valid"
1942
                          " size information, ignoring", idx, instance.name)
1943
          continue
1944
        size = size >> 20
1945
        if size != disk.size:
1946
          self.LogInfo("Disk %d of instance %s has mismatched size,"
1947
                       " correcting: recorded %d, actual %d", idx,
1948
                       instance.name, disk.size, size)
1949
          disk.size = size
1950
          self.cfg.Update(instance, feedback_fn)
1951
          changed.append((instance.name, idx, size))
1952
        if self._EnsureChildSizes(disk):
1953
          self.cfg.Update(instance, feedback_fn)
1954
          changed.append((instance.name, idx, disk.size))
1955
    return changed
1956

    
1957

    
1958
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    all_nodes = self.cfg.GetNodeList()
    return env, [mn], all_nodes

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.GetHostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)

    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)


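# Summary of the rename sequence implemented above: the master IP is taken
# down, cluster_name and master_ip are updated in the configuration, the
# regenerated known_hosts file is pushed to all other nodes, and the master
# role is restarted in the finally clause even if an intermediate step failed.
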
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


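# Illustrative use of the helper above, with hypothetical Disk objects built
# only to show the recursion (the keyword-argument construction is an
# assumption, not taken from this module):
#   lv_child = objects.Disk(dev_type=constants.LD_LV)
#   drbd_disk = objects.Disk(dev_type=constants.LD_DRBD8, children=[lv_child])
#   _RecursiveCheckIfLVMBased(drbd_disk)  # True, found via the LV child
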
class LUSetClusterParams(LogicalUnit):
2059
  """Change the parameters of the cluster.
2060

2061
  """
2062
  HPATH = "cluster-modify"
2063
  HTYPE = constants.HTYPE_CLUSTER
2064
  _OP_REQP = []
2065
  REQ_BGL = False
2066

    
2067
  def CheckArguments(self):
2068
    """Check parameters
2069

2070
    """
2071
    if not hasattr(self.op, "candidate_pool_size"):
2072
      self.op.candidate_pool_size = None
2073
    if self.op.candidate_pool_size is not None:
2074
      try:
2075
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
2076
      except (ValueError, TypeError), err:
2077
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
2078
                                   str(err), errors.ECODE_INVAL)
2079
      if self.op.candidate_pool_size < 1:
2080
        raise errors.OpPrereqError("At least one master candidate needed",
2081
                                   errors.ECODE_INVAL)
2082

    
2083
  def ExpandNames(self):
2084
    # FIXME: in the future maybe other cluster params won't require checking on
2085
    # all nodes to be modified.
2086
    self.needed_locks = {
2087
      locking.LEVEL_NODE: locking.ALL_SET,
2088
    }
2089
    self.share_locks[locking.LEVEL_NODE] = 1
2090

    
2091
  def BuildHooksEnv(self):
2092
    """Build hooks env.
2093

2094
    """
2095
    env = {
2096
      "OP_TARGET": self.cfg.GetClusterName(),
2097
      "NEW_VG_NAME": self.op.vg_name,
2098
      }
2099
    mn = self.cfg.GetMasterNode()
2100
    return env, [mn], [mn]
2101

    
2102
  def CheckPrereq(self):
2103
    """Check prerequisites.
2104

2105
    This checks whether the given params don't conflict and
2106
    if the given volume group is valid.
2107

2108
    """
2109
    if self.op.vg_name is not None and not self.op.vg_name:
2110
      instances = self.cfg.GetAllInstancesInfo().values()
2111
      for inst in instances:
2112
        for disk in inst.disks:
2113
          if _RecursiveCheckIfLVMBased(disk):
2114
            raise errors.OpPrereqError("Cannot disable lvm storage while"
2115
                                       " lvm-based instances exist",
2116
                                       errors.ECODE_INVAL)
2117

    
2118
    node_list = self.acquired_locks[locking.LEVEL_NODE]
2119

    
2120
    # if vg_name not None, checks given volume group on all nodes
2121
    if self.op.vg_name:
2122
      vglist = self.rpc.call_vg_list(node_list)
2123
      for node in node_list:
2124
        msg = vglist[node].fail_msg
2125
        if msg:
2126
          # ignoring down node
2127
          self.LogWarning("Error while gathering data on node %s"
2128
                          " (ignoring node): %s", node, msg)
2129
          continue
2130
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2131
                                              self.op.vg_name,
2132
                                              constants.MIN_VG_SIZE)
2133
        if vgstatus:
2134
          raise errors.OpPrereqError("Error on node '%s': %s" %
2135
                                     (node, vgstatus), errors.ECODE_ENVIRON)
2136

    
2137
    self.cluster = cluster = self.cfg.GetClusterInfo()
2138
    # validate params changes
2139
    if self.op.beparams:
2140
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2141
      self.new_beparams = objects.FillDict(
2142
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
2143

    
2144
    if self.op.nicparams:
2145
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2146
      self.new_nicparams = objects.FillDict(
2147
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
2148
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
2149
      nic_errors = []
2150

    
2151
      # check all instances for consistency
2152
      for instance in self.cfg.GetAllInstancesInfo().values():
2153
        for nic_idx, nic in enumerate(instance.nics):
2154
          params_copy = copy.deepcopy(nic.nicparams)
2155
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
2156

    
2157
          # check parameter syntax
2158
          try:
2159
            objects.NIC.CheckParameterSyntax(params_filled)
2160
          except errors.ConfigurationError, err:
2161
            nic_errors.append("Instance %s, nic/%d: %s" %
2162
                              (instance.name, nic_idx, err))
2163

    
2164
          # if we're moving instances to routed, check that they have an ip
2165
          target_mode = params_filled[constants.NIC_MODE]
2166
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2167
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip" %
2168
                              (instance.name, nic_idx))
2169
      if nic_errors:
2170
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2171
                                   "\n".join(nic_errors))
2172

    
2173
    # hypervisor list/parameters
2174
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
2175
    if self.op.hvparams:
2176
      if not isinstance(self.op.hvparams, dict):
2177
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
2178
                                   errors.ECODE_INVAL)
2179
      for hv_name, hv_dict in self.op.hvparams.items():
2180
        if hv_name not in self.new_hvparams:
2181
          self.new_hvparams[hv_name] = hv_dict
2182
        else:
2183
          self.new_hvparams[hv_name].update(hv_dict)
2184

    
2185
    # os hypervisor parameters
2186
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
2187
    if self.op.os_hvp:
2188
      if not isinstance(self.op.os_hvp, dict):
2189
        raise errors.OpPrereqError("Invalid 'os_hvp' parameter on input",
2190
                                   errors.ECODE_INVAL)
2191
      for os_name, hvs in self.op.os_hvp.items():
2192
        if not isinstance(hvs, dict):
2193
          raise errors.OpPrereqError(("Invalid 'os_hvp' parameter on"
2194
                                      " input"), errors.ECODE_INVAL)
2195
        if os_name not in self.new_os_hvp:
2196
          self.new_os_hvp[os_name] = hvs
2197
        else:
2198
          for hv_name, hv_dict in hvs.items():
2199
            if hv_name not in self.new_os_hvp[os_name]:
2200
              self.new_os_hvp[os_name][hv_name] = hv_dict
2201
            else:
2202
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
2203

    
2204
    if self.op.enabled_hypervisors is not None:
2205
      self.hv_list = self.op.enabled_hypervisors
2206
      if not self.hv_list:
2207
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
2208
                                   " least one member",
2209
                                   errors.ECODE_INVAL)
2210
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
2211
      if invalid_hvs:
2212
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
2213
                                   " entries: %s" %
2214
                                   utils.CommaJoin(invalid_hvs),
2215
                                   errors.ECODE_INVAL)
2216
    else:
2217
      self.hv_list = cluster.enabled_hypervisors
2218

    
2219
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
2220
      # either the enabled list has changed, or the parameters have, validate
2221
      for hv_name, hv_params in self.new_hvparams.items():
2222
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
2223
            (self.op.enabled_hypervisors and
2224
             hv_name in self.op.enabled_hypervisors)):
2225
          # either this is a new hypervisor, or its parameters have changed
2226
          hv_class = hypervisor.GetHypervisor(hv_name)
2227
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2228
          hv_class.CheckParameterSyntax(hv_params)
2229
          _CheckHVParams(self, node_list, hv_name, hv_params)
2230

    
2231
    if self.op.os_hvp:
2232
      # no need to check any newly-enabled hypervisors, since the
2233
      # defaults have already been checked in the above code-block
2234
      for os_name, os_hvp in self.new_os_hvp.items():
2235
        for hv_name, hv_params in os_hvp.items():
2236
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2237
          # we need to fill in the new os_hvp on top of the actual hv_p
2238
          cluster_defaults = self.new_hvparams.get(hv_name, {})
2239
          new_osp = objects.FillDict(cluster_defaults, hv_params)
2240
          hv_class = hypervisor.GetHypervisor(hv_name)
2241
          hv_class.CheckParameterSyntax(new_osp)
2242
          _CheckHVParams(self, node_list, hv_name, new_osp)
2243

    
2244

    
2245
  def Exec(self, feedback_fn):
2246
    """Change the parameters of the cluster.
2247

2248
    """
2249
    if self.op.vg_name is not None:
2250
      new_volume = self.op.vg_name
2251
      if not new_volume:
2252
        new_volume = None
2253
      if new_volume != self.cfg.GetVGName():
2254
        self.cfg.SetVGName(new_volume)
2255
      else:
2256
        feedback_fn("Cluster LVM configuration already in desired"
2257
                    " state, not changing")
2258
    if self.op.hvparams:
2259
      self.cluster.hvparams = self.new_hvparams
2260
    if self.op.os_hvp:
2261
      self.cluster.os_hvp = self.new_os_hvp
2262
    if self.op.enabled_hypervisors is not None:
2263
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2264
    if self.op.beparams:
2265
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2266
    if self.op.nicparams:
2267
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2268

    
2269
    if self.op.candidate_pool_size is not None:
2270
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
2271
      # we need to update the pool size here, otherwise the save will fail
2272
      _AdjustCandidatePool(self, [])
2273

    
2274
    self.cfg.Update(self.cluster, feedback_fn)
2275

    
2276

    
2277
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetOnlineNodeList()
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)

  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                    constants.CONFD_HMAC_KEY,
                   ])

  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())

  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)


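# Note on the upload step above: files that do not exist on the master node
# (e.g. an absent constants.RAPI_USERS_FILE) are simply skipped by the
# os.path.exists() check rather than reported as errors.
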
class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


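# The LU above is believed to correspond to the "gnt-cluster redist-conf"
# command (the command name is not defined in this module); it rewrites the
# configuration, which redistributes config/ssconf, and then pushes the
# ancillary files via the helper above.
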
def _WaitForSync(lu, instance, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, mstat.sync_percent,
                         rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


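# Polling behaviour of the helper above, as implemented: RPC failures are
# retried every 6 seconds, at most 10 times, before raising RemoteError; a
# "done but degraded" state is re-checked up to 10 times at 1-second intervals
# to rule out transient degradation; otherwise the loop sleeps
# min(60, max_time) seconds, where max_time is the latest sync time estimate.
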
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


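# Illustrative call of the helper above (lu, dev and snode are placeholders):
# checking only the local storage status of a disk on its secondary node
# would look like
#   _CheckDiskConsistency(lu, dev, snode, on_primary=False, ldisk=True)
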
class LUDiagnoseOS(NoHooksLU):
2457
  """Logical unit for OS diagnose/query.
2458

2459
  """
2460
  _OP_REQP = ["output_fields", "names"]
2461
  REQ_BGL = False
2462
  _FIELDS_STATIC = utils.FieldSet()
2463
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2464
  # Fields that need calculation of global os validity
2465
  _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2466

    
2467
  def ExpandNames(self):
2468
    if self.op.names:
2469
      raise errors.OpPrereqError("Selective OS query not supported",
2470
                                 errors.ECODE_INVAL)
2471

    
2472
    _CheckOutputFields(static=self._FIELDS_STATIC,
2473
                       dynamic=self._FIELDS_DYNAMIC,
2474
                       selected=self.op.output_fields)
2475

    
2476
    # Lock all nodes, in shared mode
2477
    # Temporary removal of locks, should be reverted later
2478
    # TODO: reintroduce locks when they are lighter-weight
2479
    self.needed_locks = {}
2480
    #self.share_locks[locking.LEVEL_NODE] = 1
2481
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2482

    
2483
  def CheckPrereq(self):
2484
    """Check prerequisites.
2485

2486
    """
2487

    
2488
  @staticmethod
2489
  def _DiagnoseByOS(rlist):
2490
    """Remaps a per-node return list into an a per-os per-node dictionary
2491

2492
    @param rlist: a map with node names as keys and OS objects as values
2493

2494
    @rtype: dict
2495
    @return: a dictionary with osnames as keys and as value another map, with
2496
        nodes as keys and tuples of (path, status, diagnose) as values, eg::
2497

2498
          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2499
                                     (/srv/..., False, "invalid api")],
2500
                           "node2": [(/srv/..., True, "")]}
2501
          }
2502

2503
    """
2504
    all_os = {}
2505
    # we build here the list of nodes that didn't fail the RPC (at RPC
2506
    # level), so that nodes with a non-responding node daemon don't
2507
    # make all OSes invalid
2508
    good_nodes = [node_name for node_name in rlist
2509
                  if not rlist[node_name].fail_msg]
2510
    for node_name, nr in rlist.items():
2511
      if nr.fail_msg or not nr.payload:
2512
        continue
2513
      for name, path, status, diagnose, variants in nr.payload:
2514
        if name not in all_os:
2515
          # build a list of nodes for this os containing empty lists
2516
          # for each node in node_list
2517
          all_os[name] = {}
2518
          for nname in good_nodes:
2519
            all_os[name][nname] = []
2520
        all_os[name][node_name].append((path, status, diagnose, variants))
2521
    return all_os
2522

    
2523
  def Exec(self, feedback_fn):
2524
    """Compute the list of OSes.
2525

2526
    """
2527
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2528
    node_data = self.rpc.call_os_diagnose(valid_nodes)
2529
    pol = self._DiagnoseByOS(node_data)
2530
    output = []
2531
    calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2532
    calc_variants = "variants" in self.op.output_fields
2533

    
2534
    for os_name, os_data in pol.items():
2535
      row = []
2536
      if calc_valid:
2537
        valid = True
2538
        variants = None
2539
        for osl in os_data.values():
2540
          valid = valid and osl and osl[0][1]
2541
          if not valid:
2542
            variants = None
2543
            break
2544
          if calc_variants:
2545
            node_variants = osl[0][3]
2546
            if variants is None:
2547
              variants = node_variants
2548
            else:
2549
              variants = [v for v in variants if v in node_variants]
2550

    
2551
      for field in self.op.output_fields:
2552
        if field == "name":
2553
          val = os_name
2554
        elif field == "valid":
2555
          val = valid
2556
        elif field == "node_status":
2557
          # this is just a copy of the dict
2558
          val = {}
2559
          for node_name, nos_list in os_data.items():
2560
            val[node_name] = nos_list
2561
        elif field == "variants":
2562
          val =  variants
2563
        else:
2564
          raise errors.ParameterError(field)
2565
        row.append(val)
2566
      output.append(row)
2567

    
2568
    return output
2569

    
2570

    
2571
class LURemoveNode(LogicalUnit):
2572
  """Logical unit for removing a node.
2573

2574
  """
2575
  HPATH = "node-remove"
2576
  HTYPE = constants.HTYPE_NODE
2577
  _OP_REQP = ["node_name"]
2578

    
2579
  def BuildHooksEnv(self):
2580
    """Build hooks env.
2581

2582
    This doesn't run on the target node in the pre phase as a failed
2583
    node would then be impossible to remove.
2584

2585
    """
2586
    env = {
2587
      "OP_TARGET": self.op.node_name,
2588
      "NODE_NAME": self.op.node_name,
2589
      }
2590
    all_nodes = self.cfg.GetNodeList()
2591
    try:
2592
      all_nodes.remove(self.op.node_name)
2593
    except ValueError:
2594
      logging.warning("Node %s which is about to be removed not found"
2595
                      " in the all nodes list", self.op.node_name)
2596
    return env, all_nodes, all_nodes
2597

    
2598
  def CheckPrereq(self):
2599
    """Check prerequisites.
2600

2601
    This checks:
2602
     - the node exists in the configuration
2603
     - it does not have primary or secondary instances
2604
     - it's not the master
2605

2606
    Any errors are signaled by raising errors.OpPrereqError.
2607

2608
    """
2609
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
2610
    node = self.cfg.GetNodeInfo(self.op.node_name)
2611
    assert node is not None
2612

    
2613
    instance_list = self.cfg.GetInstanceList()
2614

    
2615
    masternode = self.cfg.GetMasterNode()
2616
    if node.name == masternode:
2617
      raise errors.OpPrereqError("Node is the master node,"
2618
                                 " you need to failover first.",
2619
                                 errors.ECODE_INVAL)
2620

    
2621
    for instance_name in instance_list:
2622
      instance = self.cfg.GetInstanceInfo(instance_name)
2623
      if node.name in instance.all_nodes:
2624
        raise errors.OpPrereqError("Instance %s is still running on the node,"
2625
                                   " please remove first." % instance_name,
2626
                                   errors.ECODE_INVAL)
2627
    self.op.node_name = node.name
2628
    self.node = node
2629

    
2630
  def Exec(self, feedback_fn):
2631
    """Removes the node from the cluster.
2632

2633
    """
2634
    node = self.node
2635
    logging.info("Stopping the node daemon and removing configs from node %s",
2636
                 node.name)
2637

    
2638
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
2639

    
2640
    # Promote nodes to master candidate as needed
2641
    _AdjustCandidatePool(self, exceptions=[node.name])
2642
    self.context.RemoveNode(node.name)
2643

    
2644
    # Run post hooks on the node before it's removed
2645
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2646
    try:
2647
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2648
    except:
2649
      # pylint: disable-msg=W0702
2650
      self.LogWarning("Errors occurred running hooks on %s" % node.name)
2651

    
2652
    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
2653
    msg = result.fail_msg
2654
    if msg:
2655
      self.LogWarning("Errors encountered on the remote node while leaving"
2656
                      " the cluster: %s", msg)
2657

    
2658

    
2659
class LUQueryNodes(NoHooksLU):
2660
  """Logical unit for querying nodes.
2661

2662
  """
2663
  # pylint: disable-msg=W0142
2664
  _OP_REQP = ["output_fields", "names", "use_locking"]
2665
  REQ_BGL = False
2666

    
2667
  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2668
                    "master_candidate", "offline", "drained"]
2669

    
2670
  _FIELDS_DYNAMIC = utils.FieldSet(
2671
    "dtotal", "dfree",
2672
    "mtotal", "mnode", "mfree",
2673
    "bootid",
2674
    "ctotal", "cnodes", "csockets",
2675
    )
2676

    
2677
  _FIELDS_STATIC = utils.FieldSet(*[
2678
    "pinst_cnt", "sinst_cnt",
2679
    "pinst_list", "sinst_list",
2680
    "pip", "sip", "tags",
2681
    "master",
2682
    "role"] + _SIMPLE_FIELDS
2683
    )
2684

    
2685
  def ExpandNames(self):
2686
    _CheckOutputFields(static=self._FIELDS_STATIC,
2687
                       dynamic=self._FIELDS_DYNAMIC,
2688
                       selected=self.op.output_fields)
2689

    
2690
    self.needed_locks = {}
2691
    self.share_locks[locking.LEVEL_NODE] = 1
2692

    
2693
    if self.op.names:
2694
      self.wanted = _GetWantedNodes(self, self.op.names)
2695
    else:
2696
      self.wanted = locking.ALL_SET
2697

    
2698
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2699
    self.do_locking = self.do_node_query and self.op.use_locking
2700
    if self.do_locking:
2701
      # if we don't request only static fields, we need to lock the nodes
2702
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
2703

    
2704
  def CheckPrereq(self):
2705
    """Check prerequisites.
2706

2707
    """
2708
    # The validation of the node list is done in _GetWantedNodes if the
    # list is non-empty; if it is empty, there's no validation to do
2710
    pass
2711

    
2712
  def Exec(self, feedback_fn):
2713
    """Computes the list of nodes and their attributes.
2714

2715
    """
2716
    all_info = self.cfg.GetAllNodesInfo()
2717
    if self.do_locking:
2718
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
2719
    elif self.wanted != locking.ALL_SET:
2720
      nodenames = self.wanted
2721
      missing = set(nodenames).difference(all_info.keys())
2722
      if missing:
2723
        raise errors.OpExecError(
2724
          "Some nodes were removed before retrieving their data: %s" % missing)
2725
    else:
2726
      nodenames = all_info.keys()
2727

    
2728
    nodenames = utils.NiceSort(nodenames)
2729
    nodelist = [all_info[name] for name in nodenames]
2730

    
2731
    # begin data gathering
2732

    
2733
    if self.do_node_query:
2734
      live_data = {}
2735
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2736
                                          self.cfg.GetHypervisorType())
2737
      for name in nodenames:
2738
        nodeinfo = node_data[name]
2739
        if not nodeinfo.fail_msg and nodeinfo.payload:
2740
          nodeinfo = nodeinfo.payload
2741
          fn = utils.TryConvert
2742
          live_data[name] = {
2743
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2744
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2745
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
2746
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2747
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
2748
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2749
            "bootid": nodeinfo.get('bootid', None),
2750
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2751
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2752
            }
2753
        else:
2754
          live_data[name] = {}
2755
    else:
2756
      live_data = dict.fromkeys(nodenames, {})
2757

    
2758
    node_to_primary = dict([(name, set()) for name in nodenames])
2759
    node_to_secondary = dict([(name, set()) for name in nodenames])
2760

    
2761
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
2762
                             "sinst_cnt", "sinst_list"))
2763
    if inst_fields & frozenset(self.op.output_fields):
2764
      inst_data = self.cfg.GetAllInstancesInfo()
2765

    
2766
      for inst in inst_data.values():
2767
        if inst.primary_node in node_to_primary:
2768
          node_to_primary[inst.primary_node].add(inst.name)
2769
        for secnode in inst.secondary_nodes:
2770
          if secnode in node_to_secondary:
2771
            node_to_secondary[secnode].add(inst.name)
2772

    
2773
    master_node = self.cfg.GetMasterNode()
2774

    
2775
    # end data gathering
2776

    
2777
    output = []
2778
    for node in nodelist:
2779
      node_output = []
2780
      for field in self.op.output_fields:
2781
        if field in self._SIMPLE_FIELDS:
2782
          val = getattr(node, field)
2783
        elif field == "pinst_list":
2784
          val = list(node_to_primary[node.name])
2785
        elif field == "sinst_list":
2786
          val = list(node_to_secondary[node.name])
2787
        elif field == "pinst_cnt":
2788
          val = len(node_to_primary[node.name])
2789
        elif field == "sinst_cnt":
2790
          val = len(node_to_secondary[node.name])
2791
        elif field == "pip":
2792
          val = node.primary_ip
2793
        elif field == "sip":
2794
          val = node.secondary_ip
2795
        elif field == "tags":
2796
          val = list(node.GetTags())
2797
        elif field == "master":
2798
          val = node.name == master_node
2799
        elif self._FIELDS_DYNAMIC.Matches(field):
2800
          val = live_data[node.name].get(field, None)
2801
        elif field == "role":
2802
          if node.name == master_node:
2803
            val = "M"
2804
          elif node.master_candidate:
2805
            val = "C"
2806
          elif node.drained:
2807
            val = "D"
2808
          elif node.offline:
2809
            val = "O"
2810
          else:
2811
            val = "R"
2812
        else:
2813
          raise errors.ParameterError(field)
2814
        node_output.append(val)
2815
      output.append(node_output)
2816

    
2817
    return output
2818

    
2819

    
2820
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUQueryNodeStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _OP_REQP = ["nodes", "storage_type", "output_fields"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)

  def ExpandNames(self):
    storage_type = self.op.storage_type

    if storage_type not in constants.VALID_STORAGE_TYPES:
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
                                 errors.ECODE_INVAL)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.op.name = getattr(self.op, "name", None)

    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUModifyNodeStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  _OP_REQP = ["node_name", "storage_type", "name", "changes"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

    storage_type = self.op.storage_type
    if storage_type not in constants.VALID_STORAGE_TYPES:
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Modifies the requested storage unit.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


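# The "changes" dict passed to LUModifyNodeStorage is validated against
# constants.MODIFIABLE_STORAGE_FIELDS in CheckPrereq above; a sketch of
# accepted vs. rejected shapes (the concrete field names are illustrative
# assumptions, not taken from this excerpt):
#
#   changes={constants.SF_ALLOCATABLE: False}   # accepted if the field is
#                                               # modifiable for the type
#   changes={"size": 1024}                      # rejected with OpPrereqError

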
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def CheckArguments(self):
    # validate/normalize the node name
    self.op.node_name = utils.HostInfo.NormalizeName(self.op.node_name)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.GetHostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given",
                                 errors.ECODE_INVAL)
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node, errors.ECODE_EXISTS)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                 errors.ECODE_NOENT)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)

    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload))

    # setup ssh on node
    if self.cfg.GetClusterInfo().modify_ssh_setup:
      logging.info("Copy ssh key to node %s", node)
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      keyarray = []
      keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                  constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                  priv_key, pub_key]

      for i in keyfiles:
        keyarray.append(utils.ReadFile(i))

      result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                      keyarray[2], keyarray[3], keyarray[4],
                                      keyarray[5])
      result.Raise("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
                   prereq=True, ecode=errors.ECODE_ENVIRON)
      if not result.payload:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node, self.proc.GetECId())


class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    _CheckBooleanOpField(self.op, 'auto_promote')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we're offlining or draining the node
    self.offline_or_drain = (self.op.offline == True or
                             self.op.drained == True)
    self.deoffline_or_drain = (self.op.offline == False or
                               self.op.drained == False)
    self.might_demote = (self.op.master_candidate == False or
                         self.offline_or_drain)

    self.lock_all = self.op.auto_promote and self.might_demote

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
    else:
      self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the requested flags against the node's current state.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via masterfailover",
                                   errors.ECODE_INVAL)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto-promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.name])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto_promote to allow promotion",
                                   errors.ECODE_INVAL)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name,
                                 errors.ECODE_INVAL)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.deoffline_or_drain and not self.offline_or_drain and not
        self.op.master_candidate == True and not node.master_candidate):
      self.op.master_candidate = _DecideSelfPromotion(self)
      if self.op.master_candidate:
        self.LogInfo("Autopromoting node to master candidate")

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.fail_msg
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # we locked all nodes, we adjust the CP before updating this node
    if self.lock_all:
      _AdjustCandidatePool(self, [node.name])

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result


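# Flag interplay in LUSetNodeParams (a summary of the code above, for
# reference): at most one of offline/drained/master_candidate may be set to
# True at a time; offlining or draining a node auto-demotes it from master
# candidate; with auto_promote, all node locks are taken so the candidate
# pool can be refilled via _AdjustCandidatePool after the demotion.

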
class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """
    pass

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.enabled_hypervisors[0],
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
                                  "watcher_pause")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Return the requested configuration values.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      elif field == "watcher_pause":
        entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)
    if not hasattr(self.op, "ignore_size"):
      self.op.ignore_size = False

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              _AssembleInstanceDisks(self, self.instance,
                                     ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: a pair (status, device_info); status is False if the
      operation failed, and device_info is a list of
      (host, instance_visible_name, node_visible_name) tuples
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                       ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  _CheckInstanceDown(lu, instance, "cannot shutdown disks")
  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored
  (they are still logged, but do not make the function return False);
  otherwise any error makes the function return False.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


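# Illustrative call patterns for the two shutdown helpers above (a sketch;
# the first two mirror callers in this module, the ignore_primary form is a
# hypothetical example of the flag):
#
#   _SafeShutdownInstanceDisks(lu, instance)   # refuses if instance is up
#   _ShutdownInstanceDisks(lu, instance)       # unconditional shutdown
#   _ShutdownInstanceDisks(lu, instance, ignore_primary=True)
#                                              # tolerate a primary-node error

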
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks whether a given node has the needed amount of
  free memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise("Can't get data from node %s" % node,
                       prereq=True, ecode=errors.ECODE_ENVIRON)
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem),
                               errors.ECODE_ENVIRON)
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem),
                               errors.ECODE_NORES)


def _CheckNodesFreeDisk(lu, nodenames, requested):
  """Checks if nodes have enough free disk space in the default VG.

  This function checks whether all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info(nodenames, lu.cfg.GetVGName(),
                                   lu.cfg.GetHypervisorType())
  for node in nodenames:
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    vg_free = info.payload.get("vg_free", None)
    if not isinstance(vg_free, int):
      raise errors.OpPrereqError("Can't compute free disk space on node %s,"
                                 " result was '%s'" % (node, vg_free),
                                 errors.ECODE_ENVIRON)
    if requested > vg_free:
      raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                 " required %d MiB, available %d MiB" %
                                 (node, requested, vg_free),
                                 errors.ECODE_NORES)


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ),
                                   errors.ECODE_INVAL)
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ),
                                   errors.ECODE_INVAL)

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True, ecode=errors.ECODE_ENVIRON)
    if not remote_info.payload: # not running already
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.fail_msg
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)


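# Note on parameter precedence in LUStartupInstance.CheckPrereq above (a
# summary of the existing behaviour, not an addition): request-time
# hvparams/beparams are layered on top of the instance's own settings, which
# in turn override the cluster defaults, i.e.
# cluster defaults < instance parameters < one-off start parameters.

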
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                    constants.DEFAULT_SHUTDOWN_TIMEOUT)

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):