#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import time
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these checks separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods need not worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have the 'GANETI_' prefix, as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If no nodes are to be returned, an empty list (and not None) should
    be used.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


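# Illustrative sketch (not part of the original module) of how a concurrent
# LU uses the locking framework of LogicalUnit above: declare REQ_BGL = False,
# list the needed locks in ExpandNames, and let DeclareLocks compute the node
# locks once the instance lock is held.  The class and opcode names are
# hypothetical.
#
#   class LUExampleInstanceOp(LogicalUnit):
#     HPATH = None
#     HTYPE = None
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Instance %s uses nodes %s" %
#                   (self.instance.name,
#                    self.acquired_locks[locking.LEVEL_NODE]))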
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


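# Illustrative usage of the _GetWantedNodes helper above (not part of the
# original module): inside an LU's CheckPrereq one would typically expand the
# user-supplied node names with
#
#   self.op.nodes = _GetWantedNodes(self, self.op.nodes)
#
# which returns the fully expanded, nicely sorted node names or raises
# errors.OpPrereqError for names that are unknown to the configuration.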
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_MAC" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env


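# Illustrative sketch (not part of the original module): for a hypothetical
# running instance "inst1.example.com" with one NIC and one 10240 MB disk,
# _BuildInstanceHookEnv above would produce keys along the lines of
#
#   OP_TARGET=inst1.example.com         INSTANCE_NAME=inst1.example.com
#   INSTANCE_PRIMARY=node1.example.com  INSTANCE_SECONDARIES=node2.example.com
#   INSTANCE_STATUS=up                  INSTANCE_NIC_COUNT=1
#   INSTANCE_NIC0_IP=  INSTANCE_NIC0_BRIDGE=xen-br0  INSTANCE_NIC0_MAC=...
#   INSTANCE_DISK_COUNT=1  INSTANCE_DISK0_SIZE=10240  INSTANCE_DISK0_MODE=rw
#   INSTANCE_BE_...=...  INSTANCE_HV_...=...
#
# (node names, bridge and values are hypothetical); the GANETI_ prefix is
# added later by the hooks runner, as noted in LogicalUnit.BuildHooksEnv.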
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  result.Raise()
  if not result.data:
    raise errors.OpPrereqError("One or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise()
    if not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used drbd minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                        (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates (and the file is outdated)" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
          not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as a secondary
      # has enough memory to host all the instances it would have to take
      # over, should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

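  # Worked example for the N+1 check above (illustrative, not part of the
  # original module; all names and numbers are hypothetical).  Suppose node C
  # has 4096 MB free and is secondary for instances whose primary is node A:
  # inst1 (memory 2048, auto_balance on) and inst2 (memory 3072, auto_balance
  # on).  Should node A fail, node C would need 2048 + 3072 = 5120 MB, which
  # exceeds its 4096 MB of free memory, so the check reports an N+1 error for
  # the (C, A) pair.  Instances with auto_balance disabled are not counted.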
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure makes
    the output be logged in the verify output and causes the verification to
    fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        if not lvs.offline:
          self.LogWarning("Connection to node %s failed: %s" %
                          (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


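# Illustrative note (not part of the original module): LUVerifyDisks.Exec
# above returns the 4-tuple (res_nodes, res_nlvm, res_instances, res_missing)
# built in that method, e.g. something along the lines of
#
#   (["node3.example.com"],                      # nodes with failed/bad data
#    {"node4.example.com": "error message"},     # per-node LVM error strings
#    ["inst1.example.com"],                      # instances with offline LVs
#    {"inst2.example.com": [("node1.example.com", "xenvg/disk0")]})
#                                                # missing (node, LV) pairs
#
# where all node, instance and LV names shown here are hypothetical.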
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)

    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


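# Illustrative example for _RecursiveCheckIfLVMBased above (not part of the
# original module): for a DRBD-based disk whose children are logical volumes
# (data and metadata), the recursion descends into the children, finds an
# LD_LV dev_type and returns True; for a file-based disk with no children it
# returns False, which is what lets LUSetClusterParams below refuse to
# disable LVM storage while lvm-based instances still exist.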
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate beparams changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      if not self.hv_list:
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
                                   " least one member")
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
      if invalid_hvs:
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
                                   " entries: %s" % invalid_hvs)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self)

    self.cfg.Update(self.cluster)


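# The hvparams handling in CheckPrereq above is a two-level merge:
# self.new_hvparams starts as a copy of the current cluster-wide dicts
# (cluster.FillDict(cluster.hvparams, {})) and each per-hypervisor dict from
# the opcode is then update()d into it, so unspecified keys keep their
# current values.  A self-contained sketch of that merge on plain dicts (the
# sample hypervisor name and parameter values are made up):
def _ExampleMergeHvparams(current, changes):
  """Sketch: merge per-hypervisor parameter overrides like CheckPrereq does.

  """
  new_hvparams = dict((name, params.copy())
                      for name, params in current.items())
  for hv_name, hv_dict in changes.items():
    if hv_name not in new_hvparams:
      new_hvparams[hv_name] = hv_dict
    else:
      new_hvparams[hv_name].update(hv_dict)
  return new_hvparams

# _ExampleMergeHvparams({"xen-pvm": {"kernel_path": "/boot/vmlinuz"}},
#                       {"xen-pvm": {"root_path": "/dev/sda1"}})
#   -> {"xen-pvm": {"kernel_path": "/boot/vmlinuz", "root_path": "/dev/sda1"}}

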
class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo())


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if rstats.failed or not rstats.data:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.data
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


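# Each entry returned by call_blockdev_getmirrorstatus is unpacked above as
# (perc_done, est_time, is_degraded, ldisk); perc_done is None once the
# resync has finished.  A short sketch of how one such tuple turns into the
# progress line logged above (the sample values are made up):
def _ExampleFormatMirrorStatus(mstat, iv_name):
  """Sketch: render one mirror-status tuple like _WaitForSync does.

  """
  perc_done, est_time, _, _ = mstat
  if perc_done is None:
    return "- device %s: in sync" % iv_name
  if est_time is not None:
    rem_time = "%d estimated seconds remaining" % est_time
  else:
    rem_time = "no time estimate"
  return "- device %s: %5.2f%% done, %s" % (iv_name, perc_done, rem_time)

# _ExampleFormatMirrorStatus((87.5, 42, True, False), "disk/0")
#   -> "- device disk/0: 87.50% done, 42 estimated seconds remaining"

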
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.RemoteFailMsg()
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      result = result and (not rstats.payload[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


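# As the indices above imply, the payload of call_blockdev_find carries the
# overall degradation flag at position 5 and the local-disk (ldisk) status at
# position 6.  A self-contained sketch of the selection, using a stub payload
# list (the stub and its values are illustrative only):
def _ExampleDiskStatusIndex(payload, ldisk=False):
  """Sketch: pick the status flag the same way _CheckDiskConsistency does.

  """
  if ldisk:
    idx = 6
  else:
    idx = 5
  # healthy means the selected flag is not set
  return not payload[idx]

# stub_payload = [None, None, None, None, None, False, True]
# _ExampleDiskStatusIndex(stub_payload)             -> True  (not degraded)
# _ExampleDiskStatusIndex(stub_payload, ldisk=True) -> False (local disk bad)

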
class LUDiagnoseOS(NoHooksLU):
1698
  """Logical unit for OS diagnose/query.
1699

1700
  """
1701
  _OP_REQP = ["output_fields", "names"]
1702
  REQ_BGL = False
1703
  _FIELDS_STATIC = utils.FieldSet()
1704
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1705

    
1706
  def ExpandNames(self):
1707
    if self.op.names:
1708
      raise errors.OpPrereqError("Selective OS query not supported")
1709

    
1710
    _CheckOutputFields(static=self._FIELDS_STATIC,
1711
                       dynamic=self._FIELDS_DYNAMIC,
1712
                       selected=self.op.output_fields)
1713

    
1714
    # Lock all nodes, in shared mode
1715
    # Temporary removal of locks, should be reverted later
1716
    # TODO: reintroduce locks when they are lighter-weight
1717
    self.needed_locks = {}
1718
    #self.share_locks[locking.LEVEL_NODE] = 1
1719
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1720

    
1721
  def CheckPrereq(self):
1722
    """Check prerequisites.
1723

1724
    """
1725

    
1726
  @staticmethod
1727
  def _DiagnoseByOS(node_list, rlist):
1728
    """Remaps a per-node return list into an a per-os per-node dictionary
1729

1730
    @param node_list: a list with the names of all nodes
1731
    @param rlist: a map with node names as keys and OS objects as values
1732

1733
    @rtype: dict
1734
    @return: a dictionary with osnames as keys and as value another map, with
1735
        nodes as keys and list of OS objects as values, eg::
1736

1737
          {"debian-etch": {"node1": [<object>,...],
1738
                           "node2": [<object>,]}
1739
          }
1740

1741
    """
1742
    all_os = {}
1743
    # we build here the list of nodes that didn't fail the RPC (at RPC
1744
    # level), so that nodes with a non-responding node daemon don't
1745
    # make all OSes invalid
1746
    good_nodes = [node_name for node_name in rlist
1747
                  if not rlist[node_name].failed]
1748
    for node_name, nr in rlist.iteritems():
1749
      if nr.failed or not nr.data:
1750
        continue
1751
      for os_obj in nr.data:
1752
        if os_obj.name not in all_os:
1753
          # build a list of nodes for this os containing empty lists
1754
          # for each node in node_list
1755
          all_os[os_obj.name] = {}
1756
          for nname in good_nodes:
1757
            all_os[os_obj.name][nname] = []
1758
        all_os[os_obj.name][node_name].append(os_obj)
1759
    return all_os
1760

    
1761
  def Exec(self, feedback_fn):
1762
    """Compute the list of OSes.
1763

1764
    """
1765
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1766
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1767
    if node_data == False:
1768
      raise errors.OpExecError("Can't gather the list of OSes")
1769
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1770
    output = []
1771
    for os_name, os_data in pol.iteritems():
1772
      row = []
1773
      for field in self.op.output_fields:
1774
        if field == "name":
1775
          val = os_name
1776
        elif field == "valid":
1777
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1778
        elif field == "node_status":
1779
          val = {}
1780
          for node_name, nos_list in os_data.iteritems():
1781
            val[node_name] = [(v.status, v.path) for v in nos_list]
1782
        else:
1783
          raise errors.ParameterError(field)
1784
        row.append(val)
1785
      output.append(row)
1786

    
1787
    return output
1788

    
1789

    
1790
class LURemoveNode(LogicalUnit):
1791
  """Logical unit for removing a node.
1792

1793
  """
1794
  HPATH = "node-remove"
1795
  HTYPE = constants.HTYPE_NODE
1796
  _OP_REQP = ["node_name"]
1797

    
1798
  def BuildHooksEnv(self):
1799
    """Build hooks env.
1800

1801
    This doesn't run on the target node in the pre phase as a failed
1802
    node would then be impossible to remove.
1803

1804
    """
1805
    env = {
1806
      "OP_TARGET": self.op.node_name,
1807
      "NODE_NAME": self.op.node_name,
1808
      }
1809
    all_nodes = self.cfg.GetNodeList()
1810
    all_nodes.remove(self.op.node_name)
1811
    return env, all_nodes, all_nodes
1812

    
1813
  def CheckPrereq(self):
1814
    """Check prerequisites.
1815

1816
    This checks:
1817
     - the node exists in the configuration
1818
     - it does not have primary or secondary instances
1819
     - it's not the master
1820

1821
    Any errors are signaled by raising errors.OpPrereqError.
1822

1823
    """
1824
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1825
    if node is None:
1826
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1827

    
1828
    instance_list = self.cfg.GetInstanceList()
1829

    
1830
    masternode = self.cfg.GetMasterNode()
1831
    if node.name == masternode:
1832
      raise errors.OpPrereqError("Node is the master node,"
1833
                                 " you need to failover first.")
1834

    
1835
    for instance_name in instance_list:
1836
      instance = self.cfg.GetInstanceInfo(instance_name)
1837
      if node.name in instance.all_nodes:
1838
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1839
                                   " please remove first." % instance_name)
1840
    self.op.node_name = node.name
1841
    self.node = node
1842

    
1843
  def Exec(self, feedback_fn):
1844
    """Removes the node from the cluster.
1845

1846
    """
1847
    node = self.node
1848
    logging.info("Stopping the node daemon and removing configs from node %s",
1849
                 node.name)
1850

    
1851
    self.context.RemoveNode(node.name)
1852

    
1853
    self.rpc.call_node_leave_cluster(node.name)
1854

    
1855
    # Promote nodes to master candidate as needed
1856
    _AdjustCandidatePool(self)
1857

    
1858

    
1859
class LUQueryNodes(NoHooksLU):
1860
  """Logical unit for querying nodes.
1861

1862
  """
1863
  _OP_REQP = ["output_fields", "names", "use_locking"]
1864
  REQ_BGL = False
1865
  _FIELDS_DYNAMIC = utils.FieldSet(
1866
    "dtotal", "dfree",
1867
    "mtotal", "mnode", "mfree",
1868
    "bootid",
1869
    "ctotal", "cnodes", "csockets",
1870
    )
1871

    
1872
  _FIELDS_STATIC = utils.FieldSet(
1873
    "name", "pinst_cnt", "sinst_cnt",
1874
    "pinst_list", "sinst_list",
1875
    "pip", "sip", "tags",
1876
    "serial_no",
1877
    "master_candidate",
1878
    "master",
1879
    "offline",
1880
    "drained",
1881
    "role",
1882
    )
1883

    
1884
  def ExpandNames(self):
1885
    _CheckOutputFields(static=self._FIELDS_STATIC,
1886
                       dynamic=self._FIELDS_DYNAMIC,
1887
                       selected=self.op.output_fields)
1888

    
1889
    self.needed_locks = {}
1890
    self.share_locks[locking.LEVEL_NODE] = 1
1891

    
1892
    if self.op.names:
1893
      self.wanted = _GetWantedNodes(self, self.op.names)
1894
    else:
1895
      self.wanted = locking.ALL_SET
1896

    
1897
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1898
    self.do_locking = self.do_node_query and self.op.use_locking
1899
    if self.do_locking:
1900
      # if we don't request only static fields, we need to lock the nodes
1901
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1902

    
1903

    
1904
  def CheckPrereq(self):
1905
    """Check prerequisites.
1906

1907
    """
1908
    # The validation of the node list is done in the _GetWantedNodes,
1909
    # if non empty, and if empty, there's no validation to do
1910
    pass
1911

    
1912
  def Exec(self, feedback_fn):
1913
    """Computes the list of nodes and their attributes.
1914

1915
    """
1916
    all_info = self.cfg.GetAllNodesInfo()
1917
    if self.do_locking:
1918
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1919
    elif self.wanted != locking.ALL_SET:
1920
      nodenames = self.wanted
1921
      missing = set(nodenames).difference(all_info.keys())
1922
      if missing:
1923
        raise errors.OpExecError(
1924
          "Some nodes were removed before retrieving their data: %s" % missing)
1925
    else:
1926
      nodenames = all_info.keys()
1927

    
1928
    nodenames = utils.NiceSort(nodenames)
1929
    nodelist = [all_info[name] for name in nodenames]
1930

    
1931
    # begin data gathering
1932

    
1933
    if self.do_node_query:
1934
      live_data = {}
1935
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1936
                                          self.cfg.GetHypervisorType())
1937
      for name in nodenames:
1938
        nodeinfo = node_data[name]
1939
        if not nodeinfo.failed and nodeinfo.data:
1940
          nodeinfo = nodeinfo.data
1941
          fn = utils.TryConvert
1942
          live_data[name] = {
1943
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1944
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1945
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1946
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1947
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1948
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1949
            "bootid": nodeinfo.get('bootid', None),
1950
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1951
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1952
            }
1953
        else:
1954
          live_data[name] = {}
1955
    else:
1956
      live_data = dict.fromkeys(nodenames, {})
1957

    
1958
    node_to_primary = dict([(name, set()) for name in nodenames])
1959
    node_to_secondary = dict([(name, set()) for name in nodenames])
1960

    
1961
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1962
                             "sinst_cnt", "sinst_list"))
1963
    if inst_fields & frozenset(self.op.output_fields):
1964
      instancelist = self.cfg.GetInstanceList()
1965

    
1966
      for instance_name in instancelist:
1967
        inst = self.cfg.GetInstanceInfo(instance_name)
1968
        if inst.primary_node in node_to_primary:
1969
          node_to_primary[inst.primary_node].add(inst.name)
1970
        for secnode in inst.secondary_nodes:
1971
          if secnode in node_to_secondary:
1972
            node_to_secondary[secnode].add(inst.name)
1973

    
1974
    master_node = self.cfg.GetMasterNode()
1975

    
1976
    # end data gathering
1977

    
1978
    output = []
1979
    for node in nodelist:
1980
      node_output = []
1981
      for field in self.op.output_fields:
1982
        if field == "name":
1983
          val = node.name
1984
        elif field == "pinst_list":
1985
          val = list(node_to_primary[node.name])
1986
        elif field == "sinst_list":
1987
          val = list(node_to_secondary[node.name])
1988
        elif field == "pinst_cnt":
1989
          val = len(node_to_primary[node.name])
1990
        elif field == "sinst_cnt":
1991
          val = len(node_to_secondary[node.name])
1992
        elif field == "pip":
1993
          val = node.primary_ip
1994
        elif field == "sip":
1995
          val = node.secondary_ip
1996
        elif field == "tags":
1997
          val = list(node.GetTags())
1998
        elif field == "serial_no":
1999
          val = node.serial_no
2000
        elif field == "master_candidate":
2001
          val = node.master_candidate
2002
        elif field == "master":
2003
          val = node.name == master_node
2004
        elif field == "offline":
2005
          val = node.offline
2006
        elif field == "drained":
2007
          val = node.drained
2008
        elif self._FIELDS_DYNAMIC.Matches(field):
2009
          val = live_data[node.name].get(field, None)
2010
        elif field == "role":
2011
          if node.name == master_node:
2012
            val = "M"
2013
          elif node.master_candidate:
2014
            val = "C"
2015
          elif node.drained:
2016
            val = "D"
2017
          elif node.offline:
2018
            val = "O"
2019
          else:
2020
            val = "R"
2021
        else:
2022
          raise errors.ParameterError(field)
2023
        node_output.append(val)
2024
      output.append(node_output)
2025

    
2026
    return output
2027

    
2028

    
2029
class LUQueryNodeVolumes(NoHooksLU):
2030
  """Logical unit for getting volumes on node(s).
2031

2032
  """
2033
  _OP_REQP = ["nodes", "output_fields"]
2034
  REQ_BGL = False
2035
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2036
  _FIELDS_STATIC = utils.FieldSet("node")
2037

    
2038
  def ExpandNames(self):
2039
    _CheckOutputFields(static=self._FIELDS_STATIC,
2040
                       dynamic=self._FIELDS_DYNAMIC,
2041
                       selected=self.op.output_fields)
2042

    
2043
    self.needed_locks = {}
2044
    self.share_locks[locking.LEVEL_NODE] = 1
2045
    if not self.op.nodes:
2046
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2047
    else:
2048
      self.needed_locks[locking.LEVEL_NODE] = \
2049
        _GetWantedNodes(self, self.op.nodes)
2050

    
2051
  def CheckPrereq(self):
2052
    """Check prerequisites.
2053

2054
    This checks that the fields required are valid output fields.
2055

2056
    """
2057
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2058

    
2059
  def Exec(self, feedback_fn):
2060
    """Computes the list of nodes and their attributes.
2061

2062
    """
2063
    nodenames = self.nodes
2064
    volumes = self.rpc.call_node_volumes(nodenames)
2065

    
2066
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2067
             in self.cfg.GetInstanceList()]
2068

    
2069
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2070

    
2071
    output = []
2072
    for node in nodenames:
2073
      if node not in volumes or volumes[node].failed or not volumes[node].data:
2074
        continue
2075

    
2076
      node_vols = volumes[node].data[:]
2077
      node_vols.sort(key=lambda vol: vol['dev'])
2078

    
2079
      for vol in node_vols:
2080
        node_output = []
2081
        for field in self.op.output_fields:
2082
          if field == "node":
2083
            val = node
2084
          elif field == "phys":
2085
            val = vol['dev']
2086
          elif field == "vg":
2087
            val = vol['vg']
2088
          elif field == "name":
2089
            val = vol['name']
2090
          elif field == "size":
2091
            val = int(float(vol['size']))
2092
          elif field == "instance":
2093
            for inst in ilist:
2094
              if node not in lv_by_node[inst]:
2095
                continue
2096
              if vol['name'] in lv_by_node[inst][node]:
2097
                val = inst.name
2098
                break
2099
            else:
2100
              val = '-'
2101
          else:
2102
            raise errors.ParameterError(field)
2103
          node_output.append(str(val))
2104

    
2105
        output.append(node_output)
2106

    
2107
    return output
2108

    
2109

    
2110
class LUAddNode(LogicalUnit):
2111
  """Logical unit for adding node to the cluster.
2112

2113
  """
2114
  HPATH = "node-add"
2115
  HTYPE = constants.HTYPE_NODE
2116
  _OP_REQP = ["node_name"]
2117

    
2118
  def BuildHooksEnv(self):
2119
    """Build hooks env.
2120

2121
    This will run on all nodes before, and on all nodes + the new node after.
2122

2123
    """
2124
    env = {
2125
      "OP_TARGET": self.op.node_name,
2126
      "NODE_NAME": self.op.node_name,
2127
      "NODE_PIP": self.op.primary_ip,
2128
      "NODE_SIP": self.op.secondary_ip,
2129
      }
2130
    nodes_0 = self.cfg.GetNodeList()
2131
    nodes_1 = nodes_0 + [self.op.node_name, ]
2132
    return env, nodes_0, nodes_1
2133

    
2134
  def CheckPrereq(self):
2135
    """Check prerequisites.
2136

2137
    This checks:
2138
     - the new node is not already in the config
2139
     - it is resolvable
2140
     - its parameters (single/dual homed) matches the cluster
2141

2142
    Any errors are signaled by raising errors.OpPrereqError.
2143

2144
    """
2145
    node_name = self.op.node_name
2146
    cfg = self.cfg
2147

    
2148
    dns_data = utils.HostInfo(node_name)
2149

    
2150
    node = dns_data.name
2151
    primary_ip = self.op.primary_ip = dns_data.ip
2152
    secondary_ip = getattr(self.op, "secondary_ip", None)
2153
    if secondary_ip is None:
2154
      secondary_ip = primary_ip
2155
    if not utils.IsValidIP(secondary_ip):
2156
      raise errors.OpPrereqError("Invalid secondary IP given")
2157
    self.op.secondary_ip = secondary_ip
2158

    
2159
    node_list = cfg.GetNodeList()
2160
    if not self.op.readd and node in node_list:
2161
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2162
                                 node)
2163
    elif self.op.readd and node not in node_list:
2164
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2165

    
2166
    for existing_node_name in node_list:
2167
      existing_node = cfg.GetNodeInfo(existing_node_name)
2168

    
2169
      if self.op.readd and node == existing_node_name:
2170
        if (existing_node.primary_ip != primary_ip or
2171
            existing_node.secondary_ip != secondary_ip):
2172
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2173
                                     " address configuration as before")
2174
        continue
2175

    
2176
      if (existing_node.primary_ip == primary_ip or
2177
          existing_node.secondary_ip == primary_ip or
2178
          existing_node.primary_ip == secondary_ip or
2179
          existing_node.secondary_ip == secondary_ip):
2180
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2181
                                   " existing node %s" % existing_node.name)
2182

    
2183
    # check that the type of the node (single versus dual homed) is the
2184
    # same as for the master
2185
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2186
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2187
    newbie_singlehomed = secondary_ip == primary_ip
2188
    if master_singlehomed != newbie_singlehomed:
2189
      if master_singlehomed:
2190
        raise errors.OpPrereqError("The master has no private ip but the"
2191
                                   " new node has one")
2192
      else:
2193
        raise errors.OpPrereqError("The master has a private ip but the"
2194
                                   " new node doesn't have one")
2195

    
2196
    # checks reachability
2197
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2198
      raise errors.OpPrereqError("Node not reachable by ping")
2199

    
2200
    if not newbie_singlehomed:
2201
      # check reachability from my secondary ip to newbie's secondary ip
2202
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2203
                           source=myself.secondary_ip):
2204
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2205
                                   " based ping to noded port")
2206

    
2207
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2208
    if self.op.readd:
2209
      exceptions = [node]
2210
    else:
2211
      exceptions = []
2212
    mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2213
    # the new node will increase mc_max with one, so:
2214
    mc_max = min(mc_max + 1, cp_size)
2215
    self.master_candidate = mc_now < mc_max
2216

    
2217
    if self.op.readd:
2218
      self.new_node = self.cfg.GetNodeInfo(node)
2219
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
2220
    else:
2221
      self.new_node = objects.Node(name=node,
2222
                                   primary_ip=primary_ip,
2223
                                   secondary_ip=secondary_ip,
2224
                                   master_candidate=self.master_candidate,
2225
                                   offline=False, drained=False)
2226

    
2227
  def Exec(self, feedback_fn):
2228
    """Adds the new node to the cluster.
2229

2230
    """
2231
    new_node = self.new_node
2232
    node = new_node.name
2233

    
2234
    # for re-adds, reset the offline/drained/master-candidate flags;
2235
    # we need to reset here, otherwise offline would prevent RPC calls
2236
    # later in the procedure; this also means that if the re-add
2237
    # fails, we are left with a non-offlined, broken node
2238
    if self.op.readd:
2239
      new_node.drained = new_node.offline = False
2240
      self.LogInfo("Readding a node, the offline/drained flags were reset")
2241
      # if we demote the node, we do cleanup later in the procedure
2242
      new_node.master_candidate = self.master_candidate
2243

    
2244
    # notify the user about any possible mc promotion
2245
    if new_node.master_candidate:
2246
      self.LogInfo("Node will be a master candidate")
2247

    
2248
    # check connectivity
2249
    result = self.rpc.call_version([node])[node]
2250
    result.Raise()
2251
    if result.data:
2252
      if constants.PROTOCOL_VERSION == result.data:
2253
        logging.info("Communication to node %s fine, sw version %s match",
2254
                     node, result.data)
2255
      else:
2256
        raise errors.OpExecError("Version mismatch master version %s,"
2257
                                 " node version %s" %
2258
                                 (constants.PROTOCOL_VERSION, result.data))
2259
    else:
2260
      raise errors.OpExecError("Cannot get version from the new node")
2261

    
2262
    # setup ssh on node
2263
    logging.info("Copy ssh key to node %s", node)
2264
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2265
    keyarray = []
2266
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2267
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2268
                priv_key, pub_key]
2269

    
2270
    for i in keyfiles:
2271
      f = open(i, 'r')
2272
      try:
2273
        keyarray.append(f.read())
2274
      finally:
2275
        f.close()
2276

    
2277
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2278
                                    keyarray[2],
2279
                                    keyarray[3], keyarray[4], keyarray[5])
2280

    
2281
    msg = result.RemoteFailMsg()
2282
    if msg:
2283
      raise errors.OpExecError("Cannot transfer ssh keys to the"
2284
                               " new node: %s" % msg)
2285

    
2286
    # Add node to our /etc/hosts, and add key to known_hosts
2287
    utils.AddHostToEtcHosts(new_node.name)
2288

    
2289
    if new_node.secondary_ip != new_node.primary_ip:
2290
      result = self.rpc.call_node_has_ip_address(new_node.name,
2291
                                                 new_node.secondary_ip)
2292
      if result.failed or not result.data:
2293
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2294
                                 " you gave (%s). Please fix and re-run this"
2295
                                 " command." % new_node.secondary_ip)
2296

    
2297
    node_verify_list = [self.cfg.GetMasterNode()]
2298
    node_verify_param = {
2299
      'nodelist': [node],
2300
      # TODO: do a node-net-test as well?
2301
    }
2302

    
2303
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2304
                                       self.cfg.GetClusterName())
2305
    for verifier in node_verify_list:
2306
      if result[verifier].failed or not result[verifier].data:
2307
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2308
                                 " for remote verification" % verifier)
2309
      if result[verifier].data['nodelist']:
2310
        for failed in result[verifier].data['nodelist']:
2311
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2312
                      (verifier, result[verifier].data['nodelist'][failed]))
2313
        raise errors.OpExecError("ssh/hostname verification failed.")
2314

    
2315
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2316
    # including the node just added
2317
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2318
    dist_nodes = self.cfg.GetNodeList()
2319
    if not self.op.readd:
2320
      dist_nodes.append(node)
2321
    if myself.name in dist_nodes:
2322
      dist_nodes.remove(myself.name)
2323

    
2324
    logging.debug("Copying hosts and known_hosts to all nodes")
2325
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2326
      result = self.rpc.call_upload_file(dist_nodes, fname)
2327
      for to_node, to_result in result.iteritems():
2328
        if to_result.failed or not to_result.data:
2329
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2330

    
2331
    to_copy = []
2332
    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2333
    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2334
      to_copy.append(constants.VNC_PASSWORD_FILE)
2335

    
2336
    for fname in to_copy:
2337
      result = self.rpc.call_upload_file([node], fname)
2338
      if result[node].failed or not result[node]:
2339
        logging.error("Could not copy file %s to node %s", fname, node)
2340

    
2341
    if self.op.readd:
2342
      self.context.ReaddNode(new_node)
2343
      # make sure we redistribute the config
2344
      self.cfg.Update(new_node)
2345
      # and make sure the new node will not have old files around
2346
      if not new_node.master_candidate:
2347
        result = self.rpc.call_node_demote_from_mc(new_node.name)
2348
        msg = result.RemoteFailMsg()
2349
        if msg:
2350
          self.LogWarning("Node failed to demote itself from master"
2351
                          " candidate status: %s" % msg)
2352
    else:
2353
      self.context.AddNode(new_node)
2354

    
2355

    
2356
class LUSetNodeParams(LogicalUnit):
2357
  """Modifies the parameters of a node.
2358

2359
  """
2360
  HPATH = "node-modify"
2361
  HTYPE = constants.HTYPE_NODE
2362
  _OP_REQP = ["node_name"]
2363
  REQ_BGL = False
2364

    
2365
  def CheckArguments(self):
2366
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2367
    if node_name is None:
2368
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2369
    self.op.node_name = node_name
2370
    _CheckBooleanOpField(self.op, 'master_candidate')
2371
    _CheckBooleanOpField(self.op, 'offline')
2372
    _CheckBooleanOpField(self.op, 'drained')
2373
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2374
    if all_mods.count(None) == 3:
2375
      raise errors.OpPrereqError("Please pass at least one modification")
2376
    if all_mods.count(True) > 1:
2377
      raise errors.OpPrereqError("Can't set the node into more than one"
2378
                                 " state at the same time")
2379

    
2380
  def ExpandNames(self):
2381
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2382

    
2383
  def BuildHooksEnv(self):
2384
    """Build hooks env.
2385

2386
    This runs on the master node.
2387

2388
    """
2389
    env = {
2390
      "OP_TARGET": self.op.node_name,
2391
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2392
      "OFFLINE": str(self.op.offline),
2393
      "DRAINED": str(self.op.drained),
2394
      }
2395
    nl = [self.cfg.GetMasterNode(),
2396
          self.op.node_name]
2397
    return env, nl, nl
2398

    
2399
  def CheckPrereq(self):
2400
    """Check prerequisites.
2401

2402
    This only checks the instance list against the existing names.
2403

2404
    """
2405
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2406

    
2407
    if ((self.op.master_candidate == False or self.op.offline == True or
2408
         self.op.drained == True) and node.master_candidate):
2409
      # we will demote the node from master_candidate
2410
      if self.op.node_name == self.cfg.GetMasterNode():
2411
        raise errors.OpPrereqError("The master node has to be a"
2412
                                   " master candidate, online and not drained")
2413
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2414
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2415
      if num_candidates <= cp_size:
2416
        msg = ("Not enough master candidates (desired"
2417
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2418
        if self.op.force:
2419
          self.LogWarning(msg)
2420
        else:
2421
          raise errors.OpPrereqError(msg)
2422

    
2423
    if (self.op.master_candidate == True and
2424
        ((node.offline and not self.op.offline == False) or
2425
         (node.drained and not self.op.drained == False))):
2426
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2427
                                 " to master_candidate" % node.name)
2428

    
2429
    return
2430

    
2431
  def Exec(self, feedback_fn):
2432
    """Modifies a node.
2433

2434
    """
2435
    node = self.node
2436

    
2437
    result = []
2438
    changed_mc = False
2439

    
2440
    if self.op.offline is not None:
2441
      node.offline = self.op.offline
2442
      result.append(("offline", str(self.op.offline)))
2443
      if self.op.offline == True:
2444
        if node.master_candidate:
2445
          node.master_candidate = False
2446
          changed_mc = True
2447
          result.append(("master_candidate", "auto-demotion due to offline"))
2448
        if node.drained:
2449
          node.drained = False
2450
          result.append(("drained", "clear drained status due to offline"))
2451

    
2452
    if self.op.master_candidate is not None:
2453
      node.master_candidate = self.op.master_candidate
2454
      changed_mc = True
2455
      result.append(("master_candidate", str(self.op.master_candidate)))
2456
      if self.op.master_candidate == False:
2457
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2458
        msg = rrc.RemoteFailMsg()
2459
        if msg:
2460
          self.LogWarning("Node failed to demote itself: %s" % msg)
2461

    
2462
    if self.op.drained is not None:
2463
      node.drained = self.op.drained
2464
      result.append(("drained", str(self.op.drained)))
2465
      if self.op.drained == True:
2466
        if node.master_candidate:
2467
          node.master_candidate = False
2468
          changed_mc = True
2469
          result.append(("master_candidate", "auto-demotion due to drain"))
2470
          rrc = self.rpc.call_node_demote_from_mc(node.name)
2471
          msg = rrc.RemoteFailMsg()
2472
          if msg:
2473
            self.LogWarning("Node failed to demote itself: %s" % msg)
2474
        if node.offline:
2475
          node.offline = False
2476
          result.append(("offline", "clear offline status due to drain"))
2477

    
2478
    # this will trigger configuration file update, if needed
2479
    self.cfg.Update(node)
2480
    # this will trigger job queue propagation or cleanup
2481
    if changed_mc:
2482
      self.context.ReaddNode(node)
2483

    
2484
    return result
2485

    
2486

    
2487
class LUQueryClusterInfo(NoHooksLU):
2488
  """Query cluster configuration.
2489

2490
  """
2491
  _OP_REQP = []
2492
  REQ_BGL = False
2493

    
2494
  def ExpandNames(self):
2495
    self.needed_locks = {}
2496

    
2497
  def CheckPrereq(self):
2498
    """No prerequsites needed for this LU.
2499

2500
    """
2501
    pass
2502

    
2503
  def Exec(self, feedback_fn):
2504
    """Return cluster config.
2505

2506
    """
2507
    cluster = self.cfg.GetClusterInfo()
2508
    result = {
2509
      "software_version": constants.RELEASE_VERSION,
2510
      "protocol_version": constants.PROTOCOL_VERSION,
2511
      "config_version": constants.CONFIG_VERSION,
2512
      "os_api_version": constants.OS_API_VERSION,
2513
      "export_version": constants.EXPORT_VERSION,
2514
      "architecture": (platform.architecture()[0], platform.machine()),
2515
      "name": cluster.cluster_name,
2516
      "master": cluster.master_node,
2517
      "default_hypervisor": cluster.default_hypervisor,
2518
      "enabled_hypervisors": cluster.enabled_hypervisors,
2519
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2520
                        for hypervisor_name in cluster.enabled_hypervisors]),
2521
      "beparams": cluster.beparams,
2522
      "candidate_pool_size": cluster.candidate_pool_size,
2523
      "default_bridge": cluster.default_bridge,
2524
      "master_netdev": cluster.master_netdev,
2525
      "volume_group_name": cluster.volume_group_name,
2526
      "file_storage_dir": cluster.file_storage_dir,
2527
      }
2528

    
2529
    return result
2530

    
2531

    
2532
class LUQueryConfigValues(NoHooksLU):
2533
  """Return configuration values.
2534

2535
  """
2536
  _OP_REQP = []
2537
  REQ_BGL = False
2538
  _FIELDS_DYNAMIC = utils.FieldSet()
2539
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2540

    
2541
  def ExpandNames(self):
2542
    self.needed_locks = {}
2543

    
2544
    _CheckOutputFields(static=self._FIELDS_STATIC,
2545
                       dynamic=self._FIELDS_DYNAMIC,
2546
                       selected=self.op.output_fields)
2547

    
2548
  def CheckPrereq(self):
2549
    """No prerequisites.
2550

2551
    """
2552
    pass
2553

    
2554
  def Exec(self, feedback_fn):
2555
    """Dump a representation of the cluster config to the standard output.
2556

2557
    """
2558
    values = []
2559
    for field in self.op.output_fields:
2560
      if field == "cluster_name":
2561
        entry = self.cfg.GetClusterName()
2562
      elif field == "master_node":
2563
        entry = self.cfg.GetMasterNode()
2564
      elif field == "drain_flag":
2565
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2566
      else:
2567
        raise errors.ParameterError(field)
2568
      values.append(entry)
2569
    return values
2570

    
2571

    
2572
class LUActivateInstanceDisks(NoHooksLU):
2573
  """Bring up an instance's disks.
2574

2575
  """
2576
  _OP_REQP = ["instance_name"]
2577
  REQ_BGL = False
2578

    
2579
  def ExpandNames(self):
2580
    self._ExpandAndLockInstance()
2581
    self.needed_locks[locking.LEVEL_NODE] = []
2582
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2583

    
2584
  def DeclareLocks(self, level):
2585
    if level == locking.LEVEL_NODE:
2586
      self._LockInstancesNodes()
2587

    
2588
  def CheckPrereq(self):
2589
    """Check prerequisites.
2590

2591
    This checks that the instance is in the cluster.
2592

2593
    """
2594
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2595
    assert self.instance is not None, \
2596
      "Cannot retrieve locked instance %s" % self.op.instance_name
2597
    _CheckNodeOnline(self, self.instance.primary_node)
2598

    
2599
  def Exec(self, feedback_fn):
2600
    """Activate the disks.
2601

2602
    """
2603
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2604
    if not disks_ok:
2605
      raise errors.OpExecError("Cannot activate block devices")
2606

    
2607
    return disks_info
2608

    
2609

    
2610
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2611
  """Prepare the block devices for an instance.
2612

2613
  This sets up the block devices on all nodes.
2614

2615
  @type lu: L{LogicalUnit}
2616
  @param lu: the logical unit on whose behalf we execute
2617
  @type instance: L{objects.Instance}
2618
  @param instance: the instance for whose disks we assemble
2619
  @type ignore_secondaries: boolean
2620
  @param ignore_secondaries: if true, errors on secondary nodes
2621
      won't result in an error return from the function
2622
  @return: False if the operation failed, otherwise a list of
2623
      (host, instance_visible_name, node_visible_name)
2624
      with the mapping from node devices to instance devices
2625

2626
  """
2627
  device_info = []
2628
  disks_ok = True
2629
  iname = instance.name
2630
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it
2633

    
2634
  # The proper fix would be to wait (with some limits) until the
2635
  # connection has been made and drbd transitions from WFConnection
2636
  # into any other network-connected state (Connected, SyncTarget,
2637
  # SyncSource, etc.)
2638

    
2639
  # 1st pass, assemble on all nodes in secondary mode
2640
  for inst_disk in instance.disks:
2641
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2642
      lu.cfg.SetDiskID(node_disk, node)
2643
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2644
      msg = result.RemoteFailMsg()
2645
      if msg:
2646
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2647
                           " (is_primary=False, pass=1): %s",
2648
                           inst_disk.iv_name, node, msg)
2649
        if not ignore_secondaries:
2650
          disks_ok = False
2651

    
2652
  # FIXME: race condition on drbd migration to primary
2653

    
2654
  # 2nd pass, do only the primary node
2655
  for inst_disk in instance.disks:
2656
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2657
      if node != instance.primary_node:
2658
        continue
2659
      lu.cfg.SetDiskID(node_disk, node)
2660
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2661
      msg = result.RemoteFailMsg()
2662
      if msg:
2663
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2664
                           " (is_primary=True, pass=2): %s",
2665
                           inst_disk.iv_name, node, msg)
2666
        disks_ok = False
2667
    device_info.append((instance.primary_node, inst_disk.iv_name,
2668
                        result.payload))
2669

    
2670
  # leave the disks configured for the primary node
2671
  # this is a workaround that would be fixed better by
2672
  # improving the logical/physical id handling
2673
  for disk in instance.disks:
2674
    lu.cfg.SetDiskID(disk, instance.primary_node)
2675

    
2676
  return disks_ok, device_info
2677

    
2678

    
2679
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                       ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
2695
  """Shutdown an instance's disks.
2696

2697
  """
2698
  _OP_REQP = ["instance_name"]
2699
  REQ_BGL = False
2700

    
2701
  def ExpandNames(self):
2702
    self._ExpandAndLockInstance()
2703
    self.needed_locks[locking.LEVEL_NODE] = []
2704
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2705

    
2706
  def DeclareLocks(self, level):
2707
    if level == locking.LEVEL_NODE:
2708
      self._LockInstancesNodes()
2709

    
2710
  def CheckPrereq(self):
2711
    """Check prerequisites.
2712

2713
    This checks that the instance is in the cluster.
2714

2715
    """
2716
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2717
    assert self.instance is not None, \
2718
      "Cannot retrieve locked instance %s" % self.op.instance_name
2719

    
2720
  def Exec(self, feedback_fn):
2721
    """Deactivate the disks
2722

2723
    """
2724
    instance = self.instance
2725
    _SafeShutdownInstanceDisks(self, instance)
2726

    
2727

    
2728
def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
                                    [instance.hypervisor])
  ins_l = ins_l[instance.primary_node]
  if ins_l.failed or not isinstance(ins_l.data, list):
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l.data:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on secondary nodes always mark the shutdown as failed; errors
  on the primary node are ignored only if ignore_primary is true.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.RemoteFailMsg()
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


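# A failed shutdown above only flips all_result to False when it may not be
# ignored: secondary-node failures always count, and primary-node failures
# count unless ignore_primary is set.  A tiny sketch of that condition (the
# node names are made up):
def _ExampleShutdownFailureCounts(node, primary_node, ignore_primary):
  """Sketch: does a shutdown failure on this node mark the result bad?

  """
  return not ignore_primary or node != primary_node

# _ExampleShutdownFailureCounts("node2", "node1", False) -> True
# _ExampleShutdownFailureCounts("node1", "node1", True)  -> False (ignored)

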
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise()
  free_mem = nodeinfo[node].data.get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))


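# _CheckNodeFreeMemory is used as a guard by LUs such as LUStartupInstance:
# the requested amount is the instance's memory in MiB and the check must
# fail both when the node reports too little memory and when it reports
# nothing usable.  A self-contained sketch of just that decision (no RPC
# involved; the sample node name, reason and sizes are made up):
def _ExampleMemoryGuard(free_mem, requested, node, reason):
  """Sketch: the same accept/reject logic as _CheckNodeFreeMemory.

  """
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))

# _ExampleMemoryGuard(1024, 512, "node1", "starting instance1")  # passes
# _ExampleMemoryGuard(256, 512, "node1", "starting instance1")   # raises

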
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ))
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ))

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if not remote_info.data:
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.RemoteFailMsg()
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)


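# Illustrative sketch of the parameter layering performed in
# LUStartupInstance.CheckPrereq above: cluster defaults are overlaid with
# the instance's own parameters and finally with the one-off values passed
# to this start operation.  The helper below is hypothetical and only
# demonstrates the precedence order, not the real FillDict API.
def _ExampleFillParams(cluster_defaults, instance_params, start_overrides):
  """Merge parameter dicts with increasing precedence from left to right."""
  filled = dict(cluster_defaults)
  filled.update(instance_params)
  filled.update(start_overrides)
  return filled

# e.g. _ExampleFillParams({"acpi": True}, {}, {"acpi": False}) -> acpi False

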
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance for"
                                 " full reboot: %s" % msg)
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)


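# Illustrative sketch of the dispatch in LURebootInstance.Exec above: soft
# and hard reboots are a single hypervisor call on the primary node, while
# a "full" reboot is emulated as a shutdown/disk-restart/start cycle.  The
# callables below are hypothetical stand-ins for the RPCs used by the LU.
def _ExampleReboot(reboot_type, hv_reboot, shutdown, restart_disks, start):
  """Dispatch a reboot request to the appropriate primitive(s)."""
  if reboot_type in ("soft", "hard"):
    hv_reboot(reboot_type)
  else:
    shutdown()
    restart_disks()
    start()

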
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.RemoteFailMsg()
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s: %s" %
                                 (inst.name, inst.primary_node, msg))
    finally:
      _ShutdownInstanceDisks(self, inst)


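# Illustrative sketch of the disk lifecycle used by LUReinstallInstance.Exec
# above: the instance disks are activated only for the duration of the OS
# create scripts and are always deactivated again, even when the scripts
# fail.  The callables are hypothetical stand-ins for the helpers the LU
# calls.
def _ExampleRunWithDisks(activate, run_os_script, deactivate):
  """Run an OS script with the instance disks temporarily activated."""
  activate()
  try:
    run_os_script()
  finally:
    deactivate()

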
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.RemoteFailMsg()
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


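# Illustrative sketch of how LURenameInstance.Exec above derives the file
# storage directories: for file-based disks the logical_id is a
# (driver, path) tuple and the per-instance directory is the dirname of
# that path.  The sample tuple in the usage comment is made up.
def _ExampleFileStorageDir(logical_id):
  """Return the directory holding a file-based disk."""
  _, path = logical_id
  return os.path.dirname(path)

# e.g. _ExampleFileStorageDir(("loop", "/srv/ganeti/file-storage/inst1/disk0"))
# -> "/srv/ganeti/file-storage/inst1"

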
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


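# Illustrative sketch of the two-phase locking declaration used by
# LURemoveInstance above (and by the failover/migrate LUs below):
# ExpandNames declares an empty node-lock list plus a recalculation mode,
# and the concrete node names are only filled in at DeclareLocks time,
# once the instance lock is held and its nodes are known.  This is a
# hypothetical minimal shape, not a real LogicalUnit.
class _ExampleNodeLockingLU(object):
  def __init__(self):
    # empty for now, recalculated once the instance's nodes are known
    self.needed_locks = {"node": []}

  def DeclareLocks(self, level, instance_nodes):
    if level == "node":
      # stands in for self._LockInstancesNodes()
      self.needed_locks["node"] = list(instance_nodes)

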
class LUQueryInstances(NoHooksLU):
3308
  """Logical unit for querying instances.
3309

3310
  """
3311
  _OP_REQP = ["output_fields", "names", "use_locking"]
3312
  REQ_BGL = False
3313
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3314
                                    "admin_state",
3315
                                    "disk_template", "ip", "mac", "bridge",
3316
                                    "sda_size", "sdb_size", "vcpus", "tags",
3317
                                    "network_port", "beparams",
3318
                                    r"(disk)\.(size)/([0-9]+)",
3319
                                    r"(disk)\.(sizes)", "disk_usage",
3320
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3321
                                    r"(nic)\.(macs|ips|bridges)",
3322
                                    r"(disk|nic)\.(count)",
3323
                                    "serial_no", "hypervisor", "hvparams",] +
3324
                                  ["hv/%s" % name
3325
                                   for name in constants.HVS_PARAMETERS] +
3326
                                  ["be/%s" % name
3327
                                   for name in constants.BES_PARAMETERS])
3328
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3329

    
3330

    
3331
  def ExpandNames(self):
3332
    _CheckOutputFields(static=self._FIELDS_STATIC,
3333
                       dynamic=self._FIELDS_DYNAMIC,
3334
                       selected=self.op.output_fields)
3335

    
3336
    self.needed_locks = {}
3337
    self.share_locks[locking.LEVEL_INSTANCE] = 1
3338
    self.share_locks[locking.LEVEL_NODE] = 1
3339

    
3340
    if self.op.names:
3341
      self.wanted = _GetWantedInstances(self, self.op.names)
3342
    else:
3343
      self.wanted = locking.ALL_SET
3344

    
3345
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3346
    self.do_locking = self.do_node_query and self.op.use_locking
3347
    if self.do_locking:
3348
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3349
      self.needed_locks[locking.LEVEL_NODE] = []
3350
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3351

    
3352
  def DeclareLocks(self, level):
3353
    if level == locking.LEVEL_NODE and self.do_locking:
3354
      self._LockInstancesNodes()
3355

    
3356
  def CheckPrereq(self):
3357
    """Check prerequisites.
3358

3359
    """
3360
    pass
3361

    
3362
  def Exec(self, feedback_fn):
3363
    """Computes the list of nodes and their attributes.
3364

3365
    """
3366
    all_info = self.cfg.GetAllInstancesInfo()
3367
    if self.wanted == locking.ALL_SET:
3368
      # caller didn't specify instance names, so ordering is not important
3369
      if self.do_locking:
3370
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3371
      else:
3372
        instance_names = all_info.keys()
3373
      instance_names = utils.NiceSort(instance_names)
3374
    else:
3375
      # caller did specify names, so we must keep the ordering
3376
      if self.do_locking:
3377
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3378
      else:
3379
        tgt_set = all_info.keys()
3380
      missing = set(self.wanted).difference(tgt_set)
3381
      if missing:
3382
        raise errors.OpExecError("Some instances were removed before"
3383
                                 " retrieving their data: %s" % missing)
3384
      instance_names = self.wanted
3385

    
3386
    instance_list = [all_info[iname] for iname in instance_names]
3387

    
3388
    # begin data gathering
3389

    
3390
    nodes = frozenset([inst.primary_node for inst in instance_list])
3391
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
3392

    
3393
    bad_nodes = []
3394
    off_nodes = []
3395
    if self.do_node_query:
3396
      live_data = {}
3397
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3398
      for name in nodes:
3399
        result = node_data[name]
3400
        if result.offline:
3401
          # offline nodes will be in both lists
3402
          off_nodes.append(name)
3403
        if result.failed:
3404
          bad_nodes.append(name)
3405
        else:
3406
          if result.data:
3407
            live_data.update(result.data)
3408
            # else no instance is alive
3409
    else:
3410
      live_data = dict([(name, {}) for name in instance_names])
3411

    
3412
    # end data gathering
3413

    
3414
    HVPREFIX = "hv/"
3415
    BEPREFIX = "be/"
3416
    output = []
3417
    for instance in instance_list:
3418
      iout = []
3419
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3420
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
3421
      for field in self.op.output_fields:
3422
        st_match = self._FIELDS_STATIC.Matches(field)
3423
        if field == "name":
3424
          val = instance.name
3425
        elif field == "os":
3426
          val = instance.os
3427
        elif field == "pnode":
3428
          val = instance.primary_node
3429
        elif field == "snodes":
3430
          val = list(instance.secondary_nodes)
3431
        elif field == "admin_state":
3432
          val = instance.admin_up
3433
        elif field == "oper_state":
3434
          if instance.primary_node in bad_nodes:
3435
            val = None
3436
          else:
3437
            val = bool(live_data.get(instance.name))
3438
        elif field == "status":
3439
          if instance.primary_node in off_nodes:
3440
            val = "ERROR_nodeoffline"
3441
          elif instance.primary_node in bad_nodes:
3442
            val = "ERROR_nodedown"
3443
          else:
3444
            running = bool(live_data.get(instance.name))
3445
            if running:
3446
              if instance.admin_up:
3447
                val = "running"
3448
              else:
3449
                val = "ERROR_up"
3450
            else:
3451
              if instance.admin_up:
3452
                val = "ERROR_down"
3453
              else:
3454
                val = "ADMIN_down"
3455
        elif field == "oper_ram":
3456
          if instance.primary_node in bad_nodes:
3457
            val = None
3458
          elif instance.name in live_data:
3459
            val = live_data[instance.name].get("memory", "?")
3460
          else:
3461
            val = "-"
3462
        elif field == "vcpus":
3463
          val = i_be[constants.BE_VCPUS]
3464
        elif field == "disk_template":
3465
          val = instance.disk_template
3466
        elif field == "ip":
3467
          if instance.nics:
3468
            val = instance.nics[0].ip
3469
          else:
3470
            val = None
3471
        elif field == "bridge":
3472
          if instance.nics:
3473
            val = instance.nics[0].bridge
3474
          else:
3475
            val = None
3476
        elif field == "mac":
3477
          if instance.nics:
3478
            val = instance.nics[0].mac
3479
          else:
3480
            val = None
3481
        elif field == "sda_size" or field == "sdb_size":
3482
          idx = ord(field[2]) - ord('a')
3483
          try:
3484
            val = instance.FindDisk(idx).size
3485
          except errors.OpPrereqError:
3486
            val = None
3487
        elif field == "disk_usage": # total disk usage per node
3488
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
3489
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3490
        elif field == "tags":
3491
          val = list(instance.GetTags())
3492
        elif field == "serial_no":
3493
          val = instance.serial_no
3494
        elif field == "network_port":
3495
          val = instance.network_port
3496
        elif field == "hypervisor":
3497
          val = instance.hypervisor
3498
        elif field == "hvparams":
3499
          val = i_hv
3500
        elif (field.startswith(HVPREFIX) and
3501
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3502
          val = i_hv.get(field[len(HVPREFIX):], None)
3503
        elif field == "beparams":
3504
          val = i_be
3505
        elif (field.startswith(BEPREFIX) and
3506
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3507
          val = i_be.get(field[len(BEPREFIX):], None)
3508
        elif st_match and st_match.groups():
3509
          # matches a variable list
3510
          st_groups = st_match.groups()
3511
          if st_groups and st_groups[0] == "disk":
3512
            if st_groups[1] == "count":
3513
              val = len(instance.disks)
3514
            elif st_groups[1] == "sizes":
3515
              val = [disk.size for disk in instance.disks]
3516
            elif st_groups[1] == "size":
3517
              try:
3518
                val = instance.FindDisk(st_groups[2]).size
3519
              except errors.OpPrereqError:
3520
                val = None
3521
            else:
3522
              assert False, "Unhandled disk parameter"
3523
          elif st_groups[0] == "nic":
3524
            if st_groups[1] == "count":
3525
              val = len(instance.nics)
3526
            elif st_groups[1] == "macs":
3527
              val = [nic.mac for nic in instance.nics]
3528
            elif st_groups[1] == "ips":
3529
              val = [nic.ip for nic in instance.nics]
3530
            elif st_groups[1] == "bridges":
3531
              val = [nic.bridge for nic in instance.nics]
3532
            else:
3533
              # index-based item
3534
              nic_idx = int(st_groups[2])
3535
              if nic_idx >= len(instance.nics):
3536
                val = None
3537
              else:
3538
                if st_groups[1] == "mac":
3539
                  val = instance.nics[nic_idx].mac
3540
                elif st_groups[1] == "ip":
3541
                  val = instance.nics[nic_idx].ip
3542
                elif st_groups[1] == "bridge":
3543
                  val = instance.nics[nic_idx].bridge
3544
                else:
3545
                  assert False, "Unhandled NIC parameter"
3546
          else:
3547
            assert False, ("Declared but unhandled variable parameter '%s'" %
3548
                           field)
3549
        else:
3550
          assert False, "Declared but unhandled parameter '%s'" % field
3551
        iout.append(val)
3552
      output.append(iout)
3553

    
3554
    return output
3555

    
3556

    
3557
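# Illustrative sketch of the field matching used by LUQueryInstances above:
# parameterized output fields such as "disk.size/0" are declared as regular
# expressions, and the captured groups drive the per-index lookup in Exec.
# This is a simplified stand-in for utils.FieldSet, not its real API.
def _ExampleMatchField(field):
  """Return the regex groups for a parameterized field, or None."""
  import re  # stdlib; local import keeps this sketch self-contained
  m = re.match(r"^(disk|nic)\.(size|mac|ip|bridge)/([0-9]+)$", field)
  if m:
    return m.groups()
  return None

# _ExampleMatchField("disk.size/1") -> ("disk", "size", "1")

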
class LUFailoverInstance(LogicalUnit):
3558
  """Failover an instance.
3559

3560
  """
3561
  HPATH = "instance-failover"
3562
  HTYPE = constants.HTYPE_INSTANCE
3563
  _OP_REQP = ["instance_name", "ignore_consistency"]
3564
  REQ_BGL = False
3565

    
3566
  def ExpandNames(self):
3567
    self._ExpandAndLockInstance()
3568
    self.needed_locks[locking.LEVEL_NODE] = []
3569
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3570

    
3571
  def DeclareLocks(self, level):
3572
    if level == locking.LEVEL_NODE:
3573
      self._LockInstancesNodes()
3574

    
3575
  def BuildHooksEnv(self):
3576
    """Build hooks env.
3577

3578
    This runs on master, primary and secondary nodes of the instance.
3579

3580
    """
3581
    env = {
3582
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3583
      }
3584
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3585
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3586
    return env, nl, nl
3587

    
3588
  def CheckPrereq(self):
3589
    """Check prerequisites.
3590

3591
    This checks that the instance is in the cluster.
3592

3593
    """
3594
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3595
    assert self.instance is not None, \
3596
      "Cannot retrieve locked instance %s" % self.op.instance_name
3597

    
3598
    bep = self.cfg.GetClusterInfo().FillBE(instance)
3599
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3600
      raise errors.OpPrereqError("Instance's disk layout is not"
3601
                                 " network mirrored, cannot failover.")
3602

    
3603
    secondary_nodes = instance.secondary_nodes
3604
    if not secondary_nodes:
3605
      raise errors.ProgrammerError("no secondary node but using "
3606
                                   "a mirrored disk template")
3607

    
3608
    target_node = secondary_nodes[0]
3609
    _CheckNodeOnline(self, target_node)
3610
    _CheckNodeNotDrained(self, target_node)
3611

    
3612
    if instance.admin_up:
3613
      # check memory requirements on the secondary node
3614
      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3615
                           instance.name, bep[constants.BE_MEMORY],
3616
                           instance.hypervisor)
3617
    else:
3618
      self.LogInfo("Not checking memory on the secondary node as"
3619
                   " instance will not be started")
3620

    
3621
    # check bridge existence
3622
    brlist = [nic.bridge for nic in instance.nics]
3623
    result = self.rpc.call_bridges_exist(target_node, brlist)
3624
    result.Raise()
3625
    if not result.data:
3626
      raise errors.OpPrereqError("One or more target bridges %s does not"
3627
                                 " exist on destination node '%s'" %
3628
                                 (brlist, target_node))
3629

    
3630
  def Exec(self, feedback_fn):
3631
    """Failover an instance.
3632

3633
    The failover is done by shutting it down on its present node and
3634
    starting it on the secondary.
3635

3636
    """
3637
    instance = self.instance
3638

    
3639
    source_node = instance.primary_node
3640
    target_node = instance.secondary_nodes[0]
3641

    
3642
    feedback_fn("* checking disk consistency between source and target")
3643
    for dev in instance.disks:
3644
      # for drbd, these are drbd over lvm
3645
      if not _CheckDiskConsistency(self, dev, target_node, False):
3646
        if instance.admin_up and not self.op.ignore_consistency:
3647
          raise errors.OpExecError("Disk %s is degraded on target node,"
3648
                                   " aborting failover." % dev.iv_name)
3649

    
3650
    feedback_fn("* shutting down instance on source node")
3651
    logging.info("Shutting down instance %s on node %s",
3652
                 instance.name, source_node)
3653

    
3654
    result = self.rpc.call_instance_shutdown(source_node, instance)
3655
    msg = result.RemoteFailMsg()
3656
    if msg:
3657
      if self.op.ignore_consistency:
3658
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3659
                             " Proceeding anyway. Please make sure node"
3660
                             " %s is down. Error details: %s",
3661
                             instance.name, source_node, source_node, msg)
3662
      else:
3663
        raise errors.OpExecError("Could not shutdown instance %s on"
3664
                                 " node %s: %s" %
3665
                                 (instance.name, source_node, msg))
3666

    
3667
    feedback_fn("* deactivating the instance's disks on source node")
3668
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3669
      raise errors.OpExecError("Can't shut down the instance's disks.")
3670

    
3671
    instance.primary_node = target_node
3672
    # distribute new instance config to the other nodes
3673
    self.cfg.Update(instance)
3674

    
3675
    # Only start the instance if it's marked as up
3676
    if instance.admin_up:
3677
      feedback_fn("* activating the instance's disks on target node")
3678
      logging.info("Starting instance %s on node %s",
3679
                   instance.name, target_node)
3680

    
3681
      disks_ok, _ = _AssembleInstanceDisks(self, instance,
3682
                                               ignore_secondaries=True)
3683
      if not disks_ok:
3684
        _ShutdownInstanceDisks(self, instance)
3685
        raise errors.OpExecError("Can't activate the instance's disks")
3686

    
3687
      feedback_fn("* starting the instance on the target node")
3688
      result = self.rpc.call_instance_start(target_node, instance, None, None)
3689
      msg = result.RemoteFailMsg()
3690
      if msg:
3691
        _ShutdownInstanceDisks(self, instance)
3692
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3693
                                 (instance.name, target_node, msg))
3694

    
3695

    
3696
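# Illustrative sketch of the step order implemented by
# LUFailoverInstance.Exec above: check disk consistency, stop the instance
# on the current primary, release its disks there, switch the primary node
# in the configuration, and (only if the instance was marked up) activate
# the disks and start it on the former secondary.  The callables are
# hypothetical stand-ins for the checks and RPCs the LU performs.
def _ExampleFailover(check_disks, shutdown_on_primary, deactivate_disks,
                     switch_primary, activate_disks, start_on_secondary,
                     start_it=True):
  """Run the failover steps in order; only start if the instance was up."""
  check_disks()
  shutdown_on_primary()
  deactivate_disks()
  switch_primary()
  if start_it:
    activate_disks()
    start_on_secondary()

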
class LUMigrateInstance(LogicalUnit):
3697
  """Migrate an instance.
3698

3699
  This is migration without shutting down, compared to the failover,
3700
  which is done with shutdown.
3701

3702
  """
3703
  HPATH = "instance-migrate"
3704
  HTYPE = constants.HTYPE_INSTANCE
3705
  _OP_REQP = ["instance_name", "live", "cleanup"]
3706

    
3707
  REQ_BGL = False
3708

    
3709
  def ExpandNames(self):
3710
    self._ExpandAndLockInstance()
3711
    self.needed_locks[locking.LEVEL_NODE] = []
3712
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3713

    
3714
  def DeclareLocks(self, level):
3715
    if level == locking.LEVEL_NODE:
3716
      self._LockInstancesNodes()
3717

    
3718
  def BuildHooksEnv(self):
3719
    """Build hooks env.
3720

3721
    This runs on master, primary and secondary nodes of the instance.
3722

3723
    """
3724
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3725
    env["MIGRATE_LIVE"] = self.op.live
3726
    env["MIGRATE_CLEANUP"] = self.op.cleanup
3727
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3728
    return env, nl, nl
3729

    
3730
  def CheckPrereq(self):
3731
    """Check prerequisites.
3732

3733
    This checks that the instance is in the cluster.
3734

3735
    """
3736
    instance = self.cfg.GetInstanceInfo(
3737
      self.cfg.ExpandInstanceName(self.op.instance_name))
3738
    if instance is None:
3739
      raise errors.OpPrereqError("Instance '%s' not known" %
3740
                                 self.op.instance_name)
3741

    
3742
    if instance.disk_template != constants.DT_DRBD8:
3743
      raise errors.OpPrereqError("Instance's disk layout is not"
3744
                                 " drbd8, cannot migrate.")
3745

    
3746
    secondary_nodes = instance.secondary_nodes
3747
    if not secondary_nodes:
3748
      raise errors.ConfigurationError("No secondary node but using"
3749
                                      " drbd8 disk template")
3750

    
3751
    i_be = self.cfg.GetClusterInfo().FillBE(instance)
3752

    
3753
    target_node = secondary_nodes[0]
3754
    # check memory requirements on the secondary node
3755
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3756
                         instance.name, i_be[constants.BE_MEMORY],
3757
                         instance.hypervisor)
3758

    
3759
    # check bridge existence
3760
    brlist = [nic.bridge for nic in instance.nics]
3761
    result = self.rpc.call_bridges_exist(target_node, brlist)
3762
    if result.failed or not result.data:
3763
      raise errors.OpPrereqError("One or more target bridges %s does not"
3764
                                 " exist on destination node '%s'" %
3765
                                 (brlist, target_node))
3766

    
3767
    if not self.op.cleanup:
3768
      _CheckNodeNotDrained(self, target_node)
3769
      result = self.rpc.call_instance_migratable(instance.primary_node,
3770
                                                 instance)
3771
      msg = result.RemoteFailMsg()
3772
      if msg:
3773
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3774
                                   msg)
3775

    
3776
    self.instance = instance
3777

    
3778
  def _WaitUntilSync(self):
3779
    """Poll with custom rpc for disk sync.
3780

3781
    This uses our own step-based rpc call.
3782

3783
    """
3784
    self.feedback_fn("* wait until resync is done")
3785
    all_done = False
3786
    while not all_done:
3787
      all_done = True
3788
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3789
                                            self.nodes_ip,
3790
                                            self.instance.disks)
3791
      min_percent = 100
3792
      for node, nres in result.items():
3793
        msg = nres.RemoteFailMsg()
3794
        if msg:
3795
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3796
                                   (node, msg))
3797
        node_done, node_percent = nres.payload
3798
        all_done = all_done and node_done
3799
        if node_percent is not None:
3800
          min_percent = min(min_percent, node_percent)
3801
      if not all_done:
3802
        if min_percent < 100:
3803
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
3804
        time.sleep(2)
3805

    
3806
  def _EnsureSecondary(self, node):
3807
    """Demote a node to secondary.
3808

3809
    """
3810
    self.feedback_fn("* switching node %s to secondary mode" % node)
3811

    
3812
    for dev in self.instance.disks:
3813
      self.cfg.SetDiskID(dev, node)
3814

    
3815
    result = self.rpc.call_blockdev_close(node, self.instance.name,
3816
                                          self.instance.disks)
3817
    msg = result.RemoteFailMsg()
3818
    if msg:
3819
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3820
                               " error %s" % (node, msg))
3821

    
3822
  def _GoStandalone(self):
3823
    """Disconnect from the network.
3824

3825
    """
3826
    self.feedback_fn("* changing into standalone mode")
3827
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3828
                                               self.instance.disks)
3829
    for node, nres in result.items():
3830
      msg = nres.RemoteFailMsg()
3831
      if msg:
3832
        raise errors.OpExecError("Cannot disconnect disks node %s,"
3833
                                 " error %s" % (node, msg))
3834

    
3835
  def _GoReconnect(self, multimaster):
3836
    """Reconnect to the network.
3837

3838
    """
3839
    if multimaster:
3840
      msg = "dual-master"
3841
    else:
3842
      msg = "single-master"
3843
    self.feedback_fn("* changing disks into %s mode" % msg)
3844
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3845
                                           self.instance.disks,
3846
                                           self.instance.name, multimaster)
3847
    for node, nres in result.items():
3848
      msg = nres.RemoteFailMsg()
3849
      if msg:
3850
        raise errors.OpExecError("Cannot change disks config on node %s,"
3851
                                 " error: %s" % (node, msg))
3852

    
3853
  def _ExecCleanup(self):
3854
    """Try to cleanup after a failed migration.
3855

3856
    The cleanup is done by:
3857
      - check that the instance is running only on one node
3858
        (and update the config if needed)
3859
      - change disks on its secondary node to secondary
3860
      - wait until disks are fully synchronized
3861
      - disconnect from the network
3862
      - change disks into single-master mode
3863
      - wait again until disks are fully synchronized
3864

3865
    """
3866
    instance = self.instance
3867
    target_node = self.target_node
3868
    source_node = self.source_node
3869

    
3870
    # check running on only one node
3871
    self.feedback_fn("* checking where the instance actually runs"
3872
                     " (if this hangs, the hypervisor might be in"
3873
                     " a bad state)")
3874
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3875
    for node, result in ins_l.items():
3876
      result.Raise()
3877
      if not isinstance(result.data, list):
3878
        raise errors.OpExecError("Can't contact node '%s'" % node)
3879

    
3880
    runningon_source = instance.name in ins_l[source_node].data
3881
    runningon_target = instance.name in ins_l[target_node].data
3882

    
3883
    if runningon_source and runningon_target:
3884
      raise errors.OpExecError("Instance seems to be running on two nodes,"
3885
                               " or the hypervisor is confused. You will have"
3886
                               " to ensure manually that it runs only on one"
3887
                               " and restart this operation.")
3888

    
3889
    if not (runningon_source or runningon_target):
3890
      raise errors.OpExecError("Instance does not seem to be running at all."
3891
                               " In this case, it's safer to repair by"
3892
                               " running 'gnt-instance stop' to ensure disk"
3893
                               " shutdown, and then restarting it.")
3894

    
3895
    if runningon_target:
3896
      # the migration has actually succeeded, we need to update the config
3897
      self.feedback_fn("* instance running on secondary node (%s),"
3898
                       " updating config" % target_node)
3899
      instance.primary_node = target_node
3900
      self.cfg.Update(instance)
3901
      demoted_node = source_node
3902
    else:
3903
      self.feedback_fn("* instance confirmed to be running on its"
3904
                       " primary node (%s)" % source_node)
3905
      demoted_node = target_node
3906

    
3907
    self._EnsureSecondary(demoted_node)
3908
    try:
3909
      self._WaitUntilSync()
3910
    except errors.OpExecError:
3911
      # we ignore here errors, since if the device is standalone, it
3912
      # won't be able to sync
3913
      pass
3914
    self._GoStandalone()
3915
    self._GoReconnect(False)
3916
    self._WaitUntilSync()
3917

    
3918
    self.feedback_fn("* done")
3919

    
3920
  def _RevertDiskStatus(self):
3921
    """Try to revert the disk status after a failed migration.
3922

3923
    """
3924
    target_node = self.target_node
3925
    try:
3926
      self._EnsureSecondary(target_node)
3927
      self._GoStandalone()
3928
      self._GoReconnect(False)
3929
      self._WaitUntilSync()
3930
    except errors.OpExecError, err:
3931
      self.LogWarning("Migration failed and I can't reconnect the"
3932
                      " drives: error '%s'\n"
3933
                      "Please look and recover the instance status" %
3934
                      str(err))
3935

    
3936
  def _AbortMigration(self):
3937
    """Call the hypervisor code to abort a started migration.
3938

3939
    """
3940
    instance = self.instance
3941
    target_node = self.target_node
3942
    migration_info = self.migration_info
3943

    
3944
    abort_result = self.rpc.call_finalize_migration(target_node,
3945
                                                    instance,
3946
                                                    migration_info,
3947
                                                    False)
3948
    abort_msg = abort_result.RemoteFailMsg()
3949
    if abort_msg:
3950
      logging.error("Aborting migration failed on target node %s: %s" %
3951
                    (target_node, abort_msg))
3952
      # Don't raise an exception here, as we still have to try to revert the
3953
      # disk status, even if this step failed.
3954

    
3955
  def _ExecMigration(self):
3956
    """Migrate an instance.
3957

3958
    The migrate is done by:
3959
      - change the disks into dual-master mode
3960
      - wait until disks are fully synchronized again
3961
      - migrate the instance
3962
      - change disks on the new secondary node (the old primary) to secondary
3963
      - wait until disks are fully synchronized
3964
      - change disks into single-master mode
3965

3966
    """
3967
    instance = self.instance
3968
    target_node = self.target_node
3969
    source_node = self.source_node
3970

    
3971
    self.feedback_fn("* checking disk consistency between source and target")
3972
    for dev in instance.disks:
3973
      if not _CheckDiskConsistency(self, dev, target_node, False):
3974
        raise errors.OpExecError("Disk %s is degraded or not fully"
3975
                                 " synchronized on target node,"
3976
                                 " aborting migrate." % dev.iv_name)
3977

    
3978
    # First get the migration information from the remote node
3979
    result = self.rpc.call_migration_info(source_node, instance)
3980
    msg = result.RemoteFailMsg()
3981
    if msg:
3982
      log_err = ("Failed fetching source migration information from %s: %s" %
3983
                 (source_node, msg))
3984
      logging.error(log_err)
3985
      raise errors.OpExecError(log_err)
3986

    
3987
    self.migration_info = migration_info = result.payload
3988

    
3989
    # Then switch the disks to master/master mode
3990
    self._EnsureSecondary(target_node)
3991
    self._GoStandalone()
3992
    self._GoReconnect(True)
3993
    self._WaitUntilSync()
3994

    
3995
    self.feedback_fn("* preparing %s to accept the instance" % target_node)
3996
    result = self.rpc.call_accept_instance(target_node,
3997
                                           instance,
3998
                                           migration_info,
3999
                                           self.nodes_ip[target_node])
4000

    
4001
    msg = result.RemoteFailMsg()
4002
    if msg:
4003
      logging.error("Instance pre-migration failed, trying to revert"
4004
                    " disk status: %s", msg)
4005
      self._AbortMigration()
4006
      self._RevertDiskStatus()
4007
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4008
                               (instance.name, msg))
4009

    
4010
    self.feedback_fn("* migrating instance to %s" % target_node)
4011
    time.sleep(10)
4012
    result = self.rpc.call_instance_migrate(source_node, instance,
4013
                                            self.nodes_ip[target_node],
4014
                                            self.op.live)
4015
    msg = result.RemoteFailMsg()
4016
    if msg:
4017
      logging.error("Instance migration failed, trying to revert"
4018
                    " disk status: %s", msg)
4019
      self._AbortMigration()
4020
      self._RevertDiskStatus()
4021
      raise errors.OpExecError("Could not migrate instance %s: %s" %
4022
                               (instance.name, msg))
4023
    time.sleep(10)
4024

    
4025
    instance.primary_node = target_node
4026
    # distribute new instance config to the other nodes
4027
    self.cfg.Update(instance)
4028

    
4029
    result = self.rpc.call_finalize_migration(target_node,
4030
                                              instance,
4031
                                              migration_info,
4032
                                              True)
4033
    msg = result.RemoteFailMsg()
4034
    if msg:
4035
      logging.error("Instance migration succeeded, but finalization failed:"
4036
                    " %s" % msg)
4037
      raise errors.OpExecError("Could not finalize instance migration: %s" %
4038
                               msg)
4039

    
4040
    self._EnsureSecondary(source_node)
4041
    self._WaitUntilSync()
4042
    self._GoStandalone()
4043
    self._GoReconnect(False)
4044
    self._WaitUntilSync()
4045

    
4046
    self.feedback_fn("* done")
4047

    
4048
  def Exec(self, feedback_fn):
4049
    """Perform the migration.
4050

4051
    """
4052
    self.feedback_fn = feedback_fn
4053

    
4054
    self.source_node = self.instance.primary_node
4055
    self.target_node = self.instance.secondary_nodes[0]
4056
    self.all_nodes = [self.source_node, self.target_node]
4057
    self.nodes_ip = {
4058
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4059
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4060
      }
4061
    if self.op.cleanup:
4062
      return self._ExecCleanup()
4063
    else:
4064
      return self._ExecMigration()
4065

    
4066

    
4067
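# Illustrative sketch of the polling loop in LUMigrateInstance._WaitUntilSync
# above: every node reports a (done, percent) pair, the minimum percentage
# is shown as progress, and the loop sleeps between rounds until every node
# reports its DRBD devices as synced.  poll_nodes is a hypothetical callable
# returning {node: (done, percent)}.
def _ExampleWaitUntilSync(poll_nodes, report, sleep_fn=time.sleep):
  """Block until all polled nodes report their disks as fully synced."""
  while True:
    results = poll_nodes()
    all_done = True
    min_percent = 100
    for _, (node_done, node_percent) in results.items():
      all_done = all_done and node_done
      if node_percent is not None:
        min_percent = min(min_percent, node_percent)
    if all_done:
      return
    if min_percent < 100:
      report("  - progress: %.1f%%" % min_percent)
    sleep_fn(2)

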
def _CreateBlockDev(lu, node, instance, device, force_create,
4068
                    info, force_open):
4069
  """Create a tree of block devices on a given node.
4070

4071
  If this device type has to be created on secondaries, create it and
4072
  all its children.
4073

4074
  If not, just recurse to children keeping the same 'force' value.
4075

4076
  @param lu: the lu on whose behalf we execute
4077
  @param node: the node on which to create the device
4078
  @type instance: L{objects.Instance}
4079
  @param instance: the instance which owns the device
4080
  @type device: L{objects.Disk}
4081
  @param device: the device to create
4082
  @type force_create: boolean
4083
  @param force_create: whether to force creation of this device; this
4084
      will be changed to True whenever we find a device which has
4085
      CreateOnSecondary() attribute
4086
  @param info: the extra 'metadata' we should attach to the device
4087
      (this will be represented as a LVM tag)
4088
  @type force_open: boolean
4089
  @param force_open: this parameter will be passed to the
4090
      L{backend.BlockdevCreate} function where it specifies
4091
      whether we run on primary or not, and it affects both
4092
      the child assembly and the device own Open() execution
4093

4094
  """
4095
  if device.CreateOnSecondary():
4096
    force_create = True
4097

    
4098
  if device.children:
4099
    for child in device.children:
4100
      _CreateBlockDev(lu, node, instance, child, force_create,
4101
                      info, force_open)
4102

    
4103
  if not force_create:
4104
    return
4105

    
4106
  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4107

    
4108

    
4109
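# Illustrative sketch of the recursion in _CreateBlockDev above: children
# are visited first, and the creation flag becomes sticky as soon as a
# device in the tree must also exist on secondary nodes.  The attribute
# and callable names are hypothetical stand-ins for objects.Disk and the
# per-device creation RPC.
def _ExampleCreateTree(dev, create_fn, force=False):
  """Create a device tree bottom-up, forcing creation where required."""
  if dev.create_on_secondary:   # stands in for dev.CreateOnSecondary()
    force = True
  for child in getattr(dev, "children", []) or []:
    _ExampleCreateTree(child, create_fn, force)
  if force:
    create_fn(dev)

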
def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4110
  """Create a single block device on a given node.
4111

4112
  This will not recurse over children of the device, so they must be
4113
  created in advance.
4114

4115
  @param lu: the lu on whose behalf we execute
4116
  @param node: the node on which to create the device
4117
  @type instance: L{objects.Instance}
4118
  @param instance: the instance which owns the device
4119
  @type device: L{objects.Disk}
4120
  @param device: the device to create
4121
  @param info: the extra 'metadata' we should attach to the device
4122
      (this will be represented as a LVM tag)
4123
  @type force_open: boolean
4124
  @param force_open: this parameter will be passed to the
4125
      L{backend.BlockdevCreate} function where it specifies
4126
      whether we run on primary or not, and it affects both
4127
      the child assembly and the device's own Open() execution
4128

4129
  """
4130
  lu.cfg.SetDiskID(device, node)
4131
  result = lu.rpc.call_blockdev_create(node, device, device.size,
4132
                                       instance.name, force_open, info)
4133
  msg = result.RemoteFailMsg()
4134
  if msg:
4135
    raise errors.OpExecError("Can't create block device %s on"
4136
                             " node %s for instance %s: %s" %
4137
                             (device, node, instance.name, msg))
4138
  if device.physical_id is None:
4139
    device.physical_id = result.payload
4140

    
4141

    
4142
def _GenerateUniqueNames(lu, exts):
4143
  """Generate a suitable LV name.
4144

4145
  This will generate a logical volume name for the given instance.
4146

4147
  """
4148
  results = []
4149
  for val in exts:
4150
    new_id = lu.cfg.GenerateUniqueID()
4151
    results.append("%s%s" % (new_id, val))
4152
  return results
4153

    
4154

    
4155
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4156
                         p_minor, s_minor):
4157
  """Generate a drbd8 device complete with its children.
4158

4159
  """
4160
  port = lu.cfg.AllocatePort()
4161
  vgname = lu.cfg.GetVGName()
4162
  shared_secret = lu.cfg.GenerateDRBDSecret()
4163
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4164
                          logical_id=(vgname, names[0]))
4165
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4166
                          logical_id=(vgname, names[1]))
4167
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4168
                          logical_id=(primary, secondary, port,
4169
                                      p_minor, s_minor,
4170
                                      shared_secret),
4171
                          children=[dev_data, dev_meta],
4172
                          iv_name=iv_name)
4173
  return drbd_dev
4174

    
4175

    
4176
def _GenerateDiskTemplate(lu, template_name,
4177
                          instance_name, primary_node,
4178
                          secondary_nodes, disk_info,
4179
                          file_storage_dir, file_driver,
4180
                          base_index):
4181
  """Generate the entire disk layout for a given template type.
4182

4183
  """
4184
  #TODO: compute space requirements
4185

    
4186
  vgname = lu.cfg.GetVGName()
4187
  disk_count = len(disk_info)
4188
  disks = []
4189
  if template_name == constants.DT_DISKLESS:
4190
    pass
4191
  elif template_name == constants.DT_PLAIN:
4192
    if len(secondary_nodes) != 0:
4193
      raise errors.ProgrammerError("Wrong template configuration")
4194

    
4195
    names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4196
                                      for i in range(disk_count)])
4197
    for idx, disk in enumerate(disk_info):
4198
      disk_index = idx + base_index
4199
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4200
                              logical_id=(vgname, names[idx]),
4201
                              iv_name="disk/%d" % disk_index,
4202
                              mode=disk["mode"])
4203
      disks.append(disk_dev)
4204
  elif template_name == constants.DT_DRBD8:
4205
    if len(secondary_nodes) != 1:
4206
      raise errors.ProgrammerError("Wrong template configuration")
4207
    remote_node = secondary_nodes[0]
4208
    minors = lu.cfg.AllocateDRBDMinor(
4209
      [primary_node, remote_node] * len(disk_info), instance_name)
4210

    
4211
    names = []
4212
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4213
                                               for i in range(disk_count)]):
4214
      names.append(lv_prefix + "_data")
4215
      names.append(lv_prefix + "_meta")
4216
    for idx, disk in enumerate(disk_info):
4217
      disk_index = idx + base_index
4218
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4219
                                      disk["size"], names[idx*2:idx*2+2],
4220
                                      "disk/%d" % disk_index,
4221
                                      minors[idx*2], minors[idx*2+1])
4222
      disk_dev.mode = disk["mode"]
4223
      disks.append(disk_dev)
4224
  elif template_name == constants.DT_FILE:
4225
    if len(secondary_nodes) != 0:
4226
      raise errors.ProgrammerError("Wrong template configuration")
4227

    
4228
    for idx, disk in enumerate(disk_info):
4229
      disk_index = idx + base_index
4230
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4231
                              iv_name="disk/%d" % disk_index,
4232
                              logical_id=(file_driver,
4233
                                          "%s/disk%d" % (file_storage_dir,
4234
                                                         disk_index)),
4235
                              mode=disk["mode"])
4236
      disks.append(disk_dev)
4237
  else:
4238
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4239
  return disks
4240

    
4241

    
4242
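# Illustrative sketch of the LV naming scheme used by _GenerateDiskTemplate
# above for DRBD disks: each disk index yields a "_data" and a "_meta"
# logical volume derived from one freshly generated unique prefix.
# new_id_fn is a hypothetical stand-in for lu.cfg.GenerateUniqueID.
def _ExampleDrbdLvNames(new_id_fn, base_index, disk_count):
  """Return the flat list of data/meta LV names for a DRBD instance."""
  names = []
  for i in range(disk_count):
    prefix = "%s.disk%d" % (new_id_fn(), base_index + i)
    names.append(prefix + "_data")
    names.append(prefix + "_meta")
  return names

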
def _GetInstanceInfoText(instance):
4243
  """Compute that text that should be added to the disk's metadata.
4244

4245
  """
4246
  return "originstname+%s" % instance.name
4247
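  # Example (illustrative): for an instance named "inst1.example.com" this
  # returns "originstname+inst1.example.com".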

    
4248

    
4249
def _CreateDisks(lu, instance):
4250
  """Create all disks for an instance.
4251

4252
  This abstracts away some work from AddInstance.
4253

4254
  @type lu: L{LogicalUnit}
4255
  @param lu: the logical unit on whose behalf we execute
4256
  @type instance: L{objects.Instance}
4257
  @param instance: the instance whose disks we should create
4258
  @rtype: boolean
4259
  @return: the success of the creation
4260

4261
  """
4262
  info = _GetInstanceInfoText(instance)
4263
  pnode = instance.primary_node
4264

    
4265
  if instance.disk_template == constants.DT_FILE:
4266
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4267
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4268

    
4269
    if result.failed or not result.data:
4270
      raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4271

    
4272
    if not result.data[0]:
4273
      raise errors.OpExecError("Failed to create directory '%s'" %
4274
                               file_storage_dir)
4275

    
4276
  # Note: this needs to be kept in sync with adding of disks in
4277
  # LUSetInstanceParams
4278
  for device in instance.disks:
4279
    logging.info("Creating volume %s for instance %s",
4280
                 device.iv_name, instance.name)
4281
    #HARDCODE
4282
    for node in instance.all_nodes:
4283
      f_create = node == pnode
4284
      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4285

    
4286

    
4287
def _RemoveDisks(lu, instance):
4288
  """Remove all disks for an instance.
4289

4290
  This abstracts away some work from `AddInstance()` and
4291
  `RemoveInstance()`. Note that in case some of the devices couldn't
4292
  be removed, the removal will continue with the other ones (compare
4293
  with `_CreateDisks()`).
4294

4295
  @type lu: L{LogicalUnit}
4296
  @param lu: the logical unit on whose behalf we execute
4297
  @type instance: L{objects.Instance}
4298
  @param instance: the instance whose disks we should remove
4299
  @rtype: boolean
4300
  @return: the success of the removal
4301

4302
  """
4303
  logging.info("Removing block devices for instance %s", instance.name)
4304

    
4305
  all_result = True
4306
  for device in instance.disks:
4307
    for node, disk in device.ComputeNodeTree(instance.primary_node):
4308
      lu.cfg.SetDiskID(disk, node)
4309
      msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4310
      if msg:
4311
        lu.LogWarning("Could not remove block device %s on node %s,"
4312
                      " continuing anyway: %s", device.iv_name, node, msg)
4313
        all_result = False
4314

    
4315
  if instance.disk_template == constants.DT_FILE:
4316
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4317
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4318
                                                 file_storage_dir)
4319
    if result.failed or not result.data:
4320
      logging.error("Could not remove directory '%s'", file_storage_dir)
4321
      all_result = False
4322

    
4323
  return all_result
4324

    
4325

    
4326
def _ComputeDiskSize(disk_template, disks):
4327
  """Compute disk size requirements in the volume group
4328

4329
  """
4330
  # Required free disk space as a function of the disk template and disk sizes
4331
  req_size_dict = {
4332
    constants.DT_DISKLESS: None,
4333
    constants.DT_PLAIN: sum(d["size"] for d in disks),
4334
    # 128 MB is added for drbd metadata for each disk
4335
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4336
    constants.DT_FILE: None,
4337
  }
4338

    
4339
  if disk_template not in req_size_dict:
4340
    raise errors.ProgrammerError("Disk template '%s' size requirement"
4341
                                 " is unknown" %  disk_template)
4342

    
4343
  return req_size_dict[disk_template]
4344
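  # Worked example (illustrative values): for two disks of 1024 MB and
  # 512 MB the volume group must provide
  #   DT_PLAIN: 1024 + 512                 = 1536 MB
  #   DT_DRBD8: (1024 + 128) + (512 + 128) = 1792 MB
  # while DT_DISKLESS and DT_FILE need no LVM space at all (None).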

    
4345

    
4346
def _CheckHVParams(lu, nodenames, hvname, hvparams):
4347
  """Hypervisor parameter validation.
4348

4349
  This function abstracts the hypervisor parameter validation to be
4350
  used in both instance create and instance modify.
4351

4352
  @type lu: L{LogicalUnit}
4353
  @param lu: the logical unit for which we check
4354
  @type nodenames: list
4355
  @param nodenames: the list of nodes on which we should check
4356
  @type hvname: string
4357
  @param hvname: the name of the hypervisor we should use
4358
  @type hvparams: dict
4359
  @param hvparams: the parameters which we need to check
4360
  @raise errors.OpPrereqError: if the parameters are not valid
4361

4362
  """
4363
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4364
                                                  hvname,
4365
                                                  hvparams)
4366
  for node in nodenames:
4367
    info = hvinfo[node]
4368
    if info.offline:
4369
      continue
4370
    msg = info.RemoteFailMsg()
4371
    if msg:
4372
      raise errors.OpPrereqError("Hypervisor parameter validation"
4373
                                 " failed on node %s: %s" % (node, msg))
4374
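# Typical call site (sketch only; it mirrors the use in LUCreateInstance
# below, and the node names are hypothetical):
#   _CheckHVParams(self, ["node1.example.com", "node2.example.com"],
#                  self.op.hypervisor, self.op.hvparams)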

    
4375

    
4376
class LUCreateInstance(LogicalUnit):
4377
  """Create an instance.
4378

4379
  """
4380
  HPATH = "instance-add"
4381
  HTYPE = constants.HTYPE_INSTANCE
4382
  _OP_REQP = ["instance_name", "disks", "disk_template",
4383
              "mode", "start",
4384
              "wait_for_sync", "ip_check", "nics",
4385
              "hvparams", "beparams"]
4386
  REQ_BGL = False
4387

    
4388
  def _ExpandNode(self, node):
4389
    """Expands and checks one node name.
4390

4391
    """
4392
    node_full = self.cfg.ExpandNodeName(node)
4393
    if node_full is None:
4394
      raise errors.OpPrereqError("Unknown node %s" % node)
4395
    return node_full
4396

    
4397
  def ExpandNames(self):
4398
    """ExpandNames for CreateInstance.
4399

4400
    Figure out the right locks for instance creation.
4401

4402
    """
4403
    self.needed_locks = {}
4404

    
4405
    # set optional parameters to none if they don't exist
4406
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4407
      if not hasattr(self.op, attr):
4408
        setattr(self.op, attr, None)
4409

    
4410
    # cheap checks, mostly valid constants given
4411

    
4412
    # verify creation mode
4413
    if self.op.mode not in (constants.INSTANCE_CREATE,
4414
                            constants.INSTANCE_IMPORT):
4415
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4416
                                 self.op.mode)
4417

    
4418
    # disk template and mirror node verification
4419
    if self.op.disk_template not in constants.DISK_TEMPLATES:
4420
      raise errors.OpPrereqError("Invalid disk template name")
4421

    
4422
    if self.op.hypervisor is None:
4423
      self.op.hypervisor = self.cfg.GetHypervisorType()
4424

    
4425
    cluster = self.cfg.GetClusterInfo()
4426
    enabled_hvs = cluster.enabled_hypervisors
4427
    if self.op.hypervisor not in enabled_hvs:
4428
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4429
                                 " cluster (%s)" % (self.op.hypervisor,
4430
                                  ",".join(enabled_hvs)))
4431

    
4432
    # check hypervisor parameter syntax (locally)
4433
    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4434
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4435
                                  self.op.hvparams)
4436
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4437
    hv_type.CheckParameterSyntax(filled_hvp)
4438
    self.hv_full = filled_hvp
4439

    
4440
    # fill and remember the beparams dict
4441
    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4442
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4443
                                    self.op.beparams)
4444

    
4445
    #### instance parameters check
4446

    
4447
    # instance name verification
4448
    hostname1 = utils.HostInfo(self.op.instance_name)
4449
    self.op.instance_name = instance_name = hostname1.name
4450

    
4451
    # this is just a preventive check, but someone might still add this
4452
    # instance in the meantime, and creation will fail at lock-add time
4453
    if instance_name in self.cfg.GetInstanceList():
4454
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4455
                                 instance_name)
4456

    
4457
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4458

    
4459
    # NIC buildup
4460
    self.nics = []
4461
    for nic in self.op.nics:
4462
      # ip validity checks
4463
      ip = nic.get("ip", None)
4464
      if ip is None or ip.lower() == "none":
4465
        nic_ip = None
4466
      elif ip.lower() == constants.VALUE_AUTO:
4467
        nic_ip = hostname1.ip
4468
      else:
4469
        if not utils.IsValidIP(ip):
4470
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4471
                                     " like a valid IP" % ip)
4472
        nic_ip = ip
4473

    
4474
      # MAC address verification
4475
      mac = nic.get("mac", constants.VALUE_AUTO)
4476
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4477
        if not utils.IsValidMac(mac.lower()):
4478
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4479
                                     mac)
4480
      # bridge verification
4481
      bridge = nic.get("bridge", None)
4482
      if bridge is None:
4483
        bridge = self.cfg.GetDefBridge()
4484
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4485
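    # Example of a single self.op.nics entry accepted above (illustrative;
    # the bridge name is made up):
    #   {"ip": constants.VALUE_AUTO, "mac": constants.VALUE_AUTO,
    #    "bridge": "xen-br0"}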

    
4486
    # disk checks/pre-build
4487
    self.disks = []
4488
    for disk in self.op.disks:
4489
      mode = disk.get("mode", constants.DISK_RDWR)
4490
      if mode not in constants.DISK_ACCESS_SET:
4491
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4492
                                   mode)
4493
      size = disk.get("size", None)
4494
      if size is None:
4495
        raise errors.OpPrereqError("Missing disk size")
4496
      try:
4497
        size = int(size)
4498
      except ValueError:
4499
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4500
      self.disks.append({"size": size, "mode": mode})
4501

    
4502
    # used in CheckPrereq for ip ping check
4503
    self.check_ip = hostname1.ip
4504

    
4505
    # file storage checks
4506
    if (self.op.file_driver and
4507
        self.op.file_driver not in constants.FILE_DRIVER):
4508
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
4509
                                 self.op.file_driver)
4510

    
4511
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4512
      raise errors.OpPrereqError("File storage directory path not absolute")
4513

    
4514
    ### Node/iallocator related checks
4515
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
4516
      raise errors.OpPrereqError("One and only one of iallocator and primary"
4517
                                 " node must be given")
4518

    
4519
    if self.op.iallocator:
4520
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4521
    else:
4522
      self.op.pnode = self._ExpandNode(self.op.pnode)
4523
      nodelist = [self.op.pnode]
4524
      if self.op.snode is not None:
4525
        self.op.snode = self._ExpandNode(self.op.snode)
4526
        nodelist.append(self.op.snode)
4527
      self.needed_locks[locking.LEVEL_NODE] = nodelist
4528

    
4529
    # in case of import lock the source node too
4530
    if self.op.mode == constants.INSTANCE_IMPORT:
4531
      src_node = getattr(self.op, "src_node", None)
4532
      src_path = getattr(self.op, "src_path", None)
4533

    
4534
      if src_path is None:
4535
        self.op.src_path = src_path = self.op.instance_name
4536

    
4537
      if src_node is None:
4538
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4539
        self.op.src_node = None
4540
        if os.path.isabs(src_path):
4541
          raise errors.OpPrereqError("Importing an instance from an absolute"
4542
                                     " path requires a source node option.")
4543
      else:
4544
        self.op.src_node = src_node = self._ExpandNode(src_node)
4545
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4546
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
4547
        if not os.path.isabs(src_path):
4548
          self.op.src_path = src_path = \
4549
            os.path.join(constants.EXPORT_DIR, src_path)
4550

    
4551
    else: # INSTANCE_CREATE
4552
      if getattr(self.op, "os_type", None) is None:
4553
        raise errors.OpPrereqError("No guest OS specified")
4554

    
4555
  def _RunAllocator(self):
4556
    """Run the allocator based on input opcode.
4557

4558
    """
4559
    nics = [n.ToDict() for n in self.nics]
4560
    ial = IAllocator(self,
4561
                     mode=constants.IALLOCATOR_MODE_ALLOC,
4562
                     name=self.op.instance_name,
4563
                     disk_template=self.op.disk_template,
4564
                     tags=[],
4565
                     os=self.op.os_type,
4566
                     vcpus=self.be_full[constants.BE_VCPUS],
4567
                     mem_size=self.be_full[constants.BE_MEMORY],
4568
                     disks=self.disks,
4569
                     nics=nics,
4570
                     hypervisor=self.op.hypervisor,
4571
                     )
4572

    
4573
    ial.Run(self.op.iallocator)
4574

    
4575
    if not ial.success:
4576
      raise errors.OpPrereqError("Can't compute nodes using"
4577
                                 " iallocator '%s': %s" % (self.op.iallocator,
4578
                                                           ial.info))
4579
    if len(ial.nodes) != ial.required_nodes:
4580
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4581
                                 " of nodes (%s), required %s" %
4582
                                 (self.op.iallocator, len(ial.nodes),
4583
                                  ial.required_nodes))
4584
    self.op.pnode = ial.nodes[0]
4585
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4586
                 self.op.instance_name, self.op.iallocator,
4587
                 ", ".join(ial.nodes))
4588
    if ial.required_nodes == 2:
4589
      self.op.snode = ial.nodes[1]
4590
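  # Illustrative note (not in the original code): for a DRBD8 request the
  # allocator returns required_nodes == 2 and a node list such as
  # ["node3.example.com", "node7.example.com"]; the first entry becomes the
  # primary node and the second the secondary.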

    
4591
  def BuildHooksEnv(self):
4592
    """Build hooks env.
4593

4594
    This runs on master, primary and secondary nodes of the instance.
4595

4596
    """
4597
    env = {
4598
      "ADD_MODE": self.op.mode,
4599
      }
4600
    if self.op.mode == constants.INSTANCE_IMPORT:
4601
      env["SRC_NODE"] = self.op.src_node
4602
      env["SRC_PATH"] = self.op.src_path
4603
      env["SRC_IMAGES"] = self.src_images
4604

    
4605
    env.update(_BuildInstanceHookEnv(
4606
      name=self.op.instance_name,
4607
      primary_node=self.op.pnode,
4608
      secondary_nodes=self.secondaries,
4609
      status=self.op.start,
4610
      os_type=self.op.os_type,
4611
      memory=self.be_full[constants.BE_MEMORY],
4612
      vcpus=self.be_full[constants.BE_VCPUS],
4613
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4614
      disk_template=self.op.disk_template,
4615
      disks=[(d["size"], d["mode"]) for d in self.disks],
4616
      bep=self.be_full,
4617
      hvp=self.hv_full,
4618
      hypervisor_name=self.op.hypervisor,
4619
    ))
4620

    
4621
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4622
          self.secondaries)
4623
    return env, nl, nl
4624

    
4625

    
4626
  def CheckPrereq(self):
4627
    """Check prerequisites.
4628

4629
    """
4630
    if (not self.cfg.GetVGName() and
4631
        self.op.disk_template not in constants.DTS_NOT_LVM):
4632
      raise errors.OpPrereqError("Cluster does not support lvm-based"
4633
                                 " instances")
4634

    
4635
    if self.op.mode == constants.INSTANCE_IMPORT:
4636
      src_node = self.op.src_node
4637
      src_path = self.op.src_path
4638

    
4639
      if src_node is None:
4640
        exp_list = self.rpc.call_export_list(
4641
          self.acquired_locks[locking.LEVEL_NODE])
4642
        found = False
4643
        for node in exp_list:
4644
          if not exp_list[node].failed and src_path in exp_list[node].data:
4645
            found = True
4646
            self.op.src_node = src_node = node
4647
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4648
                                                       src_path)
4649
            break
4650
        if not found:
4651
          raise errors.OpPrereqError("No export found for relative path %s" %
4652
                                      src_path)
4653

    
4654
      _CheckNodeOnline(self, src_node)
4655
      result = self.rpc.call_export_info(src_node, src_path)
4656
      result.Raise()
4657
      if not result.data:
4658
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
4659

    
4660
      export_info = result.data
4661
      if not export_info.has_section(constants.INISECT_EXP):
4662
        raise errors.ProgrammerError("Corrupted export config")
4663

    
4664
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
4665
      if (int(ei_version) != constants.EXPORT_VERSION):
4666
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4667
                                   (ei_version, constants.EXPORT_VERSION))
4668

    
4669
      # Check that the new instance doesn't have less disks than the export
4670
      instance_disks = len(self.disks)
4671
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4672
      if instance_disks < export_disks:
4673
        raise errors.OpPrereqError("Not enough disks to import."
4674
                                   " (instance: %d, export: %d)" %
4675
                                   (instance_disks, export_disks))
4676

    
4677
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4678
      disk_images = []
4679
      for idx in range(export_disks):
4680
        option = 'disk%d_dump' % idx
4681
        if export_info.has_option(constants.INISECT_INS, option):
4682
          # FIXME: are the old OSes, disk sizes, etc. useful?
4683
          export_name = export_info.get(constants.INISECT_INS, option)
4684
          image = os.path.join(src_path, export_name)
4685
          disk_images.append(image)
4686
        else:
4687
          disk_images.append(False)
4688

    
4689
      self.src_images = disk_images
4690

    
4691
      old_name = export_info.get(constants.INISECT_INS, 'name')
4692
      # FIXME: int() here could throw a ValueError on broken exports
4693
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4694
      if self.op.instance_name == old_name:
4695
        for idx, nic in enumerate(self.nics):
4696
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4697
            nic_mac_ini = 'nic%d_mac' % idx
4698
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4699

    
4700
    # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4701
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
4702
    if self.op.start and not self.op.ip_check:
4703
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4704
                                 " adding an instance in start mode")
4705

    
4706
    if self.op.ip_check:
4707
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4708
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
4709
                                   (self.check_ip, self.op.instance_name))
4710

    
4711
    #### mac address generation
4712
    # By generating here the mac address both the allocator and the hooks get
4713
    # the real final mac address rather than the 'auto' or 'generate' value.
4714
    # There is a race condition between the generation and the instance object
4715
    # creation, which means that we know the mac is valid now, but we're not
4716
    # sure it will be when we actually add the instance. If things go bad
4717
    # adding the instance will abort because of a duplicate mac, and the
4718
    # creation job will fail.
4719
    for nic in self.nics:
4720
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4721
        nic.mac = self.cfg.GenerateMAC()
4722

    
4723
    #### allocator run
4724

    
4725
    if self.op.iallocator is not None:
4726
      self._RunAllocator()
4727

    
4728
    #### node related checks
4729

    
4730
    # check primary node
4731
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4732
    assert self.pnode is not None, \
4733
      "Cannot retrieve locked node %s" % self.op.pnode
4734
    if pnode.offline:
4735
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4736
                                 pnode.name)
4737
    if pnode.drained:
4738
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4739
                                 pnode.name)
4740

    
4741
    self.secondaries = []
4742

    
4743
    # mirror node verification
4744
    if self.op.disk_template in constants.DTS_NET_MIRROR:
4745
      if self.op.snode is None:
4746
        raise errors.OpPrereqError("The networked disk templates need"
4747
                                   " a mirror node")
4748
      if self.op.snode == pnode.name:
4749
        raise errors.OpPrereqError("The secondary node cannot be"
4750
                                   " the primary node.")
4751
      _CheckNodeOnline(self, self.op.snode)
4752
      _CheckNodeNotDrained(self, self.op.snode)
4753
      self.secondaries.append(self.op.snode)
4754

    
4755
    nodenames = [pnode.name] + self.secondaries
4756

    
4757
    req_size = _ComputeDiskSize(self.op.disk_template,
4758
                                self.disks)
4759

    
4760
    # Check lv size requirements
4761
    if req_size is not None:
4762
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4763
                                         self.op.hypervisor)
4764
      for node in nodenames:
4765
        info = nodeinfo[node]
4766
        info.Raise()
4767
        info = info.data
4768
        if not info:
4769
          raise errors.OpPrereqError("Cannot get current information"
4770
                                     " from node '%s'" % node)
4771
        vg_free = info.get('vg_free', None)
4772
        if not isinstance(vg_free, int):
4773
          raise errors.OpPrereqError("Can't compute free disk space on"
4774
                                     " node %s" % node)
4775
        if req_size > info['vg_free']:
4776
          raise errors.OpPrereqError("Not enough disk space on target node %s."
4777
                                     " %d MB available, %d MB required" %
4778
                                     (node, info['vg_free'], req_size))
4779

    
4780
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4781

    
4782
    # os verification
4783
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4784
    result.Raise()
4785
    if not isinstance(result.data, objects.OS) or not result.data:
4786
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
4787
                                 " primary node"  % self.op.os_type)
4788

    
4789
    # bridge check on primary node
4790
    bridges = [n.bridge for n in self.nics]
4791
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4792
    result.Raise()
4793
    if not result.data:
4794
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
4795
                                 " exist on destination node '%s'" %
4796
                                 (",".join(bridges), pnode.name))
4797

    
4798
    # memory check on primary node
4799
    if self.op.start:
4800
      _CheckNodeFreeMemory(self, self.pnode.name,
4801
                           "creating instance %s" % self.op.instance_name,
4802
                           self.be_full[constants.BE_MEMORY],
4803
                           self.op.hypervisor)
4804

    
4805
  def Exec(self, feedback_fn):
4806
    """Create and add the instance to the cluster.
4807

4808
    """
4809
    instance = self.op.instance_name
4810
    pnode_name = self.pnode.name
4811

    
4812
    ht_kind = self.op.hypervisor
4813
    if ht_kind in constants.HTS_REQ_PORT:
4814
      network_port = self.cfg.AllocatePort()
4815
    else:
4816
      network_port = None
4817

    
4818
    ##if self.op.vnc_bind_address is None:
4819
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4820

    
4821
    # this is needed because os.path.join does not accept None arguments
4822
    if self.op.file_storage_dir is None:
4823
      string_file_storage_dir = ""
4824
    else:
4825
      string_file_storage_dir = self.op.file_storage_dir
4826

    
4827
    # build the full file storage dir path
4828
    file_storage_dir = os.path.normpath(os.path.join(
4829
                                        self.cfg.GetFileStorageDir(),
4830
                                        string_file_storage_dir, instance))
4831
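    # e.g. with a cluster file storage dir of "/srv/ganeti/file-storage"
    # (an assumed value) and no per-instance subdirectory this yields
    # "/srv/ganeti/file-storage/<instance name>"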

    
4832

    
4833
    disks = _GenerateDiskTemplate(self,
4834
                                  self.op.disk_template,
4835
                                  instance, pnode_name,
4836
                                  self.secondaries,
4837
                                  self.disks,
4838
                                  file_storage_dir,
4839
                                  self.op.file_driver,
4840
                                  0)
4841

    
4842
    iobj = objects.Instance(name=instance, os=self.op.os_type,
4843
                            primary_node=pnode_name,
4844
                            nics=self.nics, disks=disks,
4845
                            disk_template=self.op.disk_template,
4846
                            admin_up=False,
4847
                            network_port=network_port,
4848
                            beparams=self.op.beparams,
4849
                            hvparams=self.op.hvparams,
4850
                            hypervisor=self.op.hypervisor,
4851
                            )
4852

    
4853
    feedback_fn("* creating instance disks...")
4854
    try:
4855
      _CreateDisks(self, iobj)
4856
    except errors.OpExecError:
4857
      self.LogWarning("Device creation failed, reverting...")
4858
      try:
4859
        _RemoveDisks(self, iobj)
4860
      finally:
4861
        self.cfg.ReleaseDRBDMinors(instance)
4862
        raise
4863

    
4864
    feedback_fn("adding instance %s to cluster config" % instance)
4865

    
4866
    self.cfg.AddInstance(iobj)
4867
    # Declare that we don't want to remove the instance lock anymore, as we've
4868
    # added the instance to the config
4869
    del self.remove_locks[locking.LEVEL_INSTANCE]
4870
    # Unlock all the nodes
4871
    if self.op.mode == constants.INSTANCE_IMPORT:
4872
      nodes_keep = [self.op.src_node]
4873
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4874
                       if node != self.op.src_node]
4875
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4876
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4877
    else:
4878
      self.context.glm.release(locking.LEVEL_NODE)
4879
      del self.acquired_locks[locking.LEVEL_NODE]
4880

    
4881
    if self.op.wait_for_sync:
4882
      disk_abort = not _WaitForSync(self, iobj)
4883
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
4884
      # make sure the disks are not degraded (still sync-ing is ok)
4885
      time.sleep(15)
4886
      feedback_fn("* checking mirrors status")
4887
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4888
    else:
4889
      disk_abort = False
4890

    
4891
    if disk_abort:
4892
      _RemoveDisks(self, iobj)
4893
      self.cfg.RemoveInstance(iobj.name)
4894
      # Make sure the instance lock gets removed
4895
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4896
      raise errors.OpExecError("There are some degraded disks for"
4897
                               " this instance")
4898

    
4899
    feedback_fn("creating os for instance %s on node %s" %
4900
                (instance, pnode_name))
4901

    
4902
    if iobj.disk_template != constants.DT_DISKLESS:
4903
      if self.op.mode == constants.INSTANCE_CREATE:
4904
        feedback_fn("* running the instance OS create scripts...")
4905
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
4906
        msg = result.RemoteFailMsg()
4907
        if msg:
4908
          raise errors.OpExecError("Could not add os for instance %s"
4909
                                   " on node %s: %s" %
4910
                                   (instance, pnode_name, msg))
4911

    
4912
      elif self.op.mode == constants.INSTANCE_IMPORT:
4913
        feedback_fn("* running the instance OS import scripts...")
4914
        src_node = self.op.src_node
4915
        src_images = self.src_images
4916
        cluster_name = self.cfg.GetClusterName()
4917
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4918
                                                         src_node, src_images,
4919
                                                         cluster_name)
4920
        import_result.Raise()
4921
        for idx, result in enumerate(import_result.data):
4922
          if not result:
4923
            self.LogWarning("Could not import the image %s for instance"
4924
                            " %s, disk %d, on node %s" %
4925
                            (src_images[idx], instance, idx, pnode_name))
4926
      else:
4927
        # also checked in the prereq part
4928
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4929
                                     % self.op.mode)
4930

    
4931
    if self.op.start:
4932
      iobj.admin_up = True
4933
      self.cfg.Update(iobj)
4934
      logging.info("Starting instance %s on node %s", instance, pnode_name)
4935
      feedback_fn("* starting instance...")
4936
      result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4937
      msg = result.RemoteFailMsg()
4938
      if msg:
4939
        raise errors.OpExecError("Could not start instance: %s" % msg)
4940

    
4941

    
4942
class LUConnectConsole(NoHooksLU):
4943
  """Connect to an instance's console.
4944

4945
  This is somewhat special in that it returns the command line that
4946
  you need to run on the master node in order to connect to the
4947
  console.
4948

4949
  """
4950
  _OP_REQP = ["instance_name"]
4951
  REQ_BGL = False
4952

    
4953
  def ExpandNames(self):
4954
    self._ExpandAndLockInstance()
4955

    
4956
  def CheckPrereq(self):
4957
    """Check prerequisites.
4958

4959
    This checks that the instance is in the cluster.
4960

4961
    """
4962
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4963
    assert self.instance is not None, \
4964
      "Cannot retrieve locked instance %s" % self.op.instance_name
4965
    _CheckNodeOnline(self, self.instance.primary_node)
4966

    
4967
  def Exec(self, feedback_fn):
4968
    """Connect to the console of an instance
4969

4970
    """
4971
    instance = self.instance
4972
    node = instance.primary_node
4973

    
4974
    node_insts = self.rpc.call_instance_list([node],
4975
                                             [instance.hypervisor])[node]
4976
    node_insts.Raise()
4977

    
4978
    if instance.name not in node_insts.data:
4979
      raise errors.OpExecError("Instance %s is not running." % instance.name)
4980

    
4981
    logging.debug("Connecting to console of %s on %s", instance.name, node)
4982

    
4983
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
4984
    cluster = self.cfg.GetClusterInfo()
4985
    # beparams and hvparams are passed separately, to avoid editing the
4986
    # instance and then saving the defaults in the instance itself.
4987
    hvparams = cluster.FillHV(instance)
4988
    beparams = cluster.FillBE(instance)
4989
    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4990

    
4991
    # build ssh cmdline
4992
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4993

    
4994

    
4995
class LUReplaceDisks(LogicalUnit):
4996
  """Replace the disks of an instance.
4997

4998
  """
4999
  HPATH = "mirrors-replace"
5000
  HTYPE = constants.HTYPE_INSTANCE
5001
  _OP_REQP = ["instance_name", "mode", "disks"]
5002
  REQ_BGL = False
5003

    
5004
  def CheckArguments(self):
5005
    if not hasattr(self.op, "remote_node"):
5006
      self.op.remote_node = None
5007
    if not hasattr(self.op, "iallocator"):
5008
      self.op.iallocator = None
5009

    
5010
    # check for valid parameter combination
5011
    cnt = [self.op.remote_node, self.op.iallocator].count(None)
5012
    if self.op.mode == constants.REPLACE_DISK_CHG:
5013
      if cnt == 2:
5014
        raise errors.OpPrereqError("When changing the secondary either an"
5015
                                   " iallocator script must be used or the"
5016
                                   " new node given")
5017
      elif cnt == 0:
5018
        raise errors.OpPrereqError("Give either the iallocator or the new"
5019
                                   " secondary, not both")
5020
    else: # not replacing the secondary
5021
      if cnt != 2:
5022
        raise errors.OpPrereqError("The iallocator and new node options can"
5023
                                   " be used only when changing the"
5024
                                   " secondary node")
5025
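  # Summary of the accepted combinations (comment added for clarity):
  #   REPLACE_DISK_CHG: exactly one of iallocator / remote_node is required
  #   REPLACE_DISK_PRI / REPLACE_DISK_SEC: neither may be given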

    
5026
  def ExpandNames(self):
5027
    self._ExpandAndLockInstance()
5028

    
5029
    if self.op.iallocator is not None:
5030
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5031
    elif self.op.remote_node is not None:
5032
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5033
      if remote_node is None:
5034
        raise errors.OpPrereqError("Node '%s' not known" %
5035
                                   self.op.remote_node)
5036
      self.op.remote_node = remote_node
5037
      # Warning: do not remove the locking of the new secondary here
5038
      # unless DRBD8.AddChildren is changed to work in parallel;
5039
      # currently it doesn't since parallel invocations of
5040
      # FindUnusedMinor will conflict
5041
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5042
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5043
    else:
5044
      self.needed_locks[locking.LEVEL_NODE] = []
5045
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5046

    
5047
  def DeclareLocks(self, level):
5048
    # If we're not already locking all nodes in the set we have to declare the
5049
    # instance's primary/secondary nodes.
5050
    if (level == locking.LEVEL_NODE and
5051
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5052
      self._LockInstancesNodes()
5053

    
5054
  def _RunAllocator(self):
5055
    """Compute a new secondary node using an IAllocator.
5056

5057
    """
5058
    ial = IAllocator(self,
5059
                     mode=constants.IALLOCATOR_MODE_RELOC,
5060
                     name=self.op.instance_name,
5061
                     relocate_from=[self.sec_node])
5062

    
5063
    ial.Run(self.op.iallocator)
5064

    
5065
    if not ial.success:
5066
      raise errors.OpPrereqError("Can't compute nodes using"
5067
                                 " iallocator '%s': %s" % (self.op.iallocator,
5068
                                                           ial.info))
5069
    if len(ial.nodes) != ial.required_nodes:
5070
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5071
                                 " of nodes (%s), required %s" %
5072
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
5073
    self.op.remote_node = ial.nodes[0]
5074
    self.LogInfo("Selected new secondary for the instance: %s",
5075
                 self.op.remote_node)
5076

    
5077
  def BuildHooksEnv(self):
5078
    """Build hooks env.
5079

5080
    This runs on the master, the primary and all the secondaries.
5081

5082
    """
5083
    env = {
5084
      "MODE": self.op.mode,
5085
      "NEW_SECONDARY": self.op.remote_node,
5086
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
5087
      }
5088
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5089
    nl = [
5090
      self.cfg.GetMasterNode(),
5091
      self.instance.primary_node,
5092
      ]
5093
    if self.op.remote_node is not None:
5094
      nl.append(self.op.remote_node)
5095
    return env, nl, nl
5096

    
5097
  def CheckPrereq(self):
5098
    """Check prerequisites.
5099

5100
    This checks that the instance is in the cluster.
5101

5102
    """
5103
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5104
    assert instance is not None, \
5105
      "Cannot retrieve locked instance %s" % self.op.instance_name
5106
    self.instance = instance
5107

    
5108
    if instance.disk_template != constants.DT_DRBD8:
5109
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5110
                                 " instances")
5111

    
5112
    if len(instance.secondary_nodes) != 1:
5113
      raise errors.OpPrereqError("The instance has a strange layout,"
5114
                                 " expected one secondary but found %d" %
5115
                                 len(instance.secondary_nodes))
5116

    
5117
    self.sec_node = instance.secondary_nodes[0]
5118

    
5119
    if self.op.iallocator is not None:
5120
      self._RunAllocator()
5121

    
5122
    remote_node = self.op.remote_node
5123
    if remote_node is not None:
5124
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5125
      assert self.remote_node_info is not None, \
5126
        "Cannot retrieve locked node %s" % remote_node
5127
    else:
5128
      self.remote_node_info = None
5129
    if remote_node == instance.primary_node:
5130
      raise errors.OpPrereqError("The specified node is the primary node of"
5131
                                 " the instance.")
5132
    elif remote_node == self.sec_node:
5133
      raise errors.OpPrereqError("The specified node is already the"
5134
                                 " secondary node of the instance.")
5135

    
5136
    if self.op.mode == constants.REPLACE_DISK_PRI:
5137
      n1 = self.tgt_node = instance.primary_node
5138
      n2 = self.oth_node = self.sec_node
5139
    elif self.op.mode == constants.REPLACE_DISK_SEC:
5140
      n1 = self.tgt_node = self.sec_node
5141
      n2 = self.oth_node = instance.primary_node
5142
    elif self.op.mode == constants.REPLACE_DISK_CHG:
5143
      n1 = self.new_node = remote_node
5144
      n2 = self.oth_node = instance.primary_node
5145
      self.tgt_node = self.sec_node
5146
      _CheckNodeNotDrained(self, remote_node)
5147
    else:
5148
      raise errors.ProgrammerError("Unhandled disk replace mode")
5149

    
5150
    _CheckNodeOnline(self, n1)
5151
    _CheckNodeOnline(self, n2)
5152

    
5153
    if not self.op.disks:
5154
      self.op.disks = range(len(instance.disks))
5155

    
5156
    for disk_idx in self.op.disks:
5157
      instance.FindDisk(disk_idx)
5158

    
5159
  def _ExecD8DiskOnly(self, feedback_fn):
5160
    """Replace a disk on the primary or secondary for dbrd8.
5161

5162
    The algorithm for replace is quite complicated:
5163

5164
      1. for each disk to be replaced:
5165

5166
        1. create new LVs on the target node with unique names
5167
        1. detach old LVs from the drbd device
5168
        1. rename old LVs to name_replaced.<time_t>
5169
        1. rename new LVs to old LVs
5170
        1. attach the new LVs (with the old names now) to the drbd device
5171

5172
      1. wait for sync across all devices
5173

5174
      1. for each modified disk:
5175

5176
        1. remove old LVs (which have the name name_replaced.<time_t>)
5177

5178
    Failures are not very well handled.
5179

5180
    """
5181
    steps_total = 6
5182
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5183
    instance = self.instance
5184
    iv_names = {}
5185
    vgname = self.cfg.GetVGName()
5186
    # start of work
5187
    cfg = self.cfg
5188
    tgt_node = self.tgt_node
5189
    oth_node = self.oth_node
5190

    
5191
    # Step: check device activation
5192
    self.proc.LogStep(1, steps_total, "check device existence")
5193
    info("checking volume groups")
5194
    my_vg = cfg.GetVGName()
5195
    results = self.rpc.call_vg_list([oth_node, tgt_node])
5196
    if not results:
5197
      raise errors.OpExecError("Can't list volume groups on the nodes")
5198
    for node in oth_node, tgt_node:
5199
      res = results[node]
5200
      if res.failed or not res.data or my_vg not in res.data:
5201
        raise errors.OpExecError("Volume group '%s' not found on %s" %
5202
                                 (my_vg, node))
5203
    for idx, dev in enumerate(instance.disks):
5204
      if idx not in self.op.disks:
5205
        continue
5206
      for node in tgt_node, oth_node:
5207
        info("checking disk/%d on %s" % (idx, node))
5208
        cfg.SetDiskID(dev, node)
5209
        result = self.rpc.call_blockdev_find(node, dev)
5210
        msg = result.RemoteFailMsg()
5211
        if not msg and not result.payload:
5212
          msg = "disk not found"
5213
        if msg:
5214
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5215
                                   (idx, node, msg))
5216

    
5217
    # Step: check other node consistency
5218
    self.proc.LogStep(2, steps_total, "check peer consistency")
5219
    for idx, dev in enumerate(instance.disks):
5220
      if idx not in self.op.disks:
5221
        continue
5222
      info("checking disk/%d consistency on %s" % (idx, oth_node))
5223
      if not _CheckDiskConsistency(self, dev, oth_node,
5224
                                   oth_node==instance.primary_node):
5225
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5226
                                 " to replace disks on this node (%s)" %
5227
                                 (oth_node, tgt_node))
5228

    
5229
    # Step: create new storage
5230
    self.proc.LogStep(3, steps_total, "allocate new storage")
5231
    for idx, dev in enumerate(instance.disks):
5232
      if idx not in self.op.disks:
5233
        continue
5234
      size = dev.size
5235
      cfg.SetDiskID(dev, tgt_node)
5236
      lv_names = [".disk%d_%s" % (idx, suf)
5237
                  for suf in ["data", "meta"]]
5238
      names = _GenerateUniqueNames(self, lv_names)
5239
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5240
                             logical_id=(vgname, names[0]))
5241
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5242
                             logical_id=(vgname, names[1]))
5243
      new_lvs = [lv_data, lv_meta]
5244
      old_lvs = dev.children
5245
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5246
      info("creating new local storage on %s for %s" %
5247
           (tgt_node, dev.iv_name))
5248
      # we pass force_create=True to force the LVM creation
5249
      for new_lv in new_lvs:
5250
        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5251
                        _GetInstanceInfoText(instance), False)
5252

    
5253
    # Step: for each lv, detach+rename*2+attach
5254
    self.proc.LogStep(4, steps_total, "change drbd configuration")
5255
    for dev, old_lvs, new_lvs in iv_names.itervalues():
5256
      info("detaching %s drbd from local storage" % dev.iv_name)
5257
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5258
      result.Raise()
5259
      if not result.data:
5260
        raise errors.OpExecError("Can't detach drbd from local storage on node"
5261
                                 " %s for device %s" % (tgt_node, dev.iv_name))
5262
      #dev.children = []
5263
      #cfg.Update(instance)
5264

    
5265
      # ok, we created the new LVs, so now we know we have the needed
5266
      # storage; as such, we proceed on the target node to rename
5267
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5268
      # using the assumption that logical_id == physical_id (which in
5269
      # turn is the unique_id on that node)
5270

    
5271
      # FIXME(iustin): use a better name for the replaced LVs
5272
      temp_suffix = int(time.time())
5273
      ren_fn = lambda d, suff: (d.physical_id[0],
5274
                                d.physical_id[1] + "_replaced-%s" % suff)
5275
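      # Illustration (made-up names): an old LV with physical_id
      # ("xenvg", "abc.disk0_data") is renamed to
      # ("xenvg", "abc.disk0_data_replaced-<time_t>") so that the new LV can
      # take over the original name below.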
      # build the rename list based on what LVs exist on the node
5276
      rlist = []
5277
      for to_ren in old_lvs:
5278
        result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5279
        if not result.RemoteFailMsg() and result.payload:
5280
          # device exists
5281
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5282

    
5283
      info("renaming the old LVs on the target node")
5284
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5285
      result.Raise()
5286
      if not result.data:
5287
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5288
      # now we rename the new LVs to the old LVs
5289
      info("renaming the new LVs on the target node")
5290
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5291
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5292
      result.Raise()
5293
      if not result.data:
5294
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5295

    
5296
      for old, new in zip(old_lvs, new_lvs):
5297
        new.logical_id = old.logical_id
5298
        cfg.SetDiskID(new, tgt_node)
5299

    
5300
      for disk in old_lvs:
5301
        disk.logical_id = ren_fn(disk, temp_suffix)
5302
        cfg.SetDiskID(disk, tgt_node)
5303

    
5304
      # now that the new lvs have the old name, we can add them to the device
5305
      info("adding new mirror component on %s" % tgt_node)
5306
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5307
      if result.failed or not result.data:
5308
        for new_lv in new_lvs:
5309
          msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5310
          if msg:
5311
            warning("Can't rollback device %s: %s", dev, msg,
5312
                    hint="cleanup manually the unused logical volumes")
5313
        raise errors.OpExecError("Can't add local storage to drbd")
5314

    
5315
      dev.children = new_lvs
5316
      cfg.Update(instance)
5317

    
5318
    # Step: wait for sync
5319

    
5320
    # this can fail as the old devices are degraded and _WaitForSync
5321
    # does a combined result over all disks, so we don't check its
5322
    # return value
5323
    self.proc.LogStep(5, steps_total, "sync devices")
5324
    _WaitForSync(self, instance, unlock=True)
5325

    
5326
    # so check manually all the devices
5327
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5328
      cfg.SetDiskID(dev, instance.primary_node)
5329
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5330
      msg = result.RemoteFailMsg()
5331
      if not msg and not result.payload:
5332
        msg = "disk not found"
5333
      if msg:
5334
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
5335
                                 (name, msg))
5336
      if result.payload[5]:
5337
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
5338

    
5339
    # Step: remove old storage
5340
    self.proc.LogStep(6, steps_total, "removing old storage")
5341
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5342
      info("remove logical volumes for %s" % name)
5343
      for lv in old_lvs:
5344
        cfg.SetDiskID(lv, tgt_node)
5345
        msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5346
        if msg:
5347
          warning("Can't remove old LV: %s" % msg,
5348
                  hint="manually remove unused LVs")
5349
          continue
5350

    
5351
  def _ExecD8Secondary(self, feedback_fn):
5352
    """Replace the secondary node for drbd8.
5353

5354
    The algorithm for replace is quite complicated:
5355
      - for all disks of the instance:
5356
        - create new LVs on the new node with same names
5357
        - shutdown the drbd device on the old secondary
5358
        - disconnect the drbd network on the primary
5359
        - create the drbd device on the new secondary
5360
        - network attach the drbd on the primary, using an artifice:
5361
          the drbd code for Attach() will connect to the network if it
5362
          finds a device which is connected to the good local disks but
5363
          not network enabled
5364
      - wait for sync across all devices
5365
      - remove all disks from the old secondary
5366

5367
    Failures are not very well handled.
5368

5369
    """
5370
    steps_total = 6
5371
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5372
    instance = self.instance
5373
    iv_names = {}
5374
    # start of work
5375
    cfg = self.cfg
5376
    old_node = self.tgt_node
5377
    new_node = self.new_node
5378
    pri_node = instance.primary_node
5379
    nodes_ip = {
5380
      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5381
      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5382
      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5383
      }
5384

    
5385
    # Step: check device activation
5386
    self.proc.LogStep(1, steps_total, "check device existence")
5387
    info("checking volume groups")
5388
    my_vg = cfg.GetVGName()
5389
    results = self.rpc.call_vg_list([pri_node, new_node])
5390
    for node in pri_node, new_node:
5391
      res = results[node]
5392
      if res.failed or not res.data or my_vg not in res.data:
5393
        raise errors.OpExecError("Volume group '%s' not found on %s" %
5394
                                 (my_vg, node))
5395
    for idx, dev in enumerate(instance.disks):
5396
      if idx not in self.op.disks:
5397
        continue
5398
      info("checking disk/%d on %s" % (idx, pri_node))
5399
      cfg.SetDiskID(dev, pri_node)
5400
      result = self.rpc.call_blockdev_find(pri_node, dev)
5401
      msg = result.RemoteFailMsg()
5402
      if not msg and not result.payload:
5403
        msg = "disk not found"
5404
      if msg:
5405
        raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5406
                                 (idx, pri_node, msg))
5407

    
5408
    # Step: check other node consistency
5409
    self.proc.LogStep(2, steps_total, "check peer consistency")
5410
    for idx, dev in enumerate(instance.disks):
5411
      if idx not in self.op.disks:
5412
        continue
5413
      info("checking disk/%d consistency on %s" % (idx, pri_node))
5414
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5415
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
5416
                                 " unsafe to replace the secondary" %
5417
                                 pri_node)
5418

    
5419
    # Step: create new storage
5420
    self.proc.LogStep(3, steps_total, "allocate new storage")
5421
    for idx, dev in enumerate(instance.disks):
5422
      info("adding new local storage on %s for disk/%d" %
5423
           (new_node, idx))
5424
      # we pass force_create=True to force LVM creation
5425
      for new_lv in dev.children:
5426
        _CreateBlockDev(self, new_node, instance, new_lv, True,
5427
                        _GetInstanceInfoText(instance), False)
5428

    
5429
    # Step 4: drbd minors and drbd setup changes
5430
    # after this, we must manually remove the drbd minors on both the
5431
    # error and the success paths
5432
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5433
                                   instance.name)
5434
    logging.debug("Allocated minors %s" % (minors,))
5435
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
5436
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5437
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5438
      # create new devices on new_node; note that we create two IDs:
5439
      # one without port, so the drbd will be activated without
5440
      # networking information on the new node at this stage, and one
5441
      # with network, for the latter activation in step 4
5442
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5443
      if pri_node == o_node1:
5444
        p_minor = o_minor1
5445
      else:
5446
        p_minor = o_minor2
5447

    
5448
      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5449
      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5450
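      # Illustration (made-up values): with DRBD port 11000, primary minor 0
      # and newly allocated minor 3 these look like
      #   new_alone_id = (pri_node, new_node, None,  0, 3, o_secret)
      #   new_net_id   = (pri_node, new_node, 11000, 0, 3, o_secret)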

    
5451
      iv_names[idx] = (dev, dev.children, new_net_id)
5452
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5453
                    new_net_id)
5454
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5455
                              logical_id=new_alone_id,
5456
                              children=dev.children,
5457
                              size=dev.size)
5458
      try:
5459
        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5460
                              _GetInstanceInfoText(instance), False)
5461
      except errors.GenericError:
5462
        self.cfg.ReleaseDRBDMinors(instance.name)
5463
        raise
5464

    
5465
    for idx, dev in enumerate(instance.disks):
5466
      # we have new devices, shutdown the drbd on the old secondary
5467
      info("shutting down drbd for disk/%d on old node" % idx)
5468
      cfg.SetDiskID(dev, old_node)
5469
      msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5470
      if msg:
5471
        warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5472
                (idx, msg),
5473
                hint="Please cleanup this device manually as soon as possible")
5474

    
5475
    info("detaching primary drbds from the network (=> standalone)")
5476
    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5477
                                               instance.disks)[pri_node]
5478

    
5479
    msg = result.RemoteFailMsg()
5480
    if msg:
5481
      # detaches didn't succeed (unlikely)
5482
      self.cfg.ReleaseDRBDMinors(instance.name)
5483
      raise errors.OpExecError("Can't detach the disks from the network on"
5484
                               " old node: %s" % (msg,))
5485

    
5486
    # if we managed to detach at least one, we update all the disks of
5487
    # the instance to point to the new secondary
5488
    info("updating instance configuration")
5489
    for dev, _, new_logical_id in iv_names.itervalues():
5490
      dev.logical_id = new_logical_id
5491
      cfg.SetDiskID(dev, pri_node)
5492
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
                                           instance.disks, instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.RemoteFailMsg()
      if msg:
        warning("can't attach drbd disks on node %s: %s", to_node, msg,
                hint="please do a gnt-instance info to see the"
                " status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
                                 (idx, msg))
      if result.payload[5]:
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for disk/%d" % idx)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
        if msg:
          warning("Can't remove LV on old secondary: %s", msg,
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _StartInstanceDisks(self, instance, True)

    if self.op.mode == constants.REPLACE_DISK_CHG:
      fn = self._ExecD8Secondary
    else:
      fn = self._ExecD8DiskOnly

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _SafeShutdownInstanceDisks(self, instance)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False
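  # Illustrative example (added; values are made up): the opcode is expected
  # to carry instance_name, a disk index such as 0, an amount in MiB such as
  # 1024, and wait_for_sync=True/False.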

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      _CheckNodeOnline(self, node)


    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    self.disk = instance.FindDisk(self.op.disk)

    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo[node]
      if info.failed or not info.data:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.data.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Grow request failed to node %s: %s" %
                                 (node, msg))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
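    # Added note: the size change is already recorded in the configuration at
    # this point; a failed sync below only produces a warning.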
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
      if dev_pstatus.offline:
        dev_pstatus = None
      else:
        msg = dev_pstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_pstatus = dev_pstatus.payload
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
      if dev_sstatus.offline:
        dev_sstatus = None
      else:
        msg = dev_sstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_sstatus = dev_sstatus.payload
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      "mode": dev.mode,
      "size": dev.size,
      }
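    # Added note: pstatus/sstatus hold the payload returned by
    # call_blockdev_find on the primary/secondary node, or None when the
    # query is static or the node is offline.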

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise()
        remote_info = remote_info.data
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.admin_up:
        config_state = "up"
      else:
        config_state = "down"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }
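      # Added note: hv_instance/be_instance are the per-instance overrides
      # only, while hv_actual/be_actual are those overrides filled in with
      # the cluster defaults (FillHV/FillBE).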

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instances's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")

    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)

      if nic_op == constants.DDM_ADD:
        nic_bridge = nic_dict.get('bridge', None)
        if nic_bridge is None:
          nic_dict['bridge'] = self.cfg.GetDefBridge()
        nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic")

    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'bridge' in this_nic_override:
          bridge = this_nic_override['bridge']
        else:
          bridge = nic.bridge
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        args['nics'].append((ip, bridge, mac))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        bridge = nic_override[constants.DDM_ADD]['bridge']
        mac = nic_override[constants.DDM_ADD]['mac']
        args['nics'].append((ip, bridge, mac))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if not instance_info.failed and instance_info.data:
          current_mem = int(instance_info.data['memory'])
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode].data['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.iteritems():
          if node not in instance.secondary_nodes:
            continue
          if nres.failed or not isinstance(nres.data, dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
      if 'bridge' in nic_dict:
        nic_bridge = nic_dict['bridge']
        if nic_bridge is None:
          raise errors.OpPrereqError('Cannot set the nic bridge to None')
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None')
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC()
        else:
          # or validate/reserve the current one
          if self.cfg.IsMacInUse(nic_mac):
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if ins_l.failed or not isinstance(ins_l.data, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l.data:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        #HARDCODE
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        bridge = nic_dict['bridge']
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
                              bridge=bridge)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,bridge=%s" %
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
      else:
        # change a given nic
        for key in 'mac', 'ip', 'bridge':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].failed:
        result[node] = False
      else:
        result[node] = rpcresult[node].data

    return result


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, src_node, msg))

    vgname = self.cfg.GetVGName()

    snap_disks = []
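    # Added note: the steps below are snapshot on the primary, export each
    # snapshot to the target node, remove the snapshots, finalize the export
    # and finally prune older exports of this instance from other nodes.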

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    try:
      for idx, disk in enumerate(instance.disks):
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
        if new_dev_name.failed or not new_dev_name.data:
          self.LogWarning("Could not snapshot disk/%d on node %s",
                          idx, src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name.data),
                                 physical_id=(vgname, new_dev_name.data),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.admin_up:
        result = self.rpc.call_instance_start(src_node, instance, None, None)
        msg = result.RemoteFailMsg()
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        if result.failed or not result.data:
          self.LogWarning("Could not export disk/%d from node %s to"
                          " node %s", idx, src_node, dst_node.name)
        msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
        if msg:
          self.LogWarning("Could not remove snapshot for disk/%d from node"
                          " %s: %s", idx, src_node, msg)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
          continue
        if instance.name in exportlist[node].data:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has the following sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]
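  # Illustrative example (added; values made up): in allocate mode the caller
  # passes every key in _ALLO_KEYS as a keyword argument (e.g. mem_size=1024,
  # vcpus=1, disks=[{'size': 1024}]); in relocate mode only relocate_from, a
  # list of node names, is required.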

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor_name)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not ninfo.offline:
        nresult.Raise()
        if not isinstance(nresult.data, dict):
          raise errors.OpExecError("Can't get data for node %s" % nname)
        remote_info = nresult.data
        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          try:
            remote_info[attr] = int(remote_info[attr])
          except ValueError, err:
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" % (nname, attr, err))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].data:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
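    # Added note: at this point in_data has the top-level keys "version",
    # "cluster_name", "cluster_tags", "enabled_hypervisors", "nodes" and
    # "instances"; the mode-specific "request" key is added later by
    # _AddNewInstance or _AddRelocateInstance.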
6860

    
6861
  def _AddNewInstance(self):
6862
    """Add new instance data to allocator structure.
6863

6864
    This in combination with _AllocatorGetClusterData will create the
6865
    correct structure needed as input for the allocator.
6866

6867
    The checks for the completeness of the opcode must have already been
6868
    done.
6869

6870
    """
6871
    data = self.in_data
6872

    
6873
    disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6874

    
6875
    if self.disk_template in constants.DTS_NET_MIRROR:
6876
      self.required_nodes = 2
6877
    else:
6878
      self.required_nodes = 1
6879
    request = {
6880
      "type": "allocate",
6881
      "name": self.name,
6882
      "disk_template": self.disk_template,
6883
      "tags": self.tags,
6884
      "os": self.os,
6885
      "vcpus": self.vcpus,
6886
      "memory": self.mem_size,
6887
      "disks": self.disks,
6888
      "disk_space_total": disk_space,
6889
      "nics": self.nics,
6890
      "required_nodes": self.required_nodes,
6891
      }
6892
    data["request"] = request
6893

    
6894
  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

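  # A "relocate" request is much smaller, e.g. (illustrative values):
  #
  #   {"type": "relocate", "name": "inst1.example.com",
  #    "disk_space_total": 1152, "required_nodes": 1,
  #    "relocate_from": ["node3.example.com"]}
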
  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

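  # At this point self.in_text holds the serialized document handed to the
  # iallocator runner: at least the "nodes" and "instances" mappings from
  # _ComputeClusterData plus the mode-specific "request" dict added above.
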
  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

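  # Usage sketch for tests (FakeRpcResult is a hypothetical stand-in):
  # call_fn only has to return an object with a no-op Raise() method and
  # a .data attribute holding the (rcode, stdout, stderr, fail) 4-tuple;
  # any rcode other than IARUN_NOTFOUND/IARUN_FAILURE is treated as
  # success by the code above:
  #
  #   def _FakeRunner(node, name, in_text):
  #     out = '{"success": true, "info": "", "nodes": []}'
  #     return FakeRpcResult(data=(constants.IARUN_SUCCESS, out, "", None))
  #
  #   ial.Run("dummy-allocator", call_fn=_FakeRunner)
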
  def _ValidateResult(self):
    """Process the allocator results.

    This will process and, if successful, save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict

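  # The minimal well-formed allocator reply accepted above is a serialized
  # object with "success", "info" and "nodes" keys, where "nodes" must be
  # a list, e.g. (illustrative, JSON syntax):
  #
  #   {"success": true,
  #    "info": "allocation successful",
  #    "nodes": ["node2.example.com", "node4.example.com"]}

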
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

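  # Illustrative opcode values accepted by the checks above in allocate
  # mode (made-up values):
  #
  #   nics  = [{"mac": "aa:00:00:11:22:33", "ip": None, "bridge": "xen-br0"}]
  #   disks = [{"size": 1024, "mode": "w"}]
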
  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result
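
  # Note: with direction "in" the LU only returns the generated allocator
  # input text; with direction "out" it actually runs the named allocator
  # (validate=False) and returns its raw output text.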