root / lib / cmdlib.py @ 6bf01bbb
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_MASTER: the LU needs to run on the master node
58
        REQ_WSSTORE: the LU needs a writable SimpleStore
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_WSSTORE = False
69
  REQ_BGL = True
70

    
71
  def __init__(self, processor, op, context, sstore):
72
    """Constructor for LogicalUnit.
73

74
    This needs to be overridden in derived classes in order to check op
75
    validity.
76

77
    """
78
    self.proc = processor
79
    self.op = op
80
    self.cfg = context.cfg
81
    self.sstore = sstore
82
    self.context = context
83
    self.needed_locks = None
84
    self.acquired_locks = {}
85
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86
    # Used to force good behavior when calling helper functions
87
    self.recalculate_locks = {}
88
    self.__ssh = None
89

    
90
    for attr_name in self._OP_REQP:
91
      attr_val = getattr(op, attr_name, None)
92
      if attr_val is None:
93
        raise errors.OpPrereqError("Required parameter '%s' missing" %
94
                                   attr_name)
95

    
96
    if not self.cfg.IsCluster():
97
      raise errors.OpPrereqError("Cluster not initialized yet,"
98
                                 " use 'gnt-cluster init' first.")
99
    if self.REQ_MASTER:
100
      master = sstore.GetMasterNode()
101
      if master != utils.HostInfo().name:
102
        raise errors.OpPrereqError("Commands must be run on the master"
103
                                   " node %s" % master)
104

    
105
  def __GetSSH(self):
106
    """Returns the SshRunner object
107

108
    """
109
    if not self.__ssh:
110
      self.__ssh = ssh.SshRunner(self.sstore)
111
    return self.__ssh
112

    
113
  ssh = property(fget=__GetSSH)
114

    
115
  def ExpandNames(self):
116
    """Expand names for this LU.
117

118
    This method is called before starting to execute the opcode, and it should
119
    update all the parameters of the opcode to their canonical form (e.g. a
120
    short node name must be fully expanded after this method has successfully
121
    completed). This way locking, hooks, logging, etc. can work correctly.
122

123
    LUs which implement this method must also populate the self.needed_locks
124
    member, as a dict with lock levels as keys, and a list of needed lock names
125
    as values. Rules:
126
      - Use an empty dict if you don't need any lock
127
      - If you don't need any lock at a particular level omit that level
128
      - Don't put anything for the BGL level
129
      - If you want all locks at a level use None as a value
130
        (this reflects what LockSet does, and will be replaced before
131
        CheckPrereq with the full list of nodes that have been locked)
132

133
    If you need to share locks (rather than acquire them exclusively) at one
134
    level you can modify self.share_locks, setting a true value (usually 1) for
135
    that level. By default locks are not shared.
136

137
    Examples:
138
    # Acquire all nodes and one instance
139
    self.needed_locks = {
140
      locking.LEVEL_NODE: None,
141
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
142
    }
143
    # Acquire just two nodes
144
    self.needed_locks = {
145
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
146
    }
147
    # Acquire no locks
148
    self.needed_locks = {} # No, you can't leave it to the default value None
149

150
    """
151
    # The implementation of this method is mandatory only if the new LU is
152
    # concurrent, so that old LUs don't need to be changed all at the same
153
    # time.
154
    if self.REQ_BGL:
155
      self.needed_locks = {} # Exclusive LUs don't need locks.
156
    else:
157
      raise NotImplementedError
158

    
159
  def DeclareLocks(self, level):
160
    """Declare LU locking needs for a level
161

162
    While most LUs can just declare their locking needs at ExpandNames time,
163
    sometimes there's the need to calculate some locks after having acquired
164
    the ones before. This function is called just before acquiring locks at a
165
    particular level, but after acquiring the ones at lower levels, and permits
166
    such calculations. It can be used to modify self.needed_locks, and by
167
    default it does nothing.
168

169
    This function is only called if you have something already set in
170
    self.needed_locks for the level.
171

172
    @param level: Locking level which is going to be locked
173
    @type level: member of ganeti.locking.LEVELS
174

175
    """
176

    
177
  def CheckPrereq(self):
178
    """Check prerequisites for this LU.
179

180
    This method should check that the prerequisites for the execution
181
    of this LU are fulfilled. It can do internode communication, but
182
    it should be idempotent - no cluster or system changes are
183
    allowed.
184

185
    The method should raise errors.OpPrereqError in case something is
186
    not fulfilled. Its return value is ignored.
187

188
    This method should also update all the parameters of the opcode to
189
    their canonical form if it hasn't been done by ExpandNames before.
190

191
    """
192
    raise NotImplementedError
193

    
194
  def Exec(self, feedback_fn):
195
    """Execute the LU.
196

197
    This method should implement the actual work. It should raise
198
    errors.OpExecError for failures that are somewhat dealt with in
199
    code, or expected.
200

201
    """
202
    raise NotImplementedError
203

    
204
  def BuildHooksEnv(self):
205
    """Build hooks environment for this LU.
206

207
    This method should return a three-element tuple consisting of: a dict
208
    containing the environment that will be used for running the
209
    specific hook for this LU, a list of node names on which the hook
210
    should run before the execution, and a list of node names on which
211
    the hook should run after the execution.
212

213
    The keys of the dict must not be prefixed with 'GANETI_', as this will
214
    be handled in the hooks runner. Also note additional keys will be
215
    added by the hooks runner. If the LU doesn't define any
216
    environment, an empty dict (and not None) should be returned.
217

218
    An empty node list should be returned as an empty list (and not None).
219

220
    Note that if the HPATH for a LU class is None, this function will
221
    not be called.
222

223
    """
224
    raise NotImplementedError
225

    
226
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
227
    """Notify the LU about the results of its hooks.
228

229
    This method is called every time a hooks phase is executed, and notifies
230
    the Logical Unit about the hooks' result. The LU can then use it to alter
231
    its result based on the hooks.  By default the method does nothing and the
232
    previous result is passed back unchanged but any LU can define it if it
233
    wants to use the local cluster hook-scripts somehow.
234

235
    Args:
236
      phase: the hooks phase that has just been run
237
      hook_results: the results of the multi-node hooks rpc call
238
      feedback_fn: function to send feedback back to the caller
239
      lu_result: the previous result this LU had, or None in the PRE phase.
240

241
    """
242
    return lu_result
243

    
244
  def _ExpandAndLockInstance(self):
245
    """Helper function to expand and lock an instance.
246

247
    Many LUs that work on an instance take its name in self.op.instance_name
248
    and need to expand it and then declare the expanded name for locking. This
249
    function does it, and then updates self.op.instance_name to the expanded
250
    name. It also initializes needed_locks as a dict, if this hasn't been done
251
    before.
252

253
    """
254
    if self.needed_locks is None:
255
      self.needed_locks = {}
256
    else:
257
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
258
        "_ExpandAndLockInstance called with instance-level locks set"
259
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
260
    if expanded_name is None:
261
      raise errors.OpPrereqError("Instance '%s' not known" %
262
                                  self.op.instance_name)
263
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
264
    self.op.instance_name = expanded_name
265

    
266
  def _LockInstancesNodes(self):
267
    """Helper function to declare instances' nodes for locking.
268

269
    This function should be called after locking one or more instances to lock
270
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
271
    with all primary or secondary nodes for instances already locked and
272
    present in self.needed_locks[locking.LEVEL_INSTANCE].
273

274
    It should be called from DeclareLocks, and for safety only works if
275
    self.recalculate_locks[locking.LEVEL_NODE] is set.
276

277
    In the future it may grow parameters to just lock some instance's nodes, or
278
    to just lock primaries or secondary nodes, if needed.
279

280
    It should be called in DeclareLocks in a way similar to:
281

282
    if level == locking.LEVEL_NODE:
283
      self._LockInstancesNodes()
284

285
    """
286
    assert locking.LEVEL_NODE in self.recalculate_locks, \
287
      "_LockInstancesNodes helper function called with no nodes to recalculate"
288

    
289
    # TODO: check if we've really been called with the instance locks held
290

    
291
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
292
    # future we might want to have different behaviors depending on the value
293
    # of self.recalculate_locks[locking.LEVEL_NODE]
294
    wanted_nodes = []
295
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
296
      instance = self.context.cfg.GetInstanceInfo(instance_name)
297
      wanted_nodes.append(instance.primary_node)
298
      wanted_nodes.extend(instance.secondary_nodes)
299
    self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
300

    
301
    del self.recalculate_locks[locking.LEVEL_NODE]
302

    
303
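# Illustrative sketch only, not part of the original module: a hypothetical
# concurrent LU showing how the rules documented in LogicalUnit fit together
# (ExpandNames plus DeclareLocks for locking via _ExpandAndLockInstance and
# _LockInstancesNodes, CheckPrereq, BuildHooksEnv, Exec). The class name,
# HPATH value and the use of constants.HTYPE_INSTANCE are assumptions made
# for the example; otherwise only helpers defined in this module are used.
class LUExampleInstanceNoop(LogicalUnit):
  """Example LU operating on a single instance (illustrative sketch).

  """
  HPATH = "instance-example"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # Expand and lock the instance; declare an (empty) node level so that
    # DeclareLocks gets called for LEVEL_NODE after the instance is locked.
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    # the value is currently unused by _LockInstancesNodes, any marker is
    # enough here
    self.recalculate_locks[locking.LEVEL_NODE] = 1

  def DeclareLocks(self, level):
    # With the instance lock held, also lock its primary/secondary nodes.
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    # Return the (env, pre-nodes, post-nodes) tuple described above.
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node]
    return env, nl, nl

  def CheckPrereq(self):
    # The instance lock is held, so the configuration entry must exist.
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    # No cluster changes in this example; just report what was locked.
    feedback_fn("example LU ran for instance %s" % self.instance.name)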

    
304
class NoHooksLU(LogicalUnit):
305
  """Simple LU which runs no hooks.
306

307
  This LU is intended as a parent for other LogicalUnits which will
308
  run no hooks, in order to reduce duplicate code.
309

310
  """
311
  HPATH = None
312
  HTYPE = None
313

    
314

    
315
def _GetWantedNodes(lu, nodes):
316
  """Returns list of checked and expanded node names.
317

318
  Args:
319
    nodes: List of node names (strings); an empty list selects all nodes
320

321
  """
322
  if not isinstance(nodes, list):
323
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
324

    
325
  if nodes:
326
    wanted = []
327

    
328
    for name in nodes:
329
      node = lu.cfg.ExpandNodeName(name)
330
      if node is None:
331
        raise errors.OpPrereqError("No such node name '%s'" % name)
332
      wanted.append(node)
333

    
334
  else:
335
    wanted = lu.cfg.GetNodeList()
336
  return utils.NiceSort(wanted)
337

    
338

    
339
def _GetWantedInstances(lu, instances):
340
  """Returns list of checked and expanded instance names.
341

342
  Args:
343
    instances: List of instance names (strings); an empty list selects all
               instances
344

345
  """
346
  if not isinstance(instances, list):
347
    raise errors.OpPrereqError("Invalid argument type 'instances'")
348

    
349
  if instances:
350
    wanted = []
351

    
352
    for name in instances:
353
      instance = lu.cfg.ExpandInstanceName(name)
354
      if instance is None:
355
        raise errors.OpPrereqError("No such instance name '%s'" % name)
356
      wanted.append(instance)
357

    
358
  else:
359
    wanted = lu.cfg.GetInstanceList()
360
  return utils.NiceSort(wanted)
361

    
362

    
363
def _CheckOutputFields(static, dynamic, selected):
364
  """Checks whether all selected fields are valid.
365

366
  Args:
367
    static: Static fields
368
    dynamic: Dynamic fields
369

370
  """
371
  static_fields = frozenset(static)
372
  dynamic_fields = frozenset(dynamic)
373

    
374
  all_fields = static_fields | dynamic_fields
375

    
376
  if not all_fields.issuperset(selected):
377
    raise errors.OpPrereqError("Unknown output fields selected: %s"
378
                               % ",".join(frozenset(selected).
379
                                          difference(all_fields)))
380
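# Illustrative usage sketch (hypothetical field names): a query LU typically
# combines the helpers above in its prerequisite phase, e.g.
#
#   self.nodes = _GetWantedNodes(self, self.op.nodes)
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=self.op.output_fields)
#
# Requesting a field outside static | dynamic makes _CheckOutputFields raise
# errors.OpPrereqError("Unknown output fields selected: ...").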

    
381

    
382
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
383
                          memory, vcpus, nics):
384
  """Builds instance related env variables for hooks from single variables.
385

386
  Args:
387
    secondary_nodes: List of secondary nodes as strings
388
  """
389
  env = {
390
    "OP_TARGET": name,
391
    "INSTANCE_NAME": name,
392
    "INSTANCE_PRIMARY": primary_node,
393
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
394
    "INSTANCE_OS_TYPE": os_type,
395
    "INSTANCE_STATUS": status,
396
    "INSTANCE_MEMORY": memory,
397
    "INSTANCE_VCPUS": vcpus,
398
  }
399

    
400
  if nics:
401
    nic_count = len(nics)
402
    for idx, (ip, bridge, mac) in enumerate(nics):
403
      if ip is None:
404
        ip = ""
405
      env["INSTANCE_NIC%d_IP" % idx] = ip
406
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
407
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
408
  else:
409
    nic_count = 0
410

    
411
  env["INSTANCE_NIC_COUNT"] = nic_count
412

    
413
  return env
414

    
415

    
416
def _BuildInstanceHookEnvByObject(instance, override=None):
417
  """Builds instance related env variables for hooks from an object.
418

419
  Args:
420
    instance: objects.Instance object of instance
421
    override: dict of values to override
422
  """
423
  args = {
424
    'name': instance.name,
425
    'primary_node': instance.primary_node,
426
    'secondary_nodes': instance.secondary_nodes,
427
    'os_type': instance.os,
428
    'status': instance.status,
429
    'memory': instance.memory,
430
    'vcpus': instance.vcpus,
431
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
432
  }
433
  if override:
434
    args.update(override)
435
  return _BuildInstanceHookEnv(**args)
436

    
437
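# Illustrative sketch (hypothetical values): for an instance with a single
# NIC, _BuildInstanceHookEnvByObject(instance) produces roughly
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 512,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_IP": "",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
#   }
#
# The hooks runner adds the "GANETI_" prefix to each key (see BuildHooksEnv).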

    
438
def _CheckInstanceBridgesExist(instance):
439
  """Check that the brigdes needed by an instance exist.
440

441
  """
442
  # check bridges existence
443
  brlist = [nic.bridge for nic in instance.nics]
444
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
445
    raise errors.OpPrereqError("one or more target bridges %s does not"
446
                               " exist on destination node '%s'" %
447
                               (brlist, instance.primary_node))
448

    
449

    
450
class LUDestroyCluster(NoHooksLU):
451
  """Logical unit for destroying the cluster.
452

453
  """
454
  _OP_REQP = []
455

    
456
  def CheckPrereq(self):
457
    """Check prerequisites.
458

459
    This checks whether the cluster is empty.
460

461
    Any errors are signalled by raising errors.OpPrereqError.
462

463
    """
464
    master = self.sstore.GetMasterNode()
465

    
466
    nodelist = self.cfg.GetNodeList()
467
    if len(nodelist) != 1 or nodelist[0] != master:
468
      raise errors.OpPrereqError("There are still %d node(s) in"
469
                                 " this cluster." % (len(nodelist) - 1))
470
    instancelist = self.cfg.GetInstanceList()
471
    if instancelist:
472
      raise errors.OpPrereqError("There are still %d instance(s) in"
473
                                 " this cluster." % len(instancelist))
474

    
475
  def Exec(self, feedback_fn):
476
    """Destroys the cluster.
477

478
    """
479
    master = self.sstore.GetMasterNode()
480
    if not rpc.call_node_stop_master(master, False):
481
      raise errors.OpExecError("Could not disable the master role")
482
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
483
    utils.CreateBackup(priv_key)
484
    utils.CreateBackup(pub_key)
485
    return master
486

    
487

    
488
class LUVerifyCluster(LogicalUnit):
489
  """Verifies the cluster status.
490

491
  """
492
  HPATH = "cluster-verify"
493
  HTYPE = constants.HTYPE_CLUSTER
494
  _OP_REQP = ["skip_checks"]
495

    
496
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
497
                  remote_version, feedback_fn):
498
    """Run multiple tests against a node.
499

500
    Test list:
501
      - compares ganeti version
502
      - checks vg existence and size > 20G
503
      - checks config file checksum
504
      - checks ssh to other nodes
505

506
    Args:
507
      node: name of the node to check
508
      file_list: required list of files
509
      local_cksum: dictionary of local files and their checksums
510

511
    """
512
    # compares ganeti version
513
    local_version = constants.PROTOCOL_VERSION
514
    if not remote_version:
515
      feedback_fn("  - ERROR: connection to %s failed" % (node))
516
      return True
517

    
518
    if local_version != remote_version:
519
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
520
                      (local_version, node, remote_version))
521
      return True
522

    
523
    # checks vg existence and size > 20G
524

    
525
    bad = False
526
    if not vglist:
527
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
528
                      (node,))
529
      bad = True
530
    else:
531
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
532
                                            constants.MIN_VG_SIZE)
533
      if vgstatus:
534
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
535
        bad = True
536

    
537
    # checks config file checksum
538
    # checks ssh to any
539

    
540
    if 'filelist' not in node_result:
541
      bad = True
542
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
543
    else:
544
      remote_cksum = node_result['filelist']
545
      for file_name in file_list:
546
        if file_name not in remote_cksum:
547
          bad = True
548
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
549
        elif remote_cksum[file_name] != local_cksum[file_name]:
550
          bad = True
551
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
552

    
553
    if 'nodelist' not in node_result:
554
      bad = True
555
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
556
    else:
557
      if node_result['nodelist']:
558
        bad = True
559
        for node in node_result['nodelist']:
560
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
561
                          (node, node_result['nodelist'][node]))
562
    if 'node-net-test' not in node_result:
563
      bad = True
564
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
565
    else:
566
      if node_result['node-net-test']:
567
        bad = True
568
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
569
        for node in nlist:
570
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
571
                          (node, node_result['node-net-test'][node]))
572

    
573
    hyp_result = node_result.get('hypervisor', None)
574
    if hyp_result is not None:
575
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
576
    return bad
577

    
578
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
579
                      node_instance, feedback_fn):
580
    """Verify an instance.
581

582
    This function checks to see if the required block devices are
583
    available on the instance's node.
584

585
    """
586
    bad = False
587

    
588
    node_current = instanceconfig.primary_node
589

    
590
    node_vol_should = {}
591
    instanceconfig.MapLVsByNode(node_vol_should)
592

    
593
    for node in node_vol_should:
594
      for volume in node_vol_should[node]:
595
        if node not in node_vol_is or volume not in node_vol_is[node]:
596
          feedback_fn("  - ERROR: volume %s missing on node %s" %
597
                          (volume, node))
598
          bad = True
599

    
600
    if instanceconfig.status != 'down':
601
      if (node_current not in node_instance or
602
          not instance in node_instance[node_current]):
603
        feedback_fn("  - ERROR: instance %s not running on node %s" %
604
                        (instance, node_current))
605
        bad = True
606

    
607
    for node in node_instance:
608
      if node != node_current:
609
        if instance in node_instance[node]:
610
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
611
                          (instance, node))
612
          bad = True
613

    
614
    return bad
615

    
616
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
617
    """Verify if there are any unknown volumes in the cluster.
618

619
    The .os, .swap and backup volumes are ignored. All other volumes are
620
    reported as unknown.
621

622
    """
623
    bad = False
624

    
625
    for node in node_vol_is:
626
      for volume in node_vol_is[node]:
627
        if node not in node_vol_should or volume not in node_vol_should[node]:
628
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
629
                      (volume, node))
630
          bad = True
631
    return bad
632

    
633
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
634
    """Verify the list of running instances.
635

636
    This checks what instances are running but unknown to the cluster.
637

638
    """
639
    bad = False
640
    for node in node_instance:
641
      for runninginstance in node_instance[node]:
642
        if runninginstance not in instancelist:
643
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
644
                          (runninginstance, node))
645
          bad = True
646
    return bad
647

    
648
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
649
    """Verify N+1 Memory Resilience.
650

651
    Check that if one single node dies we can still start all the instances it
652
    was primary for.
653

654
    """
655
    bad = False
656

    
657
    for node, nodeinfo in node_info.iteritems():
658
      # This code checks that every node which is now listed as secondary has
659
      # enough memory to host all the instances it is supposed to host, should
      # a single
660
      # other node in the cluster fail.
661
      # FIXME: not ready for failover to an arbitrary node
662
      # FIXME: does not support file-backed instances
663
      # WARNING: we currently take into account down instances as well as up
664
      # ones, considering that even if they're down someone might want to start
665
      # them even in the event of a node failure.
666
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
667
        needed_mem = 0
668
        for instance in instances:
669
          needed_mem += instance_cfg[instance].memory
670
        if nodeinfo['mfree'] < needed_mem:
671
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
672
                      " failovers should node %s fail" % (node, prinode))
673
          bad = True
674
    return bad
675
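  # Illustrative sketch of the check above (hypothetical numbers): assume
  # "node3" has mfree == 1024 and is secondary for two instances whose
  # primary is "node1":
  #   node_info["node3"]["sinst-by-pnode"] == {"node1": ["inst1", "inst2"]}
  #   instance_cfg["inst1"].memory == 512
  #   instance_cfg["inst2"].memory == 768
  # Should "node1" fail, "node3" would need 512 + 768 = 1280 MiB of free
  # memory but only has 1024, so the loop above flags the (node3, node1)
  # pair as not N+1 compliant.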

    
676
  def CheckPrereq(self):
677
    """Check prerequisites.
678

679
    Transform the list of checks we're going to skip into a set and check that
680
    all its members are valid.
681

682
    """
683
    self.skip_set = frozenset(self.op.skip_checks)
684
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
685
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
686

    
687
  def BuildHooksEnv(self):
688
    """Build hooks env.
689

690
    Cluster-Verify hooks just run in the post phase and their failure makes
691
    the output be logged in the verify output and the verification to fail.
692

693
    """
694
    all_nodes = self.cfg.GetNodeList()
695
    # TODO: populate the environment with useful information for verify hooks
696
    env = {}
697
    return env, [], all_nodes
698

    
699
  def Exec(self, feedback_fn):
700
    """Verify integrity of cluster, performing various test on nodes.
701

702
    """
703
    bad = False
704
    feedback_fn("* Verifying global settings")
705
    for msg in self.cfg.VerifyConfig():
706
      feedback_fn("  - ERROR: %s" % msg)
707

    
708
    vg_name = self.cfg.GetVGName()
709
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
710
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
711
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
712
    i_non_redundant = [] # Non redundant instances
713
    node_volume = {}
714
    node_instance = {}
715
    node_info = {}
716
    instance_cfg = {}
717

    
718
    # FIXME: verify OS list
719
    # do local checksums
720
    file_names = list(self.sstore.GetFileList())
721
    file_names.append(constants.SSL_CERT_FILE)
722
    file_names.append(constants.CLUSTER_CONF_FILE)
723
    local_checksums = utils.FingerprintFiles(file_names)
724

    
725
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
726
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
727
    all_instanceinfo = rpc.call_instance_list(nodelist)
728
    all_vglist = rpc.call_vg_list(nodelist)
729
    node_verify_param = {
730
      'filelist': file_names,
731
      'nodelist': nodelist,
732
      'hypervisor': None,
733
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
734
                        for node in nodeinfo]
735
      }
736
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
737
    all_rversion = rpc.call_version(nodelist)
738
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
739

    
740
    for node in nodelist:
741
      feedback_fn("* Verifying node %s" % node)
742
      result = self._VerifyNode(node, file_names, local_checksums,
743
                                all_vglist[node], all_nvinfo[node],
744
                                all_rversion[node], feedback_fn)
745
      bad = bad or result
746

    
747
      # node_volume
748
      volumeinfo = all_volumeinfo[node]
749

    
750
      if isinstance(volumeinfo, basestring):
751
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
752
                    (node, volumeinfo[-400:].encode('string_escape')))
753
        bad = True
754
        node_volume[node] = {}
755
      elif not isinstance(volumeinfo, dict):
756
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
757
        bad = True
758
        continue
759
      else:
760
        node_volume[node] = volumeinfo
761

    
762
      # node_instance
763
      nodeinstance = all_instanceinfo[node]
764
      if type(nodeinstance) != list:
765
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
766
        bad = True
767
        continue
768

    
769
      node_instance[node] = nodeinstance
770

    
771
      # node_info
772
      nodeinfo = all_ninfo[node]
773
      if not isinstance(nodeinfo, dict):
774
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
775
        bad = True
776
        continue
777

    
778
      try:
779
        node_info[node] = {
780
          "mfree": int(nodeinfo['memory_free']),
781
          "dfree": int(nodeinfo['vg_free']),
782
          "pinst": [],
783
          "sinst": [],
784
          # dictionary holding all instances this node is secondary for,
785
          # grouped by their primary node. Each key is a cluster node, and each
786
          # value is a list of instances which have the key as primary and the
787
          # current node as secondary.  this is handy to calculate N+1 memory
788
          # availability if you can only failover from a primary to its
789
          # secondary.
790
          "sinst-by-pnode": {},
791
        }
792
      except ValueError:
793
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
794
        bad = True
795
        continue
796

    
797
    node_vol_should = {}
798

    
799
    for instance in instancelist:
800
      feedback_fn("* Verifying instance %s" % instance)
801
      inst_config = self.cfg.GetInstanceInfo(instance)
802
      result =  self._VerifyInstance(instance, inst_config, node_volume,
803
                                     node_instance, feedback_fn)
804
      bad = bad or result
805

    
806
      inst_config.MapLVsByNode(node_vol_should)
807

    
808
      instance_cfg[instance] = inst_config
809

    
810
      pnode = inst_config.primary_node
811
      if pnode in node_info:
812
        node_info[pnode]['pinst'].append(instance)
813
      else:
814
        feedback_fn("  - ERROR: instance %s, connection to primary node"
815
                    " %s failed" % (instance, pnode))
816
        bad = True
817

    
818
      # If the instance is non-redundant we cannot survive losing its primary
819
      # node, so we are not N+1 compliant. On the other hand we have no disk
820
      # templates with more than one secondary so that situation is not well
821
      # supported either.
822
      # FIXME: does not support file-backed instances
823
      if len(inst_config.secondary_nodes) == 0:
824
        i_non_redundant.append(instance)
825
      elif len(inst_config.secondary_nodes) > 1:
826
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
827
                    % instance)
828

    
829
      for snode in inst_config.secondary_nodes:
830
        if snode in node_info:
831
          node_info[snode]['sinst'].append(instance)
832
          if pnode not in node_info[snode]['sinst-by-pnode']:
833
            node_info[snode]['sinst-by-pnode'][pnode] = []
834
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
835
        else:
836
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
837
                      " %s failed" % (instance, snode))
838

    
839
    feedback_fn("* Verifying orphan volumes")
840
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
841
                                       feedback_fn)
842
    bad = bad or result
843

    
844
    feedback_fn("* Verifying remaining instances")
845
    result = self._VerifyOrphanInstances(instancelist, node_instance,
846
                                         feedback_fn)
847
    bad = bad or result
848

    
849
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
850
      feedback_fn("* Verifying N+1 Memory redundancy")
851
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
852
      bad = bad or result
853

    
854
    feedback_fn("* Other Notes")
855
    if i_non_redundant:
856
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
857
                  % len(i_non_redundant))
858

    
859
    return not bad
860

    
861
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
862
    """Analize the post-hooks' result, handle it, and send some
863
    nicely-formatted feedback back to the user.
864

865
    Args:
866
      phase: the hooks phase that has just been run
867
      hooks_results: the results of the multi-node hooks rpc call
868
      feedback_fn: function to send feedback back to the caller
869
      lu_result: previous Exec result
870

871
    """
872
    # We only really run POST phase hooks, and are only interested in
873
    # their results
874
    if phase == constants.HOOKS_PHASE_POST:
875
      # Used to change hooks' output to proper indentation
876
      indent_re = re.compile('^', re.M)
877
      feedback_fn("* Hooks Results")
878
      if not hooks_results:
879
        feedback_fn("  - ERROR: general communication failure")
880
        lu_result = 1
881
      else:
882
        for node_name in hooks_results:
883
          show_node_header = True
884
          res = hooks_results[node_name]
885
          if res is False or not isinstance(res, list):
886
            feedback_fn("    Communication failure")
887
            lu_result = 1
888
            continue
889
          for script, hkr, output in res:
890
            if hkr == constants.HKR_FAIL:
891
              # The node header is only shown once, if there are
892
              # failing hooks on that node
893
              if show_node_header:
894
                feedback_fn("  Node %s:" % node_name)
895
                show_node_header = False
896
              feedback_fn("    ERROR: Script %s failed, output:" % script)
897
              output = indent_re.sub('      ', output)
898
              feedback_fn("%s" % output)
899
              lu_result = 1
900

    
901
      return lu_result
902

    
903

    
904
class LUVerifyDisks(NoHooksLU):
905
  """Verifies the cluster disks status.
906

907
  """
908
  _OP_REQP = []
909

    
910
  def CheckPrereq(self):
911
    """Check prerequisites.
912

913
    This has no prerequisites.
914

915
    """
916
    pass
917

    
918
  def Exec(self, feedback_fn):
919
    """Verify integrity of cluster disks.
920

921
    """
922
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
923

    
924
    vg_name = self.cfg.GetVGName()
925
    nodes = utils.NiceSort(self.cfg.GetNodeList())
926
    instances = [self.cfg.GetInstanceInfo(name)
927
                 for name in self.cfg.GetInstanceList()]
928

    
929
    nv_dict = {}
930
    for inst in instances:
931
      inst_lvs = {}
932
      if (inst.status != "up" or
933
          inst.disk_template not in constants.DTS_NET_MIRROR):
934
        continue
935
      inst.MapLVsByNode(inst_lvs)
936
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
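      # e.g. (illustrative values) an instance "inst1" whose LVs map to
      # {"node1": ["lv_data", "lv_meta"]} adds the entries
      # {("node1", "lv_data"): <inst1>, ("node1", "lv_meta"): <inst1>} to
      # nv_dict (the values are the Instance objects, not just the names)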
937
      for node, vol_list in inst_lvs.iteritems():
938
        for vol in vol_list:
939
          nv_dict[(node, vol)] = inst
940

    
941
    if not nv_dict:
942
      return result
943

    
944
    node_lvs = rpc.call_volume_list(nodes, vg_name)
945

    
946
    to_act = set()
947
    for node in nodes:
948
      # node_volume
949
      lvs = node_lvs[node]
950

    
951
      if isinstance(lvs, basestring):
952
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
953
        res_nlvm[node] = lvs
954
      elif not isinstance(lvs, dict):
955
        logger.Info("connection to node %s failed or invalid data returned" %
956
                    (node,))
957
        res_nodes.append(node)
958
        continue
959

    
960
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
961
        inst = nv_dict.pop((node, lv_name), None)
962
        if (not lv_online and inst is not None
963
            and inst.name not in res_instances):
964
          res_instances.append(inst.name)
965

    
966
    # any leftover items in nv_dict are missing LVs, let's arrange the
967
    # data better
968
    for key, inst in nv_dict.iteritems():
969
      if inst.name not in res_missing:
970
        res_missing[inst.name] = []
971
      res_missing[inst.name].append(key)
972

    
973
    return result
974

    
975

    
976
class LURenameCluster(LogicalUnit):
977
  """Rename the cluster.
978

979
  """
980
  HPATH = "cluster-rename"
981
  HTYPE = constants.HTYPE_CLUSTER
982
  _OP_REQP = ["name"]
983
  REQ_WSSTORE = True
984

    
985
  def BuildHooksEnv(self):
986
    """Build hooks env.
987

988
    """
989
    env = {
990
      "OP_TARGET": self.sstore.GetClusterName(),
991
      "NEW_NAME": self.op.name,
992
      }
993
    mn = self.sstore.GetMasterNode()
994
    return env, [mn], [mn]
995

    
996
  def CheckPrereq(self):
997
    """Verify that the passed name is a valid one.
998

999
    """
1000
    hostname = utils.HostInfo(self.op.name)
1001

    
1002
    new_name = hostname.name
1003
    self.ip = new_ip = hostname.ip
1004
    old_name = self.sstore.GetClusterName()
1005
    old_ip = self.sstore.GetMasterIP()
1006
    if new_name == old_name and new_ip == old_ip:
1007
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1008
                                 " cluster has changed")
1009
    if new_ip != old_ip:
1010
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1011
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1012
                                   " reachable on the network. Aborting." %
1013
                                   new_ip)
1014

    
1015
    self.op.name = new_name
1016

    
1017
  def Exec(self, feedback_fn):
1018
    """Rename the cluster.
1019

1020
    """
1021
    clustername = self.op.name
1022
    ip = self.ip
1023
    ss = self.sstore
1024

    
1025
    # shutdown the master IP
1026
    master = ss.GetMasterNode()
1027
    if not rpc.call_node_stop_master(master, False):
1028
      raise errors.OpExecError("Could not disable the master role")
1029

    
1030
    try:
1031
      # modify the sstore
1032
      ss.SetKey(ss.SS_MASTER_IP, ip)
1033
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1034

    
1035
      # Distribute updated ss config to all nodes
1036
      myself = self.cfg.GetNodeInfo(master)
1037
      dist_nodes = self.cfg.GetNodeList()
1038
      if myself.name in dist_nodes:
1039
        dist_nodes.remove(myself.name)
1040

    
1041
      logger.Debug("Copying updated ssconf data to all nodes")
1042
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1043
        fname = ss.KeyToFilename(keyname)
1044
        result = rpc.call_upload_file(dist_nodes, fname)
1045
        for to_node in dist_nodes:
1046
          if not result[to_node]:
1047
            logger.Error("copy of file %s to node %s failed" %
1048
                         (fname, to_node))
1049
    finally:
1050
      if not rpc.call_node_start_master(master, False):
1051
        logger.Error("Could not re-enable the master role on the master,"
1052
                     " please restart manually.")
1053

    
1054

    
1055
def _RecursiveCheckIfLVMBased(disk):
1056
  """Check if the given disk or its children are lvm-based.
1057

1058
  Args:
1059
    disk: ganeti.objects.Disk object
1060

1061
  Returns:
1062
    boolean indicating whether a LD_LV dev_type was found or not
1063

1064
  """
1065
  if disk.children:
1066
    for chdisk in disk.children:
1067
      if _RecursiveCheckIfLVMBased(chdisk):
1068
        return True
1069
  return disk.dev_type == constants.LD_LV
1070

    
1071
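# Illustrative sketch (hypothetical disk layout): for a mirrored disk whose
# children are two LVM logical volumes,
#   disk.children == [lv_data, lv_meta]   # both with dev_type == LD_LV
# _RecursiveCheckIfLVMBased(disk) returns True as soon as an LVM-based child
# is found, while a tree without any LD_LV device returns False.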

    
1072
class LUSetClusterParams(LogicalUnit):
1073
  """Change the parameters of the cluster.
1074

1075
  """
1076
  HPATH = "cluster-modify"
1077
  HTYPE = constants.HTYPE_CLUSTER
1078
  _OP_REQP = []
1079

    
1080
  def BuildHooksEnv(self):
1081
    """Build hooks env.
1082

1083
    """
1084
    env = {
1085
      "OP_TARGET": self.sstore.GetClusterName(),
1086
      "NEW_VG_NAME": self.op.vg_name,
1087
      }
1088
    mn = self.sstore.GetMasterNode()
1089
    return env, [mn], [mn]
1090

    
1091
  def CheckPrereq(self):
1092
    """Check prerequisites.
1093

1094
    This checks whether the given params don't conflict and
1095
    if the given volume group is valid.
1096

1097
    """
1098
    if not self.op.vg_name:
1099
      instances = [self.cfg.GetInstanceInfo(name)
1100
                   for name in self.cfg.GetInstanceList()]
1101
      for inst in instances:
1102
        for disk in inst.disks:
1103
          if _RecursiveCheckIfLVMBased(disk):
1104
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1105
                                       " lvm-based instances exist")
1106

    
1107
    # if vg_name not None, checks given volume group on all nodes
1108
    if self.op.vg_name:
1109
      node_list = self.cfg.GetNodeList()
1110
      vglist = rpc.call_vg_list(node_list)
1111
      for node in node_list:
1112
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1113
                                              constants.MIN_VG_SIZE)
1114
        if vgstatus:
1115
          raise errors.OpPrereqError("Error on node '%s': %s" %
1116
                                     (node, vgstatus))
1117

    
1118
  def Exec(self, feedback_fn):
1119
    """Change the parameters of the cluster.
1120

1121
    """
1122
    if self.op.vg_name != self.cfg.GetVGName():
1123
      self.cfg.SetVGName(self.op.vg_name)
1124
    else:
1125
      feedback_fn("Cluster LVM configuration already in desired"
1126
                  " state, not changing")
1127

    
1128

    
1129
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1130
  """Sleep and poll for an instance's disk to sync.
1131

1132
  """
1133
  if not instance.disks:
1134
    return True
1135

    
1136
  if not oneshot:
1137
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1138

    
1139
  node = instance.primary_node
1140

    
1141
  for dev in instance.disks:
1142
    cfgw.SetDiskID(dev, node)
1143

    
1144
  retries = 0
1145
  while True:
1146
    max_time = 0
1147
    done = True
1148
    cumul_degraded = False
1149
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1150
    if not rstats:
1151
      proc.LogWarning("Can't get any data from node %s" % node)
1152
      retries += 1
1153
      if retries >= 10:
1154
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1155
                                 " aborting." % node)
1156
      time.sleep(6)
1157
      continue
1158
    retries = 0
1159
    for i in range(len(rstats)):
1160
      mstat = rstats[i]
1161
      if mstat is None:
1162
        proc.LogWarning("Can't compute data for node %s/%s" %
1163
                        (node, instance.disks[i].iv_name))
1164
        continue
1165
      # we ignore the ldisk parameter
1166
      perc_done, est_time, is_degraded, _ = mstat
1167
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1168
      if perc_done is not None:
1169
        done = False
1170
        if est_time is not None:
1171
          rem_time = "%d estimated seconds remaining" % est_time
1172
          max_time = est_time
1173
        else:
1174
          rem_time = "no time estimate"
1175
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1176
                     (instance.disks[i].iv_name, perc_done, rem_time))
1177
    if done or oneshot:
1178
      break
1179

    
1180
    time.sleep(min(60, max_time))
1181

    
1182
  if done:
1183
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1184
  return not cumul_degraded
1185

    
1186

    
1187
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1188
  """Check that mirrors are not degraded.
1189

1190
  The ldisk parameter, if True, will change the test from the
1191
  is_degraded attribute (which represents overall non-ok status for
1192
  the device(s)) to the ldisk (representing the local storage status).
1193

1194
  """
1195
  cfgw.SetDiskID(dev, node)
1196
  if ldisk:
1197
    idx = 6
1198
  else:
1199
    idx = 5
1200

    
1201
  result = True
1202
  if on_primary or dev.AssembleOnSecondary():
1203
    rstats = rpc.call_blockdev_find(node, dev)
1204
    if not rstats:
1205
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1206
      result = False
1207
    else:
1208
      result = result and (not rstats[idx])
1209
  if dev.children:
1210
    for child in dev.children:
1211
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1212

    
1213
  return result
1214

    
1215

    
1216
class LUDiagnoseOS(NoHooksLU):
1217
  """Logical unit for OS diagnose/query.
1218

1219
  """
1220
  _OP_REQP = ["output_fields", "names"]
1221
  REQ_BGL = False
1222

    
1223
  def ExpandNames(self):
1224
    if self.op.names:
1225
      raise errors.OpPrereqError("Selective OS query not supported")
1226

    
1227
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1228
    _CheckOutputFields(static=[],
1229
                       dynamic=self.dynamic_fields,
1230
                       selected=self.op.output_fields)
1231

    
1232
    # Lock all nodes, in shared mode
1233
    self.needed_locks = {}
1234
    self.share_locks[locking.LEVEL_NODE] = 1
1235
    self.needed_locks[locking.LEVEL_NODE] = None
1236

    
1237
  def CheckPrereq(self):
1238
    """Check prerequisites.
1239

1240
    """
1241

    
1242
  @staticmethod
1243
  def _DiagnoseByOS(node_list, rlist):
1244
    """Remaps a per-node return list into an a per-os per-node dictionary
1245

1246
      Args:
1247
        node_list: a list with the names of all nodes
1248
        rlist: a map with node names as keys and OS objects as values
1249

1250
      Returns:
1251
        map: a map with osnames as keys and as value another map, with
1252
             nodes as
1253
             keys and list of OS objects as values
1254
             e.g. {"debian-etch": {"node1": [<object>,...],
1255
                                   "node2": [<object>,]}
1256
                  }
1257

1258
    """
1259
    all_os = {}
1260
    for node_name, nr in rlist.iteritems():
1261
      if not nr:
1262
        continue
1263
      for os_obj in nr:
1264
        if os_obj.name not in all_os:
1265
          # build a list of nodes for this os containing empty lists
1266
          # for each node in node_list
1267
          all_os[os_obj.name] = {}
1268
          for nname in node_list:
1269
            all_os[os_obj.name][nname] = []
1270
        all_os[os_obj.name][node_name].append(os_obj)
1271
    return all_os
1272

    
1273
  def Exec(self, feedback_fn):
1274
    """Compute the list of OSes.
1275

1276
    """
1277
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1278
    node_data = rpc.call_os_diagnose(node_list)
1279
    if node_data == False:
1280
      raise errors.OpExecError("Can't gather the list of OSes")
1281
    pol = self._DiagnoseByOS(node_list, node_data)
1282
    output = []
1283
    for os_name, os_data in pol.iteritems():
1284
      row = []
1285
      for field in self.op.output_fields:
1286
        if field == "name":
1287
          val = os_name
1288
        elif field == "valid":
1289
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1290
        elif field == "node_status":
1291
          val = {}
1292
          for node_name, nos_list in os_data.iteritems():
1293
            val[node_name] = [(v.status, v.path) for v in nos_list]
1294
        else:
1295
          raise errors.ParameterError(field)
1296
        row.append(val)
1297
      output.append(row)
1298

    
1299
    return output
1300

    
1301

    
1302
class LURemoveNode(LogicalUnit):
1303
  """Logical unit for removing a node.
1304

1305
  """
1306
  HPATH = "node-remove"
1307
  HTYPE = constants.HTYPE_NODE
1308
  _OP_REQP = ["node_name"]
1309

    
1310
  def BuildHooksEnv(self):
1311
    """Build hooks env.
1312

1313
    This doesn't run on the target node in the pre phase as a failed
1314
    node would then be impossible to remove.
1315

1316
    """
1317
    env = {
1318
      "OP_TARGET": self.op.node_name,
1319
      "NODE_NAME": self.op.node_name,
1320
      }
1321
    all_nodes = self.cfg.GetNodeList()
1322
    all_nodes.remove(self.op.node_name)
1323
    return env, all_nodes, all_nodes
1324

    
1325
  def CheckPrereq(self):
1326
    """Check prerequisites.
1327

1328
    This checks:
1329
     - the node exists in the configuration
1330
     - it does not have primary or secondary instances
1331
     - it's not the master
1332

1333
    Any errors are signalled by raising errors.OpPrereqError.
1334

1335
    """
1336
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1337
    if node is None:
1338
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1339

    
1340
    instance_list = self.cfg.GetInstanceList()
1341

    
1342
    masternode = self.sstore.GetMasterNode()
1343
    if node.name == masternode:
1344
      raise errors.OpPrereqError("Node is the master node,"
1345
                                 " you need to failover first.")
1346

    
1347
    for instance_name in instance_list:
1348
      instance = self.cfg.GetInstanceInfo(instance_name)
1349
      if node.name == instance.primary_node:
1350
        raise errors.OpPrereqError("Instance %s still running on the node,"
1351
                                   " please remove first." % instance_name)
1352
      if node.name in instance.secondary_nodes:
1353
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1354
                                   " please remove first." % instance_name)
1355
    self.op.node_name = node.name
1356
    self.node = node
1357

    
1358
  def Exec(self, feedback_fn):
1359
    """Removes the node from the cluster.
1360

1361
    """
1362
    node = self.node
1363
    logger.Info("stopping the node daemon and removing configs from node %s" %
1364
                node.name)
1365

    
1366
    self.context.RemoveNode(node.name)
1367

    
1368
    rpc.call_node_leave_cluster(node.name)
1369

    
1370

    
1371
class LUQueryNodes(NoHooksLU):
1372
  """Logical unit for querying nodes.
1373

1374
  """
1375
  _OP_REQP = ["output_fields", "names"]
1376
  REQ_BGL = False
1377

    
1378
  def ExpandNames(self):
1379
    self.dynamic_fields = frozenset([
1380
      "dtotal", "dfree",
1381
      "mtotal", "mnode", "mfree",
1382
      "bootid",
1383
      "ctotal",
1384
      ])
1385

    
1386
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1387
                               "pinst_list", "sinst_list",
1388
                               "pip", "sip", "tags"],
1389
                       dynamic=self.dynamic_fields,
1390
                       selected=self.op.output_fields)
1391

    
1392
    self.needed_locks = {}
1393
    self.share_locks[locking.LEVEL_NODE] = 1
1394
    # TODO: we could lock nodes only if the user asked for dynamic fields. For
1395
    # that we need atomic ways to get info for a group of nodes from the
1396
    # config, though.
1397
    if not self.op.names:
1398
      self.needed_locks[locking.LEVEL_NODE] = None
1399
    else:
1400
      self.needed_locks[locking.LEVEL_NODE] = \
1401
        _GetWantedNodes(self, self.op.names)
1402

    
1403
  def CheckPrereq(self):
1404
    """Check prerequisites.
1405

1406
    """
1407
    # This of course is valid only if we locked the nodes
1408
    self.wanted = self.acquired_locks[locking.LEVEL_NODE]
1409

    
1410
  def Exec(self, feedback_fn):
1411
    """Computes the list of nodes and their attributes.
1412

1413
    """
1414
    nodenames = self.wanted
1415
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1416

    
1417
    # begin data gathering
1418

    
1419
    if self.dynamic_fields.intersection(self.op.output_fields):
1420
      live_data = {}
1421
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1422
      for name in nodenames:
1423
        nodeinfo = node_data.get(name, None)
1424
        if nodeinfo:
1425
          live_data[name] = {
1426
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1427
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1428
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1429
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1430
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1431
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1432
            "bootid": nodeinfo['bootid'],
1433
            }
1434
        else:
1435
          live_data[name] = {}
1436
    else:
1437
      live_data = dict.fromkeys(nodenames, {})
1438

    
1439
    node_to_primary = dict([(name, set()) for name in nodenames])
1440
    node_to_secondary = dict([(name, set()) for name in nodenames])
1441

    
1442
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1443
                             "sinst_cnt", "sinst_list"))
1444
    if inst_fields & frozenset(self.op.output_fields):
1445
      instancelist = self.cfg.GetInstanceList()
1446

    
1447
      for instance_name in instancelist:
1448
        inst = self.cfg.GetInstanceInfo(instance_name)
1449
        if inst.primary_node in node_to_primary:
1450
          node_to_primary[inst.primary_node].add(inst.name)
1451
        for secnode in inst.secondary_nodes:
1452
          if secnode in node_to_secondary:
1453
            node_to_secondary[secnode].add(inst.name)
1454

    
1455
    # end data gathering
1456

    
1457
    output = []
1458
    for node in nodelist:
1459
      node_output = []
1460
      for field in self.op.output_fields:
1461
        if field == "name":
1462
          val = node.name
1463
        elif field == "pinst_list":
1464
          val = list(node_to_primary[node.name])
1465
        elif field == "sinst_list":
1466
          val = list(node_to_secondary[node.name])
1467
        elif field == "pinst_cnt":
1468
          val = len(node_to_primary[node.name])
1469
        elif field == "sinst_cnt":
1470
          val = len(node_to_secondary[node.name])
1471
        elif field == "pip":
1472
          val = node.primary_ip
1473
        elif field == "sip":
1474
          val = node.secondary_ip
1475
        elif field == "tags":
1476
          val = list(node.GetTags())
1477
        elif field in self.dynamic_fields:
1478
          val = live_data[node.name].get(field, None)
1479
        else:
1480
          raise errors.ParameterError(field)
1481
        node_output.append(val)
1482
      output.append(node_output)
1483

    
1484
    return output
1485

    
1486

    
1487
class LUQueryNodeVolumes(NoHooksLU):
1488
  """Logical unit for getting volumes on node(s).
1489

1490
  """
1491
  _OP_REQP = ["nodes", "output_fields"]
1492

    
1493
  def CheckPrereq(self):
1494
    """Check prerequisites.
1495

1496
    This checks that the fields required are valid output fields.
1497

1498
    """
1499
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1500

    
1501
    _CheckOutputFields(static=["node"],
1502
                       dynamic=["phys", "vg", "name", "size", "instance"],
1503
                       selected=self.op.output_fields)
1504

    
1505

    
1506
  def Exec(self, feedback_fn):
1507
    """Computes the list of nodes and their attributes.
1508

1509
    """
1510
    nodenames = self.nodes
1511
    volumes = rpc.call_node_volumes(nodenames)
1512

    
1513
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1514
             in self.cfg.GetInstanceList()]
1515

    
1516
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1517

    
1518
    output = []
1519
    for node in nodenames:
1520
      if node not in volumes or not volumes[node]:
1521
        continue
1522

    
1523
      node_vols = volumes[node][:]
1524
      node_vols.sort(key=lambda vol: vol['dev'])
1525

    
1526
      for vol in node_vols:
1527
        node_output = []
1528
        for field in self.op.output_fields:
1529
          if field == "node":
1530
            val = node
1531
          elif field == "phys":
1532
            val = vol['dev']
1533
          elif field == "vg":
1534
            val = vol['vg']
1535
          elif field == "name":
1536
            val = vol['name']
1537
          elif field == "size":
1538
            val = int(float(vol['size']))
1539
          elif field == "instance":
1540
            for inst in ilist:
1541
              if node not in lv_by_node[inst]:
1542
                continue
1543
              if vol['name'] in lv_by_node[inst][node]:
1544
                val = inst.name
1545
                break
1546
            else:
1547
              val = '-'
1548
          else:
1549
            raise errors.ParameterError(field)
1550
          node_output.append(str(val))
1551

    
1552
        output.append(node_output)
1553

    
1554
    return output
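    # Illustrative note (not part of the original source): each row describes
    # one logical volume on one node and every value is stringified.  For
    # output_fields == ["node", "name", "size", "instance"] a row might look
    # like ["node1.example.com", "xyz.sda", "10240", "instance1"], with "-"
    # in the last column for volumes that belong to no instance.  The names
    # and sizes shown are hypothetical.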
1555

    
1556

    
1557
class LUAddNode(LogicalUnit):
1558
  """Logical unit for adding node to the cluster.
1559

1560
  """
1561
  HPATH = "node-add"
1562
  HTYPE = constants.HTYPE_NODE
1563
  _OP_REQP = ["node_name"]
1564

    
1565
  def BuildHooksEnv(self):
1566
    """Build hooks env.
1567

1568
    This will run on all nodes before, and on all nodes + the new node after.
1569

1570
    """
1571
    env = {
1572
      "OP_TARGET": self.op.node_name,
1573
      "NODE_NAME": self.op.node_name,
1574
      "NODE_PIP": self.op.primary_ip,
1575
      "NODE_SIP": self.op.secondary_ip,
1576
      }
1577
    nodes_0 = self.cfg.GetNodeList()
1578
    nodes_1 = nodes_0 + [self.op.node_name, ]
1579
    return env, nodes_0, nodes_1
1580

    
1581
  def CheckPrereq(self):
1582
    """Check prerequisites.
1583

1584
    This checks:
1585
     - the new node is not already in the config
1586
     - it is resolvable
1587
     - its parameters (single/dual homed) match the cluster
1588

1589
    Any errors are signalled by raising errors.OpPrereqError.
1590

1591
    """
1592
    node_name = self.op.node_name
1593
    cfg = self.cfg
1594

    
1595
    dns_data = utils.HostInfo(node_name)
1596

    
1597
    node = dns_data.name
1598
    primary_ip = self.op.primary_ip = dns_data.ip
1599
    secondary_ip = getattr(self.op, "secondary_ip", None)
1600
    if secondary_ip is None:
1601
      secondary_ip = primary_ip
1602
    if not utils.IsValidIP(secondary_ip):
1603
      raise errors.OpPrereqError("Invalid secondary IP given")
1604
    self.op.secondary_ip = secondary_ip
1605

    
1606
    node_list = cfg.GetNodeList()
1607
    if not self.op.readd and node in node_list:
1608
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1609
                                 node)
1610
    elif self.op.readd and node not in node_list:
1611
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1612

    
1613
    for existing_node_name in node_list:
1614
      existing_node = cfg.GetNodeInfo(existing_node_name)
1615

    
1616
      if self.op.readd and node == existing_node_name:
1617
        if (existing_node.primary_ip != primary_ip or
1618
            existing_node.secondary_ip != secondary_ip):
1619
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1620
                                     " address configuration as before")
1621
        continue
1622

    
1623
      if (existing_node.primary_ip == primary_ip or
1624
          existing_node.secondary_ip == primary_ip or
1625
          existing_node.primary_ip == secondary_ip or
1626
          existing_node.secondary_ip == secondary_ip):
1627
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1628
                                   " existing node %s" % existing_node.name)
1629

    
1630
    # check that the type of the node (single versus dual homed) is the
1631
    # same as for the master
1632
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1633
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1634
    newbie_singlehomed = secondary_ip == primary_ip
1635
    if master_singlehomed != newbie_singlehomed:
1636
      if master_singlehomed:
1637
        raise errors.OpPrereqError("The master has no private ip but the"
1638
                                   " new node has one")
1639
      else:
1640
        raise errors.OpPrereqError("The master has a private ip but the"
1641
                                   " new node doesn't have one")
1642

    
1643
    # checks reachability
1644
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1645
      raise errors.OpPrereqError("Node not reachable by ping")
1646

    
1647
    if not newbie_singlehomed:
1648
      # check reachability from my secondary ip to newbie's secondary ip
1649
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1650
                           source=myself.secondary_ip):
1651
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1652
                                   " based ping to noded port")
1653

    
1654
    self.new_node = objects.Node(name=node,
1655
                                 primary_ip=primary_ip,
1656
                                 secondary_ip=secondary_ip)
1657

    
1658
  def Exec(self, feedback_fn):
1659
    """Adds the new node to the cluster.
1660

1661
    """
1662
    new_node = self.new_node
1663
    node = new_node.name
1664

    
1665
    # check connectivity
1666
    result = rpc.call_version([node])[node]
1667
    if result:
1668
      if constants.PROTOCOL_VERSION == result:
1669
        logger.Info("communication to node %s fine, sw version %s match" %
1670
                    (node, result))
1671
      else:
1672
        raise errors.OpExecError("Version mismatch master version %s,"
1673
                                 " node version %s" %
1674
                                 (constants.PROTOCOL_VERSION, result))
1675
    else:
1676
      raise errors.OpExecError("Cannot get version from the new node")
1677

    
1678
    # setup ssh on node
1679
    logger.Info("copy ssh key to node %s" % node)
1680
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1681
    keyarray = []
1682
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1683
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1684
                priv_key, pub_key]
1685

    
1686
    for i in keyfiles:
1687
      f = open(i, 'r')
1688
      try:
1689
        keyarray.append(f.read())
1690
      finally:
1691
        f.close()
1692

    
1693
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1694
                               keyarray[3], keyarray[4], keyarray[5])
1695

    
1696
    if not result:
1697
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1698

    
1699
    # Add node to our /etc/hosts, and add key to known_hosts
1700
    utils.AddHostToEtcHosts(new_node.name)
1701

    
1702
    if new_node.secondary_ip != new_node.primary_ip:
1703
      if not rpc.call_node_tcp_ping(new_node.name,
1704
                                    constants.LOCALHOST_IP_ADDRESS,
1705
                                    new_node.secondary_ip,
1706
                                    constants.DEFAULT_NODED_PORT,
1707
                                    10, False):
1708
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1709
                                 " you gave (%s). Please fix and re-run this"
1710
                                 " command." % new_node.secondary_ip)
1711

    
1712
    node_verify_list = [self.sstore.GetMasterNode()]
1713
    node_verify_param = {
1714
      'nodelist': [node],
1715
      # TODO: do a node-net-test as well?
1716
    }
1717

    
1718
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1719
    for verifier in node_verify_list:
1720
      if not result[verifier]:
1721
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1722
                                 " for remote verification" % verifier)
1723
      if result[verifier]['nodelist']:
1724
        for failed in result[verifier]['nodelist']:
1725
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1726
                      (verifier, result[verifier]['nodelist'][failed]))
1727
        raise errors.OpExecError("ssh/hostname verification failed.")
1728

    
1729
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1730
    # including the node just added
1731
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1732
    dist_nodes = self.cfg.GetNodeList()
1733
    if not self.op.readd:
1734
      dist_nodes.append(node)
1735
    if myself.name in dist_nodes:
1736
      dist_nodes.remove(myself.name)
1737

    
1738
    logger.Debug("Copying hosts and known_hosts to all nodes")
1739
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1740
      result = rpc.call_upload_file(dist_nodes, fname)
1741
      for to_node in dist_nodes:
1742
        if not result[to_node]:
1743
          logger.Error("copy of file %s to node %s failed" %
1744
                       (fname, to_node))
1745

    
1746
    to_copy = self.sstore.GetFileList()
1747
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1748
      to_copy.append(constants.VNC_PASSWORD_FILE)
1749
    for fname in to_copy:
1750
      result = rpc.call_upload_file([node], fname)
1751
      if not result[node]:
1752
        logger.Error("could not copy file %s to node %s" % (fname, node))
1753

    
1754
    if self.op.readd:
1755
      self.context.ReaddNode(new_node)
1756
    else:
1757
      self.context.AddNode(new_node)
1758

    
1759

    
1760
class LUQueryClusterInfo(NoHooksLU):
1761
  """Query cluster configuration.
1762

1763
  """
1764
  _OP_REQP = []
1765
  REQ_MASTER = False
1766
  REQ_BGL = False
1767

    
1768
  def ExpandNames(self):
1769
    self.needed_locks = {}
1770

    
1771
  def CheckPrereq(self):
1772
    """No prerequsites needed for this LU.
1773

1774
    """
1775
    pass
1776

    
1777
  def Exec(self, feedback_fn):
1778
    """Return cluster config.
1779

1780
    """
1781
    result = {
1782
      "name": self.sstore.GetClusterName(),
1783
      "software_version": constants.RELEASE_VERSION,
1784
      "protocol_version": constants.PROTOCOL_VERSION,
1785
      "config_version": constants.CONFIG_VERSION,
1786
      "os_api_version": constants.OS_API_VERSION,
1787
      "export_version": constants.EXPORT_VERSION,
1788
      "master": self.sstore.GetMasterNode(),
1789
      "architecture": (platform.architecture()[0], platform.machine()),
1790
      "hypervisor_type": self.sstore.GetHypervisorType(),
1791
      }
1792

    
1793
    return result
1794

    
1795

    
1796
class LUDumpClusterConfig(NoHooksLU):
1797
  """Return a text-representation of the cluster-config.
1798

1799
  """
1800
  _OP_REQP = []
1801
  REQ_BGL = False
1802

    
1803
  def ExpandNames(self):
1804
    self.needed_locks = {}
1805

    
1806
  def CheckPrereq(self):
1807
    """No prerequisites.
1808

1809
    """
1810
    pass
1811

    
1812
  def Exec(self, feedback_fn):
1813
    """Dump a representation of the cluster config to the standard output.
1814

1815
    """
1816
    return self.cfg.DumpConfig()
1817

    
1818

    
1819
class LUActivateInstanceDisks(NoHooksLU):
1820
  """Bring up an instance's disks.
1821

1822
  """
1823
  _OP_REQP = ["instance_name"]
1824

    
1825
  def CheckPrereq(self):
1826
    """Check prerequisites.
1827

1828
    This checks that the instance is in the cluster.
1829

1830
    """
1831
    instance = self.cfg.GetInstanceInfo(
1832
      self.cfg.ExpandInstanceName(self.op.instance_name))
1833
    if instance is None:
1834
      raise errors.OpPrereqError("Instance '%s' not known" %
1835
                                 self.op.instance_name)
1836
    self.instance = instance
1837

    
1838

    
1839
  def Exec(self, feedback_fn):
1840
    """Activate the disks.
1841

1842
    """
1843
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1844
    if not disks_ok:
1845
      raise errors.OpExecError("Cannot activate block devices")
1846

    
1847
    return disks_info
1848

    
1849

    
1850
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1851
  """Prepare the block devices for an instance.
1852

1853
  This sets up the block devices on all nodes.
1854

1855
  Args:
1856
    instance: a ganeti.objects.Instance object
1857
    ignore_secondaries: if true, errors on secondary nodes won't result
1858
                        in an error return from the function
1859

1860
  Returns:
1861
    false if the operation failed
1862
    list of (host, instance_visible_name, node_visible_name) if the operation
1863
         succeeded with the mapping from node devices to instance devices
1864
  """
1865
  device_info = []
1866
  disks_ok = True
1867
  iname = instance.name
1868
  # With the two passes mechanism we try to reduce the window of
1869
  # opportunity for the race condition of switching DRBD to primary
1870
  # before handshaking occurred, but we do not eliminate it
1871

    
1872
  # The proper fix would be to wait (with some limits) until the
1873
  # connection has been made and drbd transitions from WFConnection
1874
  # into any other network-connected state (Connected, SyncTarget,
1875
  # SyncSource, etc.)
1876

    
1877
  # 1st pass, assemble on all nodes in secondary mode
1878
  for inst_disk in instance.disks:
1879
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1880
      cfg.SetDiskID(node_disk, node)
1881
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1882
      if not result:
1883
        logger.Error("could not prepare block device %s on node %s"
1884
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1885
        if not ignore_secondaries:
1886
          disks_ok = False
1887

    
1888
  # FIXME: race condition on drbd migration to primary
1889

    
1890
  # 2nd pass, do only the primary node
1891
  for inst_disk in instance.disks:
1892
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1893
      if node != instance.primary_node:
1894
        continue
1895
      cfg.SetDiskID(node_disk, node)
1896
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1897
      if not result:
1898
        logger.Error("could not prepare block device %s on node %s"
1899
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1900
        disks_ok = False
1901
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1902

    
1903
  # leave the disks configured for the primary node
1904
  # this is a workaround that would be fixed better by
1905
  # improving the logical/physical id handling
1906
  for disk in instance.disks:
1907
    cfg.SetDiskID(disk, instance.primary_node)
1908

    
1909
  return disks_ok, device_info
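  # Illustrative usage (not part of the original source), as seen in the
  # callers above and below:
  #
  #   disks_ok, device_info = _AssembleInstanceDisks(instance, self.cfg)
  #   if not disks_ok:
  #     raise errors.OpExecError("Cannot activate block devices")
  #
  # device_info holds one (primary_node, iv_name, result) tuple per disk,
  # e.g. ("node1.example.com", "sda", ...) with a hypothetical hostname and
  # whatever the blockdev_assemble call returned as the last element.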
1910

    
1911

    
1912
def _StartInstanceDisks(cfg, instance, force):
1913
  """Start the disks of an instance.
1914

1915
  """
1916
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1917
                                           ignore_secondaries=force)
1918
  if not disks_ok:
1919
    _ShutdownInstanceDisks(instance, cfg)
1920
    if force is not None and not force:
1921
      logger.Error("If the message above refers to a secondary node,"
1922
                   " you can retry the operation using '--force'.")
1923
    raise errors.OpExecError("Disk consistency error")
1924

    
1925

    
1926
class LUDeactivateInstanceDisks(NoHooksLU):
1927
  """Shutdown an instance's disks.
1928

1929
  """
1930
  _OP_REQP = ["instance_name"]
1931

    
1932
  def CheckPrereq(self):
1933
    """Check prerequisites.
1934

1935
    This checks that the instance is in the cluster.
1936

1937
    """
1938
    instance = self.cfg.GetInstanceInfo(
1939
      self.cfg.ExpandInstanceName(self.op.instance_name))
1940
    if instance is None:
1941
      raise errors.OpPrereqError("Instance '%s' not known" %
1942
                                 self.op.instance_name)
1943
    self.instance = instance
1944

    
1945
  def Exec(self, feedback_fn):
1946
    """Deactivate the disks
1947

1948
    """
1949
    instance = self.instance
1950
    ins_l = rpc.call_instance_list([instance.primary_node])
1951
    ins_l = ins_l[instance.primary_node]
1952
    if not isinstance(ins_l, list):
1953
      raise errors.OpExecError("Can't contact node '%s'" %
1954
                               instance.primary_node)
1955

    
1956
    if self.instance.name in ins_l:
1957
      raise errors.OpExecError("Instance is running, can't shutdown"
1958
                               " block devices.")
1959

    
1960
    _ShutdownInstanceDisks(instance, self.cfg)
1961

    
1962

    
1963
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1964
  """Shutdown block devices of an instance.
1965

1966
  This does the shutdown on all nodes of the instance.
1967

1968
  If ignore_primary is true, errors on the primary node are
1969
  ignored.
1970

1971
  """
1972
  result = True
1973
  for disk in instance.disks:
1974
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1975
      cfg.SetDiskID(top_disk, node)
1976
      if not rpc.call_blockdev_shutdown(node, top_disk):
1977
        logger.Error("could not shutdown block device %s on node %s" %
1978
                     (disk.iv_name, node))
1979
        if not ignore_primary or node != instance.primary_node:
1980
          result = False
1981
  return result
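  # Illustrative usage (not part of the original source), mirroring
  # LUFailoverInstance below:
  #
  #   if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
  #     raise errors.OpExecError("Can't shut down the instance's disks.")
  #
  # where a shutdown failure on the (possibly unreachable) primary node must
  # not abort the failover.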
1982

    
1983

    
1984
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1985
  """Checks if a node has enough free memory.
1986

1987
  This function checks if a given node has the needed amount of free
1988
  memory. In case the node has less memory or we cannot get the
1989
  information from the node, this function raises an OpPrereqError
1990
  exception.
1991

1992
  Args:
1993
    - cfg: a ConfigWriter instance
1994
    - node: the node name
1995
    - reason: string to use in the error message
1996
    - requested: the amount of memory in MiB
1997

1998
  """
1999
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2000
  if not nodeinfo or not isinstance(nodeinfo, dict):
2001
    raise errors.OpPrereqError("Could not contact node %s for resource"
2002
                             " information" % (node,))
2003

    
2004
  free_mem = nodeinfo[node].get('memory_free')
2005
  if not isinstance(free_mem, int):
2006
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2007
                             " was '%s'" % (node, free_mem))
2008
  if requested > free_mem:
2009
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2010
                             " needed %s MiB, available %s MiB" %
2011
                             (node, reason, requested, free_mem))
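  # Illustrative usage (not part of the original source), mirroring the call
  # in LUStartupInstance.CheckPrereq below:
  #
  #   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
  #                        "starting instance %s" % instance.name,
  #                        instance.memory)
  #
  # The function either returns silently or raises OpPrereqError, so there is
  # no return value to check.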
2012

    
2013

    
2014
class LUStartupInstance(LogicalUnit):
2015
  """Starts an instance.
2016

2017
  """
2018
  HPATH = "instance-start"
2019
  HTYPE = constants.HTYPE_INSTANCE
2020
  _OP_REQP = ["instance_name", "force"]
2021
  REQ_BGL = False
2022

    
2023
  def ExpandNames(self):
2024
    self._ExpandAndLockInstance()
2025
    self.needed_locks[locking.LEVEL_NODE] = []
2026
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2027

    
2028
  def DeclareLocks(self, level):
2029
    if level == locking.LEVEL_NODE:
2030
      self._LockInstancesNodes()
2031

    
2032
  def BuildHooksEnv(self):
2033
    """Build hooks env.
2034

2035
    This runs on master, primary and secondary nodes of the instance.
2036

2037
    """
2038
    env = {
2039
      "FORCE": self.op.force,
2040
      }
2041
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2042
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2043
          list(self.instance.secondary_nodes))
2044
    return env, nl, nl
2045

    
2046
  def CheckPrereq(self):
2047
    """Check prerequisites.
2048

2049
    This checks that the instance is in the cluster.
2050

2051
    """
2052
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2053
    assert self.instance is not None, \
2054
      "Cannot retrieve locked instance %s" % self.op.instance_name
2055

    
2056
    # check bridges existence
2057
    _CheckInstanceBridgesExist(instance)
2058

    
2059
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2060
                         "starting instance %s" % instance.name,
2061
                         instance.memory)
2062

    
2063
  def Exec(self, feedback_fn):
2064
    """Start the instance.
2065

2066
    """
2067
    instance = self.instance
2068
    force = self.op.force
2069
    extra_args = getattr(self.op, "extra_args", "")
2070

    
2071
    self.cfg.MarkInstanceUp(instance.name)
2072

    
2073
    node_current = instance.primary_node
2074

    
2075
    _StartInstanceDisks(self.cfg, instance, force)
2076

    
2077
    if not rpc.call_instance_start(node_current, instance, extra_args):
2078
      _ShutdownInstanceDisks(instance, self.cfg)
2079
      raise errors.OpExecError("Could not start instance")
2080

    
2081

    
2082
class LURebootInstance(LogicalUnit):
2083
  """Reboot an instance.
2084

2085
  """
2086
  HPATH = "instance-reboot"
2087
  HTYPE = constants.HTYPE_INSTANCE
2088
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2089
  REQ_BGL = False
2090

    
2091
  def ExpandNames(self):
2092
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2093
                                   constants.INSTANCE_REBOOT_HARD,
2094
                                   constants.INSTANCE_REBOOT_FULL]:
2095
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2096
                                  (constants.INSTANCE_REBOOT_SOFT,
2097
                                   constants.INSTANCE_REBOOT_HARD,
2098
                                   constants.INSTANCE_REBOOT_FULL))
2099
    self._ExpandAndLockInstance()
2100
    self.needed_locks[locking.LEVEL_NODE] = []
2101
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2102

    
2103
  def DeclareLocks(self, level):
2104
    if level == locking.LEVEL_NODE:
2105
      # FIXME: lock only primary on (not constants.INSTANCE_REBOOT_FULL)
2106
      self._LockInstancesNodes()
2107

    
2108
  def BuildHooksEnv(self):
2109
    """Build hooks env.
2110

2111
    This runs on master, primary and secondary nodes of the instance.
2112

2113
    """
2114
    env = {
2115
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2116
      }
2117
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2118
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2119
          list(self.instance.secondary_nodes))
2120
    return env, nl, nl
2121

    
2122
  def CheckPrereq(self):
2123
    """Check prerequisites.
2124

2125
    This checks that the instance is in the cluster.
2126

2127
    """
2128
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2129
    assert self.instance is not None, \
2130
      "Cannot retrieve locked instance %s" % self.op.instance_name
2131

    
2132
    # check bridges existence
2133
    _CheckInstanceBridgesExist(instance)
2134

    
2135
  def Exec(self, feedback_fn):
2136
    """Reboot the instance.
2137

2138
    """
2139
    instance = self.instance
2140
    ignore_secondaries = self.op.ignore_secondaries
2141
    reboot_type = self.op.reboot_type
2142
    extra_args = getattr(self.op, "extra_args", "")
2143

    
2144
    node_current = instance.primary_node
2145

    
2146
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2147
                       constants.INSTANCE_REBOOT_HARD]:
2148
      if not rpc.call_instance_reboot(node_current, instance,
2149
                                      reboot_type, extra_args):
2150
        raise errors.OpExecError("Could not reboot instance")
2151
    else:
2152
      if not rpc.call_instance_shutdown(node_current, instance):
2153
        raise errors.OpExecError("could not shutdown instance for full reboot")
2154
      _ShutdownInstanceDisks(instance, self.cfg)
2155
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2156
      if not rpc.call_instance_start(node_current, instance, extra_args):
2157
        _ShutdownInstanceDisks(instance, self.cfg)
2158
        raise errors.OpExecError("Could not start instance for full reboot")
2159

    
2160
    self.cfg.MarkInstanceUp(instance.name)
2161

    
2162

    
2163
class LUShutdownInstance(LogicalUnit):
2164
  """Shutdown an instance.
2165

2166
  """
2167
  HPATH = "instance-stop"
2168
  HTYPE = constants.HTYPE_INSTANCE
2169
  _OP_REQP = ["instance_name"]
2170
  REQ_BGL = False
2171

    
2172
  def ExpandNames(self):
2173
    self._ExpandAndLockInstance()
2174
    self.needed_locks[locking.LEVEL_NODE] = []
2175
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2176

    
2177
  def DeclareLocks(self, level):
2178
    if level == locking.LEVEL_NODE:
2179
      self._LockInstancesNodes()
2180

    
2181
  def BuildHooksEnv(self):
2182
    """Build hooks env.
2183

2184
    This runs on master, primary and secondary nodes of the instance.
2185

2186
    """
2187
    env = _BuildInstanceHookEnvByObject(self.instance)
2188
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2189
          list(self.instance.secondary_nodes))
2190
    return env, nl, nl
2191

    
2192
  def CheckPrereq(self):
2193
    """Check prerequisites.
2194

2195
    This checks that the instance is in the cluster.
2196

2197
    """
2198
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2199
    assert self.instance is not None, \
2200
      "Cannot retrieve locked instance %s" % self.op.instance_name
2201

    
2202
  def Exec(self, feedback_fn):
2203
    """Shutdown the instance.
2204

2205
    """
2206
    instance = self.instance
2207
    node_current = instance.primary_node
2208
    self.cfg.MarkInstanceDown(instance.name)
2209
    if not rpc.call_instance_shutdown(node_current, instance):
2210
      logger.Error("could not shutdown instance")
2211

    
2212
    _ShutdownInstanceDisks(instance, self.cfg)
2213

    
2214

    
2215
class LUReinstallInstance(LogicalUnit):
2216
  """Reinstall an instance.
2217

2218
  """
2219
  HPATH = "instance-reinstall"
2220
  HTYPE = constants.HTYPE_INSTANCE
2221
  _OP_REQP = ["instance_name"]
2222
  REQ_BGL = False
2223

    
2224
  def ExpandNames(self):
2225
    self._ExpandAndLockInstance()
2226
    self.needed_locks[locking.LEVEL_NODE] = []
2227
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2228

    
2229
  def DeclareLocks(self, level):
2230
    if level == locking.LEVEL_NODE:
2231
      self._LockInstancesNodes()
2232

    
2233
  def BuildHooksEnv(self):
2234
    """Build hooks env.
2235

2236
    This runs on master, primary and secondary nodes of the instance.
2237

2238
    """
2239
    env = _BuildInstanceHookEnvByObject(self.instance)
2240
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2241
          list(self.instance.secondary_nodes))
2242
    return env, nl, nl
2243

    
2244
  def CheckPrereq(self):
2245
    """Check prerequisites.
2246

2247
    This checks that the instance is in the cluster and is not running.
2248

2249
    """
2250
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2251
    assert instance is not None, \
2252
      "Cannot retrieve locked instance %s" % self.op.instance_name
2253

    
2254
    if instance.disk_template == constants.DT_DISKLESS:
2255
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2256
                                 self.op.instance_name)
2257
    if instance.status != "down":
2258
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2259
                                 self.op.instance_name)
2260
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2261
    if remote_info:
2262
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2263
                                 (self.op.instance_name,
2264
                                  instance.primary_node))
2265

    
2266
    self.op.os_type = getattr(self.op, "os_type", None)
2267
    if self.op.os_type is not None:
2268
      # OS verification
2269
      pnode = self.cfg.GetNodeInfo(
2270
        self.cfg.ExpandNodeName(instance.primary_node))
2271
      if pnode is None:
2272
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2273
                                   self.op.pnode)
2274
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2275
      if not os_obj:
2276
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2277
                                   " primary node"  % self.op.os_type)
2278

    
2279
    self.instance = instance
2280

    
2281
  def Exec(self, feedback_fn):
2282
    """Reinstall the instance.
2283

2284
    """
2285
    inst = self.instance
2286

    
2287
    if self.op.os_type is not None:
2288
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2289
      inst.os = self.op.os_type
2290
      self.cfg.AddInstance(inst)
2291

    
2292
    _StartInstanceDisks(self.cfg, inst, None)
2293
    try:
2294
      feedback_fn("Running the instance OS create scripts...")
2295
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2296
        raise errors.OpExecError("Could not install OS for instance %s"
2297
                                 " on node %s" %
2298
                                 (inst.name, inst.primary_node))
2299
    finally:
2300
      _ShutdownInstanceDisks(inst, self.cfg)
2301

    
2302

    
2303
class LURenameInstance(LogicalUnit):
2304
  """Rename an instance.
2305

2306
  """
2307
  HPATH = "instance-rename"
2308
  HTYPE = constants.HTYPE_INSTANCE
2309
  _OP_REQP = ["instance_name", "new_name"]
2310

    
2311
  def BuildHooksEnv(self):
2312
    """Build hooks env.
2313

2314
    This runs on master, primary and secondary nodes of the instance.
2315

2316
    """
2317
    env = _BuildInstanceHookEnvByObject(self.instance)
2318
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2319
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2320
          list(self.instance.secondary_nodes))
2321
    return env, nl, nl
2322

    
2323
  def CheckPrereq(self):
2324
    """Check prerequisites.
2325

2326
    This checks that the instance is in the cluster and is not running.
2327

2328
    """
2329
    instance = self.cfg.GetInstanceInfo(
2330
      self.cfg.ExpandInstanceName(self.op.instance_name))
2331
    if instance is None:
2332
      raise errors.OpPrereqError("Instance '%s' not known" %
2333
                                 self.op.instance_name)
2334
    if instance.status != "down":
2335
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2336
                                 self.op.instance_name)
2337
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2338
    if remote_info:
2339
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2340
                                 (self.op.instance_name,
2341
                                  instance.primary_node))
2342
    self.instance = instance
2343

    
2344
    # new name verification
2345
    name_info = utils.HostInfo(self.op.new_name)
2346

    
2347
    self.op.new_name = new_name = name_info.name
2348
    instance_list = self.cfg.GetInstanceList()
2349
    if new_name in instance_list:
2350
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2351
                                 new_name)
2352

    
2353
    if not getattr(self.op, "ignore_ip", False):
2354
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2355
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2356
                                   (name_info.ip, new_name))
2357

    
2358

    
2359
  def Exec(self, feedback_fn):
2360
    """Reinstall the instance.
2361

2362
    """
2363
    inst = self.instance
2364
    old_name = inst.name
2365

    
2366
    if inst.disk_template == constants.DT_FILE:
2367
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2368

    
2369
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2370
    # Change the instance lock. This is definitely safe while we hold the BGL
2371
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2372
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2373

    
2374
    # re-read the instance from the configuration after rename
2375
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2376

    
2377
    if inst.disk_template == constants.DT_FILE:
2378
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2379
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2380
                                                old_file_storage_dir,
2381
                                                new_file_storage_dir)
2382

    
2383
      if not result:
2384
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2385
                                 " directory '%s' to '%s' (but the instance"
2386
                                 " has been renamed in Ganeti)" % (
2387
                                 inst.primary_node, old_file_storage_dir,
2388
                                 new_file_storage_dir))
2389

    
2390
      if not result[0]:
2391
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2392
                                 " (but the instance has been renamed in"
2393
                                 " Ganeti)" % (old_file_storage_dir,
2394
                                               new_file_storage_dir))
2395

    
2396
    _StartInstanceDisks(self.cfg, inst, None)
2397
    try:
2398
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2399
                                          "sda", "sdb"):
2400
        msg = ("Could not run OS rename script for instance %s on node %s"
2401
               " (but the instance has been renamed in Ganeti)" %
2402
               (inst.name, inst.primary_node))
2403
        logger.Error(msg)
2404
    finally:
2405
      _ShutdownInstanceDisks(inst, self.cfg)
2406

    
2407

    
2408
class LURemoveInstance(LogicalUnit):
2409
  """Remove an instance.
2410

2411
  """
2412
  HPATH = "instance-remove"
2413
  HTYPE = constants.HTYPE_INSTANCE
2414
  _OP_REQP = ["instance_name", "ignore_failures"]
2415

    
2416
  def BuildHooksEnv(self):
2417
    """Build hooks env.
2418

2419
    This runs on master, primary and secondary nodes of the instance.
2420

2421
    """
2422
    env = _BuildInstanceHookEnvByObject(self.instance)
2423
    nl = [self.sstore.GetMasterNode()]
2424
    return env, nl, nl
2425

    
2426
  def CheckPrereq(self):
2427
    """Check prerequisites.
2428

2429
    This checks that the instance is in the cluster.
2430

2431
    """
2432
    instance = self.cfg.GetInstanceInfo(
2433
      self.cfg.ExpandInstanceName(self.op.instance_name))
2434
    if instance is None:
2435
      raise errors.OpPrereqError("Instance '%s' not known" %
2436
                                 self.op.instance_name)
2437
    self.instance = instance
2438

    
2439
  def Exec(self, feedback_fn):
2440
    """Remove the instance.
2441

2442
    """
2443
    instance = self.instance
2444
    logger.Info("shutting down instance %s on node %s" %
2445
                (instance.name, instance.primary_node))
2446

    
2447
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2448
      if self.op.ignore_failures:
2449
        feedback_fn("Warning: can't shutdown instance")
2450
      else:
2451
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2452
                                 (instance.name, instance.primary_node))
2453

    
2454
    logger.Info("removing block devices for instance %s" % instance.name)
2455

    
2456
    if not _RemoveDisks(instance, self.cfg):
2457
      if self.op.ignore_failures:
2458
        feedback_fn("Warning: can't remove instance's disks")
2459
      else:
2460
        raise errors.OpExecError("Can't remove instance's disks")
2461

    
2462
    logger.Info("removing instance %s out of cluster config" % instance.name)
2463

    
2464
    self.cfg.RemoveInstance(instance.name)
2465
    # Remove the instance from the Ganeti Lock Manager
2466
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2467

    
2468

    
2469
class LUQueryInstances(NoHooksLU):
2470
  """Logical unit for querying instances.
2471

2472
  """
2473
  _OP_REQP = ["output_fields", "names"]
2474
  REQ_BGL = False
2475

    
2476
  def ExpandNames(self):
2477
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2478
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2479
                               "admin_state", "admin_ram",
2480
                               "disk_template", "ip", "mac", "bridge",
2481
                               "sda_size", "sdb_size", "vcpus", "tags",
2482
                               "auto_balance",
2483
                               "network_port", "kernel_path", "initrd_path",
2484
                               "hvm_boot_order", "hvm_acpi", "hvm_pae",
2485
                               "hvm_cdrom_image_path", "hvm_nic_type",
2486
                               "hvm_disk_type", "vnc_bind_address"],
2487
                       dynamic=self.dynamic_fields,
2488
                       selected=self.op.output_fields)
2489

    
2490
    self.needed_locks = {}
2491
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2492
    self.share_locks[locking.LEVEL_NODE] = 1
2493

    
2494
    # TODO: we could lock instances (and nodes) only if the user asked for
2495
    # dynamic fields. For that we need atomic ways to get info for a group of
2496
    # instances from the config, though.
2497
    if not self.op.names:
2498
      self.needed_locks[locking.LEVEL_INSTANCE] = None # Acquire all
2499
    else:
2500
      self.needed_locks[locking.LEVEL_INSTANCE] = \
2501
        _GetWantedInstances(self, self.op.names)
2502

    
2503
    self.needed_locks[locking.LEVEL_NODE] = []
2504
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2505

    
2506
  def DeclareLocks(self, level):
2507
    # TODO: locking of nodes could be avoided when not querying them
2508
    if level == locking.LEVEL_NODE:
2509
      self._LockInstancesNodes()
2510

    
2511
  def CheckPrereq(self):
2512
    """Check prerequisites.
2513

2514
    """
2515
    # This of course is valid only if we locked the instances
2516
    self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE]
2517

    
2518
  def Exec(self, feedback_fn):
2519
    """Computes the list of nodes and their attributes.
2520

2521
    """
2522
    instance_names = self.wanted
2523
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2524
                     in instance_names]
2525

    
2526
    # begin data gathering
2527

    
2528
    nodes = frozenset([inst.primary_node for inst in instance_list])
2529

    
2530
    bad_nodes = []
2531
    if self.dynamic_fields.intersection(self.op.output_fields):
2532
      live_data = {}
2533
      node_data = rpc.call_all_instances_info(nodes)
2534
      for name in nodes:
2535
        result = node_data[name]
2536
        if result:
2537
          live_data.update(result)
2538
        elif result == False:
2539
          bad_nodes.append(name)
2540
        # else no instance is alive
2541
    else:
2542
      live_data = dict([(name, {}) for name in instance_names])
2543

    
2544
    # end data gathering
2545

    
2546
    output = []
2547
    for instance in instance_list:
2548
      iout = []
2549
      for field in self.op.output_fields:
2550
        if field == "name":
2551
          val = instance.name
2552
        elif field == "os":
2553
          val = instance.os
2554
        elif field == "pnode":
2555
          val = instance.primary_node
2556
        elif field == "snodes":
2557
          val = list(instance.secondary_nodes)
2558
        elif field == "admin_state":
2559
          val = (instance.status != "down")
2560
        elif field == "oper_state":
2561
          if instance.primary_node in bad_nodes:
2562
            val = None
2563
          else:
2564
            val = bool(live_data.get(instance.name))
2565
        elif field == "status":
2566
          if instance.primary_node in bad_nodes:
2567
            val = "ERROR_nodedown"
2568
          else:
2569
            running = bool(live_data.get(instance.name))
2570
            if running:
2571
              if instance.status != "down":
2572
                val = "running"
2573
              else:
2574
                val = "ERROR_up"
2575
            else:
2576
              if instance.status != "down":
2577
                val = "ERROR_down"
2578
              else:
2579
                val = "ADMIN_down"
2580
        elif field == "admin_ram":
2581
          val = instance.memory
2582
        elif field == "oper_ram":
2583
          if instance.primary_node in bad_nodes:
2584
            val = None
2585
          elif instance.name in live_data:
2586
            val = live_data[instance.name].get("memory", "?")
2587
          else:
2588
            val = "-"
2589
        elif field == "disk_template":
2590
          val = instance.disk_template
2591
        elif field == "ip":
2592
          val = instance.nics[0].ip
2593
        elif field == "bridge":
2594
          val = instance.nics[0].bridge
2595
        elif field == "mac":
2596
          val = instance.nics[0].mac
2597
        elif field == "sda_size" or field == "sdb_size":
2598
          disk = instance.FindDisk(field[:3])
2599
          if disk is None:
2600
            val = None
2601
          else:
2602
            val = disk.size
2603
        elif field == "vcpus":
2604
          val = instance.vcpus
2605
        elif field == "tags":
2606
          val = list(instance.GetTags())
2607
        elif field in ("network_port", "kernel_path", "initrd_path",
2608
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2609
                       "hvm_cdrom_image_path", "hvm_nic_type",
2610
                       "hvm_disk_type", "vnc_bind_address"):
2611
          val = getattr(instance, field, None)
2612
          if val is not None:
2613
            pass
2614
          elif field in ("hvm_nic_type", "hvm_disk_type",
2615
                         "kernel_path", "initrd_path"):
2616
            val = "default"
2617
          else:
2618
            val = "-"
2619
        else:
2620
          raise errors.ParameterError(field)
2621
        iout.append(val)
2622
      output.append(iout)
2623

    
2624
    return output
2625

    
2626

    
2627
class LUFailoverInstance(LogicalUnit):
2628
  """Failover an instance.
2629

2630
  """
2631
  HPATH = "instance-failover"
2632
  HTYPE = constants.HTYPE_INSTANCE
2633
  _OP_REQP = ["instance_name", "ignore_consistency"]
2634
  REQ_BGL = False
2635

    
2636
  def ExpandNames(self):
2637
    self._ExpandAndLockInstance()
2638
    self.needed_locks[locking.LEVEL_NODE] = []
2639
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2640

    
2641
  def DeclareLocks(self, level):
2642
    if level == locking.LEVEL_NODE:
2643
      self._LockInstancesNodes()
2644

    
2645
  def BuildHooksEnv(self):
2646
    """Build hooks env.
2647

2648
    This runs on master, primary and secondary nodes of the instance.
2649

2650
    """
2651
    env = {
2652
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2653
      }
2654
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2655
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2656
    return env, nl, nl
2657

    
2658
  def CheckPrereq(self):
2659
    """Check prerequisites.
2660

2661
    This checks that the instance is in the cluster.
2662

2663
    """
2664
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2665
    assert self.instance is not None, \
2666
      "Cannot retrieve locked instance %s" % self.op.instance_name
2667

    
2668
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2669
      raise errors.OpPrereqError("Instance's disk layout is not"
2670
                                 " network mirrored, cannot failover.")
2671

    
2672
    secondary_nodes = instance.secondary_nodes
2673
    if not secondary_nodes:
2674
      raise errors.ProgrammerError("no secondary node but using "
2675
                                   "a mirrored disk template")
2676

    
2677
    target_node = secondary_nodes[0]
2678
    # check memory requirements on the secondary node
2679
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2680
                         instance.name, instance.memory)
2681

    
2682
    # check bridge existence
2683
    brlist = [nic.bridge for nic in instance.nics]
2684
    if not rpc.call_bridges_exist(target_node, brlist):
2685
      raise errors.OpPrereqError("One or more target bridges %s does not"
2686
                                 " exist on destination node '%s'" %
2687
                                 (brlist, target_node))
2688

    
2689
  def Exec(self, feedback_fn):
2690
    """Failover an instance.
2691

2692
    The failover is done by shutting it down on its present node and
2693
    starting it on the secondary.
2694

2695
    """
2696
    instance = self.instance
2697

    
2698
    source_node = instance.primary_node
2699
    target_node = instance.secondary_nodes[0]
2700

    
2701
    feedback_fn("* checking disk consistency between source and target")
2702
    for dev in instance.disks:
2703
      # for drbd, these are drbd over lvm
2704
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2705
        if instance.status == "up" and not self.op.ignore_consistency:
2706
          raise errors.OpExecError("Disk %s is degraded on target node,"
2707
                                   " aborting failover." % dev.iv_name)
2708

    
2709
    feedback_fn("* shutting down instance on source node")
2710
    logger.Info("Shutting down instance %s on node %s" %
2711
                (instance.name, source_node))
2712

    
2713
    if not rpc.call_instance_shutdown(source_node, instance):
2714
      if self.op.ignore_consistency:
2715
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2716
                     " anyway. Please make sure node %s is down"  %
2717
                     (instance.name, source_node, source_node))
2718
      else:
2719
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2720
                                 (instance.name, source_node))
2721

    
2722
    feedback_fn("* deactivating the instance's disks on source node")
2723
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2724
      raise errors.OpExecError("Can't shut down the instance's disks.")
2725

    
2726
    instance.primary_node = target_node
2727
    # distribute new instance config to the other nodes
2728
    self.cfg.Update(instance)
2729

    
2730
    # Only start the instance if it's marked as up
2731
    if instance.status == "up":
2732
      feedback_fn("* activating the instance's disks on target node")
2733
      logger.Info("Starting instance %s on node %s" %
2734
                  (instance.name, target_node))
2735

    
2736
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2737
                                               ignore_secondaries=True)
2738
      if not disks_ok:
2739
        _ShutdownInstanceDisks(instance, self.cfg)
2740
        raise errors.OpExecError("Can't activate the instance's disks")
2741

    
2742
      feedback_fn("* starting the instance on the target node")
2743
      if not rpc.call_instance_start(target_node, instance, None):
2744
        _ShutdownInstanceDisks(instance, self.cfg)
2745
        raise errors.OpExecError("Could not start instance %s on node %s." %
2746
                                 (instance.name, target_node))
2747

    
2748

    
2749
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2750
  """Create a tree of block devices on the primary node.
2751

2752
  This always creates all devices.
2753

2754
  """
2755
  if device.children:
2756
    for child in device.children:
2757
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2758
        return False
2759

    
2760
  cfg.SetDiskID(device, node)
2761
  new_id = rpc.call_blockdev_create(node, device, device.size,
2762
                                    instance.name, True, info)
2763
  if not new_id:
2764
    return False
2765
  if device.physical_id is None:
2766
    device.physical_id = new_id
2767
  return True
2768

    
2769

    
2770
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2771
  """Create a tree of block devices on a secondary node.
2772

2773
  If this device type has to be created on secondaries, create it and
2774
  all its children.
2775

2776
  If not, just recurse to children keeping the same 'force' value.
2777

2778
  """
2779
  if device.CreateOnSecondary():
2780
    force = True
2781
  if device.children:
2782
    for child in device.children:
2783
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2784
                                        child, force, info):
2785
        return False
2786

    
2787
  if not force:
2788
    return True
2789
  cfg.SetDiskID(device, node)
2790
  new_id = rpc.call_blockdev_create(node, device, device.size,
2791
                                    instance.name, False, info)
2792
  if not new_id:
2793
    return False
2794
  if device.physical_id is None:
2795
    device.physical_id = new_id
2796
  return True
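  # Illustrative note (not part of the original source), assuming the usual
  # Disk.CreateOnSecondary() behaviour (true for DRBD8, false for plain LVs):
  # for a DRBD8 disk 'force' becomes True at the top, so the two LV children
  # and then the DRBD device itself are created on the secondary, while a
  # plain LV called with force=False creates nothing and just returns True.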
2797

    
2798

    
2799
def _GenerateUniqueNames(cfg, exts):
2800
  """Generate a suitable LV name.
2801

2802
  This will generate unique logical volume names, one for each given suffix.
2803

2804
  """
2805
  results = []
2806
  for val in exts:
2807
    new_id = cfg.GenerateUniqueID()
2808
    results.append("%s%s" % (new_id, val))
2809
  return results
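  # Illustrative example (not part of the original source): for the DRBD8
  # template this is called with [".sda_data", ".sda_meta", ".sdb_data",
  # ".sdb_meta"] and returns names of the form "<unique-id>.sda_data", where
  # <unique-id> stands for whatever cfg.GenerateUniqueID() produced.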
2810

    
2811

    
2812
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2813
  """Generate a drbd8 device complete with its children.
2814

2815
  """
2816
  port = cfg.AllocatePort()
2817
  vgname = cfg.GetVGName()
2818
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2819
                          logical_id=(vgname, names[0]))
2820
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2821
                          logical_id=(vgname, names[1]))
2822
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2823
                          logical_id = (primary, secondary, port),
2824
                          children = [dev_data, dev_meta],
2825
                          iv_name=iv_name)
2826
  return drbd_dev
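  # Illustrative note (not part of the original source): the returned disk is
  # a DRBD8 device with logical_id (primary, secondary, port) whose two LV
  # children are the data volume of the requested size (names[0]) and a fixed
  # 128 MB metadata volume (names[1]).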
2827

    
2828

    
2829
def _GenerateDiskTemplate(cfg, template_name,
2830
                          instance_name, primary_node,
2831
                          secondary_nodes, disk_sz, swap_sz,
2832
                          file_storage_dir, file_driver):
2833
  """Generate the entire disk layout for a given template type.
2834

2835
  """
2836
  #TODO: compute space requirements
2837

    
2838
  vgname = cfg.GetVGName()
2839
  if template_name == constants.DT_DISKLESS:
2840
    disks = []
2841
  elif template_name == constants.DT_PLAIN:
2842
    if len(secondary_nodes) != 0:
2843
      raise errors.ProgrammerError("Wrong template configuration")
2844

    
2845
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2846
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2847
                           logical_id=(vgname, names[0]),
2848
                           iv_name = "sda")
2849
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2850
                           logical_id=(vgname, names[1]),
2851
                           iv_name = "sdb")
2852
    disks = [sda_dev, sdb_dev]
2853
  elif template_name == constants.DT_DRBD8:
2854
    if len(secondary_nodes) != 1:
2855
      raise errors.ProgrammerError("Wrong template configuration")
2856
    remote_node = secondary_nodes[0]
2857
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2858
                                       ".sdb_data", ".sdb_meta"])
2859
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2860
                                         disk_sz, names[0:2], "sda")
2861
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2862
                                         swap_sz, names[2:4], "sdb")
2863
    disks = [drbd_sda_dev, drbd_sdb_dev]
2864
  elif template_name == constants.DT_FILE:
2865
    if len(secondary_nodes) != 0:
2866
      raise errors.ProgrammerError("Wrong template configuration")
2867

    
2868
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2869
                                iv_name="sda", logical_id=(file_driver,
2870
                                "%s/sda" % file_storage_dir))
2871
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2872
                                iv_name="sdb", logical_id=(file_driver,
2873
                                "%s/sdb" % file_storage_dir))
2874
    disks = [file_sda_dev, file_sdb_dev]
2875
  else:
2876
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2877
  return disks
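  # Illustrative summary (not part of the original source) of the layouts
  # built above: DT_DISKLESS -> no disks; DT_PLAIN -> two LVs ("sda", "sdb");
  # DT_DRBD8 -> two DRBD8 devices, each backed by a data and a meta LV,
  # spanning the primary and the single secondary node; DT_FILE -> two
  # file-backed disks under file_storage_dir using the given file_driver.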
2878

    
2879

    
2880
def _GetInstanceInfoText(instance):
2881
  """Compute that text that should be added to the disk's metadata.
2882

2883
  """
2884
  return "originstname+%s" % instance.name
2885

    
2886

    
2887
def _CreateDisks(cfg, instance):
2888
  """Create all disks for an instance.
2889

2890
  This abstracts away some work from AddInstance.
2891

2892
  Args:
2893
    instance: the instance object
2894

2895
  Returns:
2896
    True or False showing the success of the creation process
2897

2898
  """
2899
  info = _GetInstanceInfoText(instance)
2900

    
2901
  if instance.disk_template == constants.DT_FILE:
2902
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2903
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2904
                                              file_storage_dir)
2905

    
2906
    if not result:
2907
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2908
      return False
2909

    
2910
    if not result[0]:
2911
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2912
      return False
2913

    
2914
  for device in instance.disks:
2915
    logger.Info("creating volume %s for instance %s" %
2916
                (device.iv_name, instance.name))
2917
    #HARDCODE
2918
    for secondary_node in instance.secondary_nodes:
2919
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2920
                                        device, False, info):
2921
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2922
                     (device.iv_name, device, secondary_node))
2923
        return False
2924
    #HARDCODE
2925
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2926
                                    instance, device, info):
2927
      logger.Error("failed to create volume %s on primary!" %
2928
                   device.iv_name)
2929
      return False
2930

    
2931
  return True
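  # Illustrative note (not part of the original source): disks are created on
  # the secondary nodes first and on the primary node last, and the function
  # stops at the first failure and returns False, so the caller can roll back
  # (e.g. via _RemoveDisks below).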
2932

    
2933

    
2934
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
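  # walk each disk's device tree and remove its components on every node
  # where they are configured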
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
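  # (None means the template does not consume volume group space, so no
  # free-space check is needed for it)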
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
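    # describe the requested instance to the allocator: two writable disks
    # (data and swap) and a single NIC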
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

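    # exactly one of 'iallocator' and the primary node must be given: either
    # the allocator picks the nodes or the user names them explicitly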
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")


    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_disk_type)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

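    # build the instance object; it is only added to the cluster
    # configuration after its disks have been successfully created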
    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Add the new instance to the Ganeti Lock Manager
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)

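    # either wait for the disks to finish syncing, or at least check once
    # that none of them is degraded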
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Remove the new instance from the Ganeti Lock Manager
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
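    # figure out which node holds the disks being replaced (tgt_node), which
    # node must stay consistent during the operation (oth_node) and, when
    # changing the secondary, the new secondary node (new_node)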
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s" % new_lv.logical_id[1],
                    hint="manually cleanup unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
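      # the sixth field of the blockdev_find result is the degraded flag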
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
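      # the new DRBD device reuses the old device's children and the third
      # element of its logical_id, but points at the new secondary node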
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

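    # dispatch to the right handler: replace the disks in place when no new
    # secondary was given, otherwise switch to the new secondary node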
    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else: