lib/cmdlib.py @ 827f753e

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this
    will be handled in the hooks runner. Also note that additional keys
    will be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes for a phase, an empty list (and not None)
    should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result
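
# Illustrative sketch (not part of the original module): a minimal LogicalUnit
# subclass following the rules listed in the base class docstring. The class,
# its opcode field 'instance_name' and its behaviour are hypothetical and only
# meant to show the shape of CheckPrereq/Exec/BuildHooksEnv.
#
#   class LUExampleNoop(LogicalUnit):
#     HPATH = "example-noop"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = ["instance_name"]
#
#     def CheckPrereq(self):
#       # canonicalize opcode parameters, raise OpPrereqError on problems
#       instance = self.cfg.ExpandInstanceName(self.op.instance_name)
#       if instance is None:
#         raise errors.OpPrereqError("No such instance '%s'" %
#                                    self.op.instance_name)
#       self.op.instance_name = instance
#
#     def BuildHooksEnv(self):
#       env = {"OP_TARGET": self.op.instance_name}
#       mn = self.sstore.GetMasterNode()
#       return env, [mn], [mn]
#
#     def Exec(self, feedback_fn):
#       feedback_fn("noop on %s" % self.op.instance_name)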


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
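
# Illustrative sketch (not part of the original module): how a query LU
# typically combines the two helpers above in its CheckPrereq; the particular
# field names used here are hypothetical.
#
#   self.wanted = _GetWantedNodes(self, self.op.names)
#   _CheckOutputFields(static=["name"],
#                      dynamic=["mfree", "dtotal"],
#                      selected=self.op.output_fields)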


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
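
# Illustrative sketch (not part of the original module): the environment built
# for a hypothetical single-NIC instance; all values are made up.
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1", ["node2"],
#                         "debian-etch", "up", 512, 1,
#                         [("192.0.2.10", "xen-br0", "aa:00:00:11:22:33")])
#   ==> {"OP_TARGET": "inst1.example.com",
#        "INSTANCE_NAME": "inst1.example.com",
#        "INSTANCE_PRIMARY": "node1",
#        "INSTANCE_SECONDARIES": "node2",
#        "INSTANCE_OS_TYPE": "debian-etch",
#        "INSTANCE_STATUS": "up",
#        "INSTANCE_MEMORY": 512,
#        "INSTANCE_VCPUS": 1,
#        "INSTANCE_NIC0_IP": "192.0.2.10",
#        "INSTANCE_NIC0_BRIDGE": "xen-br0",
#        "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33",
#        "INSTANCE_NIC_COUNT": 1}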


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure
    causes the output to be logged in the verify output and the
    verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
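
# Illustrative sketch (not part of the original module): the helper above
# recurses into children, so a DRBD disk backed by LVM logical volumes is
# reported as LVM-based. The object construction details shown here are
# hypothetical.
#
#   lv_a = objects.Disk(dev_type=constants.LD_LV, size=1024)
#   lv_b = objects.Disk(dev_type=constants.LD_LV, size=128)
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, children=[lv_a, lv_b])
#   _RecursiveCheckIfLVMBased(drbd)   # ==> True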


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      #utils.Unlock('cmd')
      pass
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        #utils.Lock('cmd')
        pass

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
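
# Illustrative sketch (not part of the original module): each entry returned
# by call_blockdev_getmirrorstatus is unpacked above as a 4-tuple; the sample
# values and device name below are made up.
#
#   perc_done, est_time, is_degraded, ldisk = (87.5, 42, True, False)
#   # -> logged as "- device sda: 87.50% done, 42 estimated seconds remaining"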


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." %
                                 self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    utils.RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
1618
      if not result[node]:
1619
        logger.Error("could not copy file %s to node %s" % (fname, node))
1620

    
1621
    if not self.op.readd:
1622
      logger.Info("adding node %s to cluster.conf" % node)
1623
      self.cfg.AddNode(new_node)
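
# Illustrative sketch (not part of this revision): the reachability checks
# in this LU go through utils.TcpPing.  A minimal stand-alone analogue using
# only the standard library is shown below; the helper name and the timeout
# default are assumptions made for the example, not Ganeti APIs.
def _ExampleTcpPing(target_ip, port, timeout=10.0, source_ip=None):
  """Return True if a TCP connection to target_ip:port can be established."""
  import socket
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  sock.settimeout(timeout)
  try:
    if source_ip is not None:
      # bind to a specific source address (as done for the secondary
      # network check) so the probe is routed over that interface
      sock.bind((source_ip, 0))
    try:
      sock.connect((target_ip, port))
    except socket.error:
      return False
    return True
  finally:
    sock.close()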
1624

    
1625

    
1626
class LUMasterFailover(LogicalUnit):
1627
  """Failover the master node to the current node.
1628

1629
  This is a special LU in that it must run on a non-master node.
1630

1631
  """
1632
  HPATH = "master-failover"
1633
  HTYPE = constants.HTYPE_CLUSTER
1634
  REQ_MASTER = False
1635
  REQ_WSSTORE = True
1636
  _OP_REQP = []
1637

    
1638
  def BuildHooksEnv(self):
1639
    """Build hooks env.
1640

1641
    This will run on the new master only in the pre phase, and on all
1642
    the nodes in the post phase.
1643

1644
    """
1645
    env = {
1646
      "OP_TARGET": self.new_master,
1647
      "NEW_MASTER": self.new_master,
1648
      "OLD_MASTER": self.old_master,
1649
      }
1650
    return env, [self.new_master], self.cfg.GetNodeList()
1651

    
1652
  def CheckPrereq(self):
1653
    """Check prerequisites.
1654

1655
    This checks that we are not already the master.
1656

1657
    """
1658
    self.new_master = utils.HostInfo().name
1659
    self.old_master = self.sstore.GetMasterNode()
1660

    
1661
    if self.old_master == self.new_master:
1662
      raise errors.OpPrereqError("This commands must be run on the node"
1663
                                 " where you want the new master to be."
1664
                                 " %s is already the master" %
1665
                                 self.old_master)
1666

    
1667
  def Exec(self, feedback_fn):
1668
    """Failover the master node.
1669

1670
    This command, when run on a non-master node, will cause the current
1671
    master to cease being master, and the non-master to become new
1672
    master.
1673

1674
    """
1675
    #TODO: do not rely on gethostname returning the FQDN
1676
    logger.Info("setting master to %s, old master: %s" %
1677
                (self.new_master, self.old_master))
1678

    
1679
    if not rpc.call_node_stop_master(self.old_master):
1680
      logger.Error("could disable the master role on the old master"
1681
                   " %s, please disable manually" % self.old_master)
1682

    
1683
    ss = self.sstore
1684
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1685
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1686
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1687
      logger.Error("could not distribute the new simple store master file"
1688
                   " to the other nodes, please check.")
1689

    
1690
    if not rpc.call_node_start_master(self.new_master):
1691
      logger.Error("could not start the master role on the new master"
1692
                   " %s, please check" % self.new_master)
1693
      feedback_fn("Error in activating the master IP on the new master,"
1694
                  " please fix manually.")
1695

    
1696

    
1697

    
1698
class LUQueryClusterInfo(NoHooksLU):
1699
  """Query cluster configuration.
1700

1701
  """
1702
  _OP_REQP = []
1703
  REQ_MASTER = False
1704

    
1705
  def CheckPrereq(self):
1706
    """No prerequsites needed for this LU.
1707

1708
    """
1709
    pass
1710

    
1711
  def Exec(self, feedback_fn):
1712
    """Return cluster config.
1713

1714
    """
1715
    result = {
1716
      "name": self.sstore.GetClusterName(),
1717
      "software_version": constants.RELEASE_VERSION,
1718
      "protocol_version": constants.PROTOCOL_VERSION,
1719
      "config_version": constants.CONFIG_VERSION,
1720
      "os_api_version": constants.OS_API_VERSION,
1721
      "export_version": constants.EXPORT_VERSION,
1722
      "master": self.sstore.GetMasterNode(),
1723
      "architecture": (platform.architecture()[0], platform.machine()),
1724
      "hypervisor_type": self.sstore.GetHypervisorType(),
1725
      }
1726

    
1727
    return result
1728

    
1729

    
1730
class LUDumpClusterConfig(NoHooksLU):
1731
  """Return a text-representation of the cluster-config.
1732

1733
  """
1734
  _OP_REQP = []
1735

    
1736
  def CheckPrereq(self):
1737
    """No prerequisites.
1738

1739
    """
1740
    pass
1741

    
1742
  def Exec(self, feedback_fn):
1743
    """Dump a representation of the cluster config to the standard output.
1744

1745
    """
1746
    return self.cfg.DumpConfig()
1747

    
1748

    
1749
class LUActivateInstanceDisks(NoHooksLU):
1750
  """Bring up an instance's disks.
1751

1752
  """
1753
  _OP_REQP = ["instance_name"]
1754

    
1755
  def CheckPrereq(self):
1756
    """Check prerequisites.
1757

1758
    This checks that the instance is in the cluster.
1759

1760
    """
1761
    instance = self.cfg.GetInstanceInfo(
1762
      self.cfg.ExpandInstanceName(self.op.instance_name))
1763
    if instance is None:
1764
      raise errors.OpPrereqError("Instance '%s' not known" %
1765
                                 self.op.instance_name)
1766
    self.instance = instance
1767

    
1768

    
1769
  def Exec(self, feedback_fn):
1770
    """Activate the disks.
1771

1772
    """
1773
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1774
    if not disks_ok:
1775
      raise errors.OpExecError("Cannot activate block devices")
1776

    
1777
    return disks_info
1778

    
1779

    
1780
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1781
  """Prepare the block devices for an instance.
1782

1783
  This sets up the block devices on all nodes.
1784

1785
  Args:
1786
    instance: a ganeti.objects.Instance object
1787
    ignore_secondaries: if true, errors on secondary nodes won't result
1788
                        in an error return from the function
1789

1790
  Returns:
1791
    false if the operation failed
1792
    list of (host, instance_visible_name, node_visible_name) if the operation
1793
         succeeded with the mapping from node devices to instance devices
1794
  """
1795
  device_info = []
1796
  disks_ok = True
1797
  iname = instance.name
1798
  # With the two-pass mechanism we try to reduce the window of
1799
  # opportunity for the race condition of switching DRBD to primary
1800
  # before handshaking has occurred, but we do not eliminate it
1801

    
1802
  # The proper fix would be to wait (with some limits) until the
1803
  # connection has been made and drbd transitions from WFConnection
1804
  # into any other network-connected state (Connected, SyncTarget,
1805
  # SyncSource, etc.)
1806

    
1807
  # 1st pass, assemble on all nodes in secondary mode
1808
  for inst_disk in instance.disks:
1809
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1810
      cfg.SetDiskID(node_disk, node)
1811
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1812
      if not result:
1813
        logger.Error("could not prepare block device %s on node %s"
1814
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1815
        if not ignore_secondaries:
1816
          disks_ok = False
1817

    
1818
  # FIXME: race condition on drbd migration to primary
1819

    
1820
  # 2nd pass, do only the primary node
1821
  for inst_disk in instance.disks:
1822
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1823
      if node != instance.primary_node:
1824
        continue
1825
      cfg.SetDiskID(node_disk, node)
1826
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1827
      if not result:
1828
        logger.Error("could not prepare block device %s on node %s"
1829
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1830
        disks_ok = False
1831
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1832

    
1833
  # leave the disks configured for the primary node
1834
  # this is a workaround that would be fixed better by
1835
  # improving the logical/physical id handling
1836
  for disk in instance.disks:
1837
    cfg.SetDiskID(disk, instance.primary_node)
1838

    
1839
  return disks_ok, device_info
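
# Illustrative sketch (not part of this revision): the two-pass idea above,
# reduced to its core.  Every (node, disk) pair is first assembled in
# secondary (read-only) mode, and only then is the primary node switched to
# primary mode, which narrows (but does not close) the window in which DRBD
# could become primary before its peers have connected.  The assemble_fn
# callable is a hypothetical stand-in for the per-node RPC.
def _ExampleTwoPassAssemble(node_disks, primary_node, assemble_fn):
  """Assemble a list of (node, disk) pairs in two passes.

  Returns True only if every call succeeds.

  """
  ok = True
  # 1st pass: every node, secondary mode
  for node, disk in node_disks:
    if not assemble_fn(node, disk, False):
      ok = False
  # 2nd pass: only the primary node, primary mode
  for node, disk in node_disks:
    if node == primary_node and not assemble_fn(node, disk, True):
      ok = False
  return ok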
1840

    
1841

    
1842
def _StartInstanceDisks(cfg, instance, force):
1843
  """Start the disks of an instance.
1844

1845
  """
1846
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1847
                                           ignore_secondaries=force)
1848
  if not disks_ok:
1849
    _ShutdownInstanceDisks(instance, cfg)
1850
    if force is not None and not force:
1851
      logger.Error("If the message above refers to a secondary node,"
1852
                   " you can retry the operation using '--force'.")
1853
    raise errors.OpExecError("Disk consistency error")
1854

    
1855

    
1856
class LUDeactivateInstanceDisks(NoHooksLU):
1857
  """Shutdown an instance's disks.
1858

1859
  """
1860
  _OP_REQP = ["instance_name"]
1861

    
1862
  def CheckPrereq(self):
1863
    """Check prerequisites.
1864

1865
    This checks that the instance is in the cluster.
1866

1867
    """
1868
    instance = self.cfg.GetInstanceInfo(
1869
      self.cfg.ExpandInstanceName(self.op.instance_name))
1870
    if instance is None:
1871
      raise errors.OpPrereqError("Instance '%s' not known" %
1872
                                 self.op.instance_name)
1873
    self.instance = instance
1874

    
1875
  def Exec(self, feedback_fn):
1876
    """Deactivate the disks
1877

1878
    """
1879
    instance = self.instance
1880
    ins_l = rpc.call_instance_list([instance.primary_node])
1881
    ins_l = ins_l[instance.primary_node]
1882
    if not isinstance(ins_l, list):
1883
      raise errors.OpExecError("Can't contact node '%s'" %
1884
                               instance.primary_node)
1885

    
1886
    if self.instance.name in ins_l:
1887
      raise errors.OpExecError("Instance is running, can't shutdown"
1888
                               " block devices.")
1889

    
1890
    _ShutdownInstanceDisks(instance, self.cfg)
1891

    
1892

    
1893
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1894
  """Shutdown block devices of an instance.
1895

1896
  This does the shutdown on all nodes of the instance.
1897

1898
  If ignore_primary is true, errors on the primary node are
1899
  ignored.
1900

1901
  """
1902
  result = True
1903
  for disk in instance.disks:
1904
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1905
      cfg.SetDiskID(top_disk, node)
1906
      if not rpc.call_blockdev_shutdown(node, top_disk):
1907
        logger.Error("could not shutdown block device %s on node %s" %
1908
                     (disk.iv_name, node))
1909
        if not ignore_primary or node != instance.primary_node:
1910
          result = False
1911
  return result
1912

    
1913

    
1914
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1915
  """Checks if a node has enough free memory.
1916

1917
  This function checks if a given node has the needed amount of free
1918
  memory. In case the node has less memory or we cannot get the
1919
  information from the node, this function raises an OpPrereqError
1920
  exception.
1921

1922
  Args:
1923
    - cfg: a ConfigWriter instance
1924
    - node: the node name
1925
    - reason: string to use in the error message
1926
    - requested: the amount of memory in MiB
1927

1928
  """
1929
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1930
  if not nodeinfo or not isinstance(nodeinfo, dict):
1931
    raise errors.OpPrereqError("Could not contact node %s for resource"
1932
                             " information" % (node,))
1933

    
1934
  free_mem = nodeinfo[node].get('memory_free')
1935
  if not isinstance(free_mem, int):
1936
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1937
                             " was '%s'" % (node, free_mem))
1938
  if requested > free_mem:
1939
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1940
                             " needed %s MiB, available %s MiB" %
1941
                             (node, reason, requested, free_mem))
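
# Illustrative sketch (not part of this revision): the memory check above,
# expressed as a pure function over data that has already been fetched.
# The node_info mapping mirrors the fields used above (a per-node dict with
# a 'memory_free' entry); the helper name is an assumption for the example.
def _ExampleHasEnoughMemory(node_info, node, requested_mib):
  """Return (ok, free_mib); ok is False if the data is missing or unusable.

  """
  info = node_info.get(node)
  if not isinstance(info, dict):
    return False, None
  free_mem = info.get('memory_free')
  if not isinstance(free_mem, int):
    return False, None
  return requested_mib <= free_mem, free_mem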
1942

    
1943

    
1944
class LUStartupInstance(LogicalUnit):
1945
  """Starts an instance.
1946

1947
  """
1948
  HPATH = "instance-start"
1949
  HTYPE = constants.HTYPE_INSTANCE
1950
  _OP_REQP = ["instance_name", "force"]
1951

    
1952
  def BuildHooksEnv(self):
1953
    """Build hooks env.
1954

1955
    This runs on master, primary and secondary nodes of the instance.
1956

1957
    """
1958
    env = {
1959
      "FORCE": self.op.force,
1960
      }
1961
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1962
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1963
          list(self.instance.secondary_nodes))
1964
    return env, nl, nl
1965

    
1966
  def CheckPrereq(self):
1967
    """Check prerequisites.
1968

1969
    This checks that the instance is in the cluster.
1970

1971
    """
1972
    instance = self.cfg.GetInstanceInfo(
1973
      self.cfg.ExpandInstanceName(self.op.instance_name))
1974
    if instance is None:
1975
      raise errors.OpPrereqError("Instance '%s' not known" %
1976
                                 self.op.instance_name)
1977

    
1978
    # check bridges existence
1979
    _CheckInstanceBridgesExist(instance)
1980

    
1981
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
1982
                         "starting instance %s" % instance.name,
1983
                         instance.memory)
1984

    
1985
    self.instance = instance
1986
    self.op.instance_name = instance.name
1987

    
1988
  def Exec(self, feedback_fn):
1989
    """Start the instance.
1990

1991
    """
1992
    instance = self.instance
1993
    force = self.op.force
1994
    extra_args = getattr(self.op, "extra_args", "")
1995

    
1996
    self.cfg.MarkInstanceUp(instance.name)
1997

    
1998
    node_current = instance.primary_node
1999

    
2000
    _StartInstanceDisks(self.cfg, instance, force)
2001

    
2002
    if not rpc.call_instance_start(node_current, instance, extra_args):
2003
      _ShutdownInstanceDisks(instance, self.cfg)
2004
      raise errors.OpExecError("Could not start instance")
2005

    
2006

    
2007
class LURebootInstance(LogicalUnit):
2008
  """Reboot an instance.
2009

2010
  """
2011
  HPATH = "instance-reboot"
2012
  HTYPE = constants.HTYPE_INSTANCE
2013
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2014

    
2015
  def BuildHooksEnv(self):
2016
    """Build hooks env.
2017

2018
    This runs on master, primary and secondary nodes of the instance.
2019

2020
    """
2021
    env = {
2022
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2023
      }
2024
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2025
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2026
          list(self.instance.secondary_nodes))
2027
    return env, nl, nl
2028

    
2029
  def CheckPrereq(self):
2030
    """Check prerequisites.
2031

2032
    This checks that the instance is in the cluster.
2033

2034
    """
2035
    instance = self.cfg.GetInstanceInfo(
2036
      self.cfg.ExpandInstanceName(self.op.instance_name))
2037
    if instance is None:
2038
      raise errors.OpPrereqError("Instance '%s' not known" %
2039
                                 self.op.instance_name)
2040

    
2041
    # check bridges existence
2042
    _CheckInstanceBridgesExist(instance)
2043

    
2044
    self.instance = instance
2045
    self.op.instance_name = instance.name
2046

    
2047
  def Exec(self, feedback_fn):
2048
    """Reboot the instance.
2049

2050
    """
2051
    instance = self.instance
2052
    ignore_secondaries = self.op.ignore_secondaries
2053
    reboot_type = self.op.reboot_type
2054
    extra_args = getattr(self.op, "extra_args", "")
2055

    
2056
    node_current = instance.primary_node
2057

    
2058
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2059
                           constants.INSTANCE_REBOOT_HARD,
2060
                           constants.INSTANCE_REBOOT_FULL]:
2061
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2062
                                  (constants.INSTANCE_REBOOT_SOFT,
2063
                                   constants.INSTANCE_REBOOT_HARD,
2064
                                   constants.INSTANCE_REBOOT_FULL))
2065

    
2066
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2067
                       constants.INSTANCE_REBOOT_HARD]:
2068
      if not rpc.call_instance_reboot(node_current, instance,
2069
                                      reboot_type, extra_args):
2070
        raise errors.OpExecError("Could not reboot instance")
2071
    else:
2072
      if not rpc.call_instance_shutdown(node_current, instance):
2073
        raise errors.OpExecError("could not shutdown instance for full reboot")
2074
      _ShutdownInstanceDisks(instance, self.cfg)
2075
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2076
      if not rpc.call_instance_start(node_current, instance, extra_args):
2077
        _ShutdownInstanceDisks(instance, self.cfg)
2078
        raise errors.OpExecError("Could not start instance for full reboot")
2079

    
2080
    self.cfg.MarkInstanceUp(instance.name)
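
# Illustrative sketch (not part of this revision): the reboot handling above
# distinguishes soft/hard reboots (delegated to the hypervisor in a single
# RPC) from a full reboot (shutdown, disk cycle, start).  A pure "planning"
# version of that dispatch could look like this; the action labels and the
# helper name are invented for the example, they are not Ganeti constants.
def _ExampleRebootPlan(reboot_type):
  """Return the ordered list of high-level actions for a reboot type.

  """
  if reboot_type in ("soft", "hard"):
    return ["hypervisor-reboot"]
  if reboot_type == "full":
    return ["shutdown-instance", "shutdown-disks",
            "activate-disks", "start-instance"]
  raise ValueError("reboot type not in ['soft', 'hard', 'full']")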
2081

    
2082

    
2083
class LUShutdownInstance(LogicalUnit):
2084
  """Shutdown an instance.
2085

2086
  """
2087
  HPATH = "instance-stop"
2088
  HTYPE = constants.HTYPE_INSTANCE
2089
  _OP_REQP = ["instance_name"]
2090

    
2091
  def BuildHooksEnv(self):
2092
    """Build hooks env.
2093

2094
    This runs on master, primary and secondary nodes of the instance.
2095

2096
    """
2097
    env = _BuildInstanceHookEnvByObject(self.instance)
2098
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2099
          list(self.instance.secondary_nodes))
2100
    return env, nl, nl
2101

    
2102
  def CheckPrereq(self):
2103
    """Check prerequisites.
2104

2105
    This checks that the instance is in the cluster.
2106

2107
    """
2108
    instance = self.cfg.GetInstanceInfo(
2109
      self.cfg.ExpandInstanceName(self.op.instance_name))
2110
    if instance is None:
2111
      raise errors.OpPrereqError("Instance '%s' not known" %
2112
                                 self.op.instance_name)
2113
    self.instance = instance
2114

    
2115
  def Exec(self, feedback_fn):
2116
    """Shutdown the instance.
2117

2118
    """
2119
    instance = self.instance
2120
    node_current = instance.primary_node
2121
    self.cfg.MarkInstanceDown(instance.name)
2122
    if not rpc.call_instance_shutdown(node_current, instance):
2123
      logger.Error("could not shutdown instance")
2124

    
2125
    _ShutdownInstanceDisks(instance, self.cfg)
2126

    
2127

    
2128
class LUReinstallInstance(LogicalUnit):
2129
  """Reinstall an instance.
2130

2131
  """
2132
  HPATH = "instance-reinstall"
2133
  HTYPE = constants.HTYPE_INSTANCE
2134
  _OP_REQP = ["instance_name"]
2135

    
2136
  def BuildHooksEnv(self):
2137
    """Build hooks env.
2138

2139
    This runs on master, primary and secondary nodes of the instance.
2140

2141
    """
2142
    env = _BuildInstanceHookEnvByObject(self.instance)
2143
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2144
          list(self.instance.secondary_nodes))
2145
    return env, nl, nl
2146

    
2147
  def CheckPrereq(self):
2148
    """Check prerequisites.
2149

2150
    This checks that the instance is in the cluster and is not running.
2151

2152
    """
2153
    instance = self.cfg.GetInstanceInfo(
2154
      self.cfg.ExpandInstanceName(self.op.instance_name))
2155
    if instance is None:
2156
      raise errors.OpPrereqError("Instance '%s' not known" %
2157
                                 self.op.instance_name)
2158
    if instance.disk_template == constants.DT_DISKLESS:
2159
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2160
                                 self.op.instance_name)
2161
    if instance.status != "down":
2162
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2163
                                 self.op.instance_name)
2164
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2165
    if remote_info:
2166
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2167
                                 (self.op.instance_name,
2168
                                  instance.primary_node))
2169

    
2170
    self.op.os_type = getattr(self.op, "os_type", None)
2171
    if self.op.os_type is not None:
2172
      # OS verification
2173
      pnode = self.cfg.GetNodeInfo(
2174
        self.cfg.ExpandNodeName(instance.primary_node))
2175
      if pnode is None:
2176
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2177
                                   instance.primary_node)
2178
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2179
      if not os_obj:
2180
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2181
                                   " primary node"  % self.op.os_type)
2182

    
2183
    self.instance = instance
2184

    
2185
  def Exec(self, feedback_fn):
2186
    """Reinstall the instance.
2187

2188
    """
2189
    inst = self.instance
2190

    
2191
    if self.op.os_type is not None:
2192
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2193
      inst.os = self.op.os_type
2194
      self.cfg.AddInstance(inst)
2195

    
2196
    _StartInstanceDisks(self.cfg, inst, None)
2197
    try:
2198
      feedback_fn("Running the instance OS create scripts...")
2199
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2200
        raise errors.OpExecError("Could not install OS for instance %s"
2201
                                 " on node %s" %
2202
                                 (inst.name, inst.primary_node))
2203
    finally:
2204
      _ShutdownInstanceDisks(inst, self.cfg)
2205

    
2206

    
2207
class LURenameInstance(LogicalUnit):
2208
  """Rename an instance.
2209

2210
  """
2211
  HPATH = "instance-rename"
2212
  HTYPE = constants.HTYPE_INSTANCE
2213
  _OP_REQP = ["instance_name", "new_name"]
2214

    
2215
  def BuildHooksEnv(self):
2216
    """Build hooks env.
2217

2218
    This runs on master, primary and secondary nodes of the instance.
2219

2220
    """
2221
    env = _BuildInstanceHookEnvByObject(self.instance)
2222
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2223
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2224
          list(self.instance.secondary_nodes))
2225
    return env, nl, nl
2226

    
2227
  def CheckPrereq(self):
2228
    """Check prerequisites.
2229

2230
    This checks that the instance is in the cluster and is not running.
2231

2232
    """
2233
    instance = self.cfg.GetInstanceInfo(
2234
      self.cfg.ExpandInstanceName(self.op.instance_name))
2235
    if instance is None:
2236
      raise errors.OpPrereqError("Instance '%s' not known" %
2237
                                 self.op.instance_name)
2238
    if instance.status != "down":
2239
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2240
                                 self.op.instance_name)
2241
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2242
    if remote_info:
2243
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2244
                                 (self.op.instance_name,
2245
                                  instance.primary_node))
2246
    self.instance = instance
2247

    
2248
    # new name verification
2249
    name_info = utils.HostInfo(self.op.new_name)
2250

    
2251
    self.op.new_name = new_name = name_info.name
2252
    instance_list = self.cfg.GetInstanceList()
2253
    if new_name in instance_list:
2254
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2255
                                 new_name)
2256

    
2257
    if not getattr(self.op, "ignore_ip", False):
2258
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2259
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2260
                                   (name_info.ip, new_name))
2261

    
2262

    
2263
  def Exec(self, feedback_fn):
2264
    """Reinstall the instance.
2265

2266
    """
2267
    inst = self.instance
2268
    old_name = inst.name
2269

    
2270
    if inst.disk_template == constants.DT_FILE:
2271
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2272

    
2273
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2274

    
2275
    # re-read the instance from the configuration after rename
2276
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2277

    
2278
    if inst.disk_template == constants.DT_FILE:
2279
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2280
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2281
                                                old_file_storage_dir,
2282
                                                new_file_storage_dir)
2283

    
2284
      if not result:
2285
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2286
                                 " directory '%s' to '%s' (but the instance"
2287
                                 " has been renamed in Ganeti)" % (
2288
                                 inst.primary_node, old_file_storage_dir,
2289
                                 new_file_storage_dir))
2290

    
2291
      if not result[0]:
2292
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2293
                                 " (but the instance has been renamed in"
2294
                                 " Ganeti)" % (old_file_storage_dir,
2295
                                               new_file_storage_dir))
2296

    
2297
    _StartInstanceDisks(self.cfg, inst, None)
2298
    try:
2299
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2300
                                          "sda", "sdb"):
2301
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2302
               " instance has been renamed in Ganeti)" %
2303
               (inst.name, inst.primary_node))
2304
        logger.Error(msg)
2305
    finally:
2306
      _ShutdownInstanceDisks(inst, self.cfg)
2307

    
2308

    
2309
class LURemoveInstance(LogicalUnit):
2310
  """Remove an instance.
2311

2312
  """
2313
  HPATH = "instance-remove"
2314
  HTYPE = constants.HTYPE_INSTANCE
2315
  _OP_REQP = ["instance_name", "ignore_failures"]
2316

    
2317
  def BuildHooksEnv(self):
2318
    """Build hooks env.
2319

2320
    This runs on master, primary and secondary nodes of the instance.
2321

2322
    """
2323
    env = _BuildInstanceHookEnvByObject(self.instance)
2324
    nl = [self.sstore.GetMasterNode()]
2325
    return env, nl, nl
2326

    
2327
  def CheckPrereq(self):
2328
    """Check prerequisites.
2329

2330
    This checks that the instance is in the cluster.
2331

2332
    """
2333
    instance = self.cfg.GetInstanceInfo(
2334
      self.cfg.ExpandInstanceName(self.op.instance_name))
2335
    if instance is None:
2336
      raise errors.OpPrereqError("Instance '%s' not known" %
2337
                                 self.op.instance_name)
2338
    self.instance = instance
2339

    
2340
  def Exec(self, feedback_fn):
2341
    """Remove the instance.
2342

2343
    """
2344
    instance = self.instance
2345
    logger.Info("shutting down instance %s on node %s" %
2346
                (instance.name, instance.primary_node))
2347

    
2348
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2349
      if self.op.ignore_failures:
2350
        feedback_fn("Warning: can't shutdown instance")
2351
      else:
2352
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2353
                                 (instance.name, instance.primary_node))
2354

    
2355
    logger.Info("removing block devices for instance %s" % instance.name)
2356

    
2357
    if not _RemoveDisks(instance, self.cfg):
2358
      if self.op.ignore_failures:
2359
        feedback_fn("Warning: can't remove instance's disks")
2360
      else:
2361
        raise errors.OpExecError("Can't remove instance's disks")
2362

    
2363
    logger.Info("removing instance %s out of cluster config" % instance.name)
2364

    
2365
    self.cfg.RemoveInstance(instance.name)
2366

    
2367

    
2368
class LUQueryInstances(NoHooksLU):
2369
  """Logical unit for querying instances.
2370

2371
  """
2372
  _OP_REQP = ["output_fields", "names"]
2373

    
2374
  def CheckPrereq(self):
2375
    """Check prerequisites.
2376

2377
    This checks that the fields required are valid output fields.
2378

2379
    """
2380
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2381
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2382
                               "admin_state", "admin_ram",
2383
                               "disk_template", "ip", "mac", "bridge",
2384
                               "sda_size", "sdb_size", "vcpus", "tags"],
2385
                       dynamic=self.dynamic_fields,
2386
                       selected=self.op.output_fields)
2387

    
2388
    self.wanted = _GetWantedInstances(self, self.op.names)
2389

    
2390
  def Exec(self, feedback_fn):
2391
    """Computes the list of nodes and their attributes.
2392

2393
    """
2394
    instance_names = self.wanted
2395
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2396
                     in instance_names]
2397

    
2398
    # begin data gathering
2399

    
2400
    nodes = frozenset([inst.primary_node for inst in instance_list])
2401

    
2402
    bad_nodes = []
2403
    if self.dynamic_fields.intersection(self.op.output_fields):
2404
      live_data = {}
2405
      node_data = rpc.call_all_instances_info(nodes)
2406
      for name in nodes:
2407
        result = node_data[name]
2408
        if result:
2409
          live_data.update(result)
2410
        elif result == False:
2411
          bad_nodes.append(name)
2412
        # else no instance is alive
2413
    else:
2414
      live_data = dict([(name, {}) for name in instance_names])
2415

    
2416
    # end data gathering
2417

    
2418
    output = []
2419
    for instance in instance_list:
2420
      iout = []
2421
      for field in self.op.output_fields:
2422
        if field == "name":
2423
          val = instance.name
2424
        elif field == "os":
2425
          val = instance.os
2426
        elif field == "pnode":
2427
          val = instance.primary_node
2428
        elif field == "snodes":
2429
          val = list(instance.secondary_nodes)
2430
        elif field == "admin_state":
2431
          val = (instance.status != "down")
2432
        elif field == "oper_state":
2433
          if instance.primary_node in bad_nodes:
2434
            val = None
2435
          else:
2436
            val = bool(live_data.get(instance.name))
2437
        elif field == "status":
2438
          if instance.primary_node in bad_nodes:
2439
            val = "ERROR_nodedown"
2440
          else:
2441
            running = bool(live_data.get(instance.name))
2442
            if running:
2443
              if instance.status != "down":
2444
                val = "running"
2445
              else:
2446
                val = "ERROR_up"
2447
            else:
2448
              if instance.status != "down":
2449
                val = "ERROR_down"
2450
              else:
2451
                val = "ADMIN_down"
2452
        elif field == "admin_ram":
2453
          val = instance.memory
2454
        elif field == "oper_ram":
2455
          if instance.primary_node in bad_nodes:
2456
            val = None
2457
          elif instance.name in live_data:
2458
            val = live_data[instance.name].get("memory", "?")
2459
          else:
2460
            val = "-"
2461
        elif field == "disk_template":
2462
          val = instance.disk_template
2463
        elif field == "ip":
2464
          val = instance.nics[0].ip
2465
        elif field == "bridge":
2466
          val = instance.nics[0].bridge
2467
        elif field == "mac":
2468
          val = instance.nics[0].mac
2469
        elif field == "sda_size" or field == "sdb_size":
2470
          disk = instance.FindDisk(field[:3])
2471
          if disk is None:
2472
            val = None
2473
          else:
2474
            val = disk.size
2475
        elif field == "vcpus":
2476
          val = instance.vcpus
2477
        elif field == "tags":
2478
          val = list(instance.GetTags())
2479
        else:
2480
          raise errors.ParameterError(field)
2481
        iout.append(val)
2482
      output.append(iout)
2483

    
2484
    return output
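
# Illustrative sketch (not part of this revision): the "status" field above
# is derived from three independent facts (is the node reachable, is the
# instance actually running, is it marked as up in the configuration).  The
# same decision as a small pure function, with names chosen for the example:
def _ExampleInstanceStatus(node_bad, running, admin_up):
  """Map the three status inputs to the status string used above.

  """
  if node_bad:
    return "ERROR_nodedown"
  if running:
    if admin_up:
      return "running"
    return "ERROR_up"
  if admin_up:
    return "ERROR_down"
  return "ADMIN_down"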
2485

    
2486

    
2487
class LUFailoverInstance(LogicalUnit):
2488
  """Failover an instance.
2489

2490
  """
2491
  HPATH = "instance-failover"
2492
  HTYPE = constants.HTYPE_INSTANCE
2493
  _OP_REQP = ["instance_name", "ignore_consistency"]
2494

    
2495
  def BuildHooksEnv(self):
2496
    """Build hooks env.
2497

2498
    This runs on master, primary and secondary nodes of the instance.
2499

2500
    """
2501
    env = {
2502
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2503
      }
2504
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2505
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2506
    return env, nl, nl
2507

    
2508
  def CheckPrereq(self):
2509
    """Check prerequisites.
2510

2511
    This checks that the instance is in the cluster.
2512

2513
    """
2514
    instance = self.cfg.GetInstanceInfo(
2515
      self.cfg.ExpandInstanceName(self.op.instance_name))
2516
    if instance is None:
2517
      raise errors.OpPrereqError("Instance '%s' not known" %
2518
                                 self.op.instance_name)
2519

    
2520
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2521
      raise errors.OpPrereqError("Instance's disk layout is not"
2522
                                 " network mirrored, cannot failover.")
2523

    
2524
    secondary_nodes = instance.secondary_nodes
2525
    if not secondary_nodes:
2526
      raise errors.ProgrammerError("no secondary node but using "
2527
                                   "a mirrored disk template")
2528

    
2529
    target_node = secondary_nodes[0]
2530
    # check memory requirements on the secondary node
2531
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2532
                         instance.name, instance.memory)
2533

    
2534
    # check bridge existence
2535
    brlist = [nic.bridge for nic in instance.nics]
2536
    if not rpc.call_bridges_exist(target_node, brlist):
2537
      raise errors.OpPrereqError("One or more target bridges %s does not"
2538
                                 " exist on destination node '%s'" %
2539
                                 (brlist, target_node))
2540

    
2541
    self.instance = instance
2542

    
2543
  def Exec(self, feedback_fn):
2544
    """Failover an instance.
2545

2546
    The failover is done by shutting it down on its present node and
2547
    starting it on the secondary.
2548

2549
    """
2550
    instance = self.instance
2551

    
2552
    source_node = instance.primary_node
2553
    target_node = instance.secondary_nodes[0]
2554

    
2555
    feedback_fn("* checking disk consistency between source and target")
2556
    for dev in instance.disks:
2557
      # for drbd, these are drbd over lvm
2558
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2559
        if instance.status == "up" and not self.op.ignore_consistency:
2560
          raise errors.OpExecError("Disk %s is degraded on target node,"
2561
                                   " aborting failover." % dev.iv_name)
2562

    
2563
    feedback_fn("* shutting down instance on source node")
2564
    logger.Info("Shutting down instance %s on node %s" %
2565
                (instance.name, source_node))
2566

    
2567
    if not rpc.call_instance_shutdown(source_node, instance):
2568
      if self.op.ignore_consistency:
2569
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2570
                     " anyway. Please make sure node %s is down"  %
2571
                     (instance.name, source_node, source_node))
2572
      else:
2573
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2574
                                 (instance.name, source_node))
2575

    
2576
    feedback_fn("* deactivating the instance's disks on source node")
2577
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2578
      raise errors.OpExecError("Can't shut down the instance's disks.")
2579

    
2580
    instance.primary_node = target_node
2581
    # distribute new instance config to the other nodes
2582
    self.cfg.Update(instance)
2583

    
2584
    # Only start the instance if it's marked as up
2585
    if instance.status == "up":
2586
      feedback_fn("* activating the instance's disks on target node")
2587
      logger.Info("Starting instance %s on node %s" %
2588
                  (instance.name, target_node))
2589

    
2590
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2591
                                               ignore_secondaries=True)
2592
      if not disks_ok:
2593
        _ShutdownInstanceDisks(instance, self.cfg)
2594
        raise errors.OpExecError("Can't activate the instance's disks")
2595

    
2596
      feedback_fn("* starting the instance on the target node")
2597
      if not rpc.call_instance_start(target_node, instance, None):
2598
        _ShutdownInstanceDisks(instance, self.cfg)
2599
        raise errors.OpExecError("Could not start instance %s on node %s." %
2600
                                 (instance.name, target_node))
2601

    
2602

    
2603
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2604
  """Create a tree of block devices on the primary node.
2605

2606
  This always creates all devices.
2607

2608
  """
2609
  if device.children:
2610
    for child in device.children:
2611
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2612
        return False
2613

    
2614
  cfg.SetDiskID(device, node)
2615
  new_id = rpc.call_blockdev_create(node, device, device.size,
2616
                                    instance.name, True, info)
2617
  if not new_id:
2618
    return False
2619
  if device.physical_id is None:
2620
    device.physical_id = new_id
2621
  return True
2622

    
2623

    
2624
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2625
  """Create a tree of block devices on a secondary node.
2626

2627
  If this device type has to be created on secondaries, create it and
2628
  all its children.
2629

2630
  If not, just recurse to children keeping the same 'force' value.
2631

2632
  """
2633
  if device.CreateOnSecondary():
2634
    force = True
2635
  if device.children:
2636
    for child in device.children:
2637
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2638
                                        child, force, info):
2639
        return False
2640

    
2641
  if not force:
2642
    return True
2643
  cfg.SetDiskID(device, node)
2644
  new_id = rpc.call_blockdev_create(node, device, device.size,
2645
                                    instance.name, False, info)
2646
  if not new_id:
2647
    return False
2648
  if device.physical_id is None:
2649
    device.physical_id = new_id
2650
  return True
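
# Illustrative sketch (not part of this revision): the 'force' propagation
# above in miniature.  A device that must exist on the secondary forces the
# creation of its whole subtree; otherwise the walk only recurses looking
# for such devices.  The device objects are assumed to expose .children and
# .create_on_secondary attributes purely for this example.
def _ExampleCreateTreeOnSecondary(device, create_fn, force=False):
  """Recursively create a device tree, honouring the 'force' propagation.

  """
  if device.create_on_secondary:
    force = True
  for child in getattr(device, "children", None) or []:
    if not _ExampleCreateTreeOnSecondary(child, create_fn, force):
      return False
  if not force:
    return True
  return bool(create_fn(device))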
2651

    
2652

    
2653
def _GenerateUniqueNames(cfg, exts):
2654
  """Generate a suitable LV name.
2655

2656
  This will generate unique logical volume names, one for each of the
  given extensions.
2657

2658
  """
2659
  results = []
2660
  for val in exts:
2661
    new_id = cfg.GenerateUniqueID()
2662
    results.append("%s%s" % (new_id, val))
2663
  return results
2664

    
2665

    
2666
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2667
  """Generate a drbd8 device complete with its children.
2668

2669
  """
2670
  port = cfg.AllocatePort()
2671
  vgname = cfg.GetVGName()
2672
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2673
                          logical_id=(vgname, names[0]))
2674
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2675
                          logical_id=(vgname, names[1]))
2676
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2677
                          logical_id = (primary, secondary, port),
2678
                          children = [dev_data, dev_meta],
2679
                          iv_name=iv_name)
2680
  return drbd_dev
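
# Illustrative sketch (not part of this revision): for a 10240 MiB "sda" the
# function above returns a DRBD8 device whose two children are the data LV
# and a fixed 128 MiB metadata LV.  The structure, shown as plain data with
# placeholder values (the node names, volume group, LV names and port below
# are invented for the example, not real configuration):
_EXAMPLE_DRBD8_BRANCH = {
  "dev_type": "drbd8",
  "size": 10240,
  "iv_name": "sda",
  "logical_id": ("node1.example.com", "node2.example.com", 11000),
  "children": [
    {"dev_type": "lvm", "size": 10240,
     "logical_id": ("xenvg", "uuid.sda_data")},
    {"dev_type": "lvm", "size": 128,
     "logical_id": ("xenvg", "uuid.sda_meta")},
  ],
}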
2681

    
2682

    
2683
def _GenerateDiskTemplate(cfg, template_name,
2684
                          instance_name, primary_node,
2685
                          secondary_nodes, disk_sz, swap_sz,
2686
                          file_storage_dir, file_driver):
2687
  """Generate the entire disk layout for a given template type.
2688

2689
  """
2690
  #TODO: compute space requirements
2691

    
2692
  vgname = cfg.GetVGName()
2693
  if template_name == constants.DT_DISKLESS:
2694
    disks = []
2695
  elif template_name == constants.DT_PLAIN:
2696
    if len(secondary_nodes) != 0:
2697
      raise errors.ProgrammerError("Wrong template configuration")
2698

    
2699
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2700
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2701
                           logical_id=(vgname, names[0]),
2702
                           iv_name = "sda")
2703
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2704
                           logical_id=(vgname, names[1]),
2705
                           iv_name = "sdb")
2706
    disks = [sda_dev, sdb_dev]
2707
  elif template_name == constants.DT_DRBD8:
2708
    if len(secondary_nodes) != 1:
2709
      raise errors.ProgrammerError("Wrong template configuration")
2710
    remote_node = secondary_nodes[0]
2711
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2712
                                       ".sdb_data", ".sdb_meta"])
2713
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2714
                                         disk_sz, names[0:2], "sda")
2715
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2716
                                         swap_sz, names[2:4], "sdb")
2717
    disks = [drbd_sda_dev, drbd_sdb_dev]
2718
  elif template_name == constants.DT_FILE:
2719
    if len(secondary_nodes) != 0:
2720
      raise errors.ProgrammerError("Wrong template configuration")
2721

    
2722
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2723
                                iv_name="sda", logical_id=(file_driver,
2724
                                "%s/sda" % file_storage_dir))
2725
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2726
                                iv_name="sdb", logical_id=(file_driver,
2727
                                "%s/sdb" % file_storage_dir))
2728
    disks = [file_sda_dev, file_sdb_dev]
2729
  else:
2730
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2731
  return disks
2732

    
2733

    
2734
def _GetInstanceInfoText(instance):
2735
  """Compute that text that should be added to the disk's metadata.
2736

2737
  """
2738
  return "originstname+%s" % instance.name
2739

    
2740

    
2741
def _CreateDisks(cfg, instance):
2742
  """Create all disks for an instance.
2743

2744
  This abstracts away some work from AddInstance.
2745

2746
  Args:
2747
    instance: the instance object
2748

2749
  Returns:
2750
    True or False showing the success of the creation process
2751

2752
  """
2753
  info = _GetInstanceInfoText(instance)
2754

    
2755
  if instance.disk_template == constants.DT_FILE:
2756
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2757
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2758
                                              file_storage_dir)
2759

    
2760
    if not result:
2761
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2762
      return False
2763

    
2764
    if not result[0]:
2765
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2766
      return False
2767

    
2768
  for device in instance.disks:
2769
    logger.Info("creating volume %s for instance %s" %
2770
                (device.iv_name, instance.name))
2771
    #HARDCODE
2772
    for secondary_node in instance.secondary_nodes:
2773
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2774
                                        device, False, info):
2775
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2776
                     (device.iv_name, device, secondary_node))
2777
        return False
2778
    #HARDCODE
2779
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2780
                                    instance, device, info):
2781
      logger.Error("failed to create volume %s on primary!" %
2782
                   device.iv_name)
2783
      return False
2784

    
2785
  return True
2786

    
2787

    
2788
def _RemoveDisks(instance, cfg):
2789
  """Remove all disks for an instance.
2790

2791
  This abstracts away some work from `AddInstance()` and
2792
  `RemoveInstance()`. Note that in case some of the devices couldn't
2793
  be removed, the removal will continue with the other ones (compare
2794
  with `_CreateDisks()`).
2795

2796
  Args:
2797
    instance: the instance object
2798

2799
  Returns:
2800
    True or False showing the success of the removal process
2801

2802
  """
2803
  logger.Info("removing block devices for instance %s" % instance.name)
2804

    
2805
  result = True
2806
  for device in instance.disks:
2807
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2808
      cfg.SetDiskID(disk, node)
2809
      if not rpc.call_blockdev_remove(node, disk):
2810
        logger.Error("could not remove block device %s on node %s,"
2811
                     " continuing anyway" %
2812
                     (device.iv_name, node))
2813
        result = False
2814

    
2815
  if instance.disk_template == constants.DT_FILE:
2816
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2817
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
2818
                                            file_storage_dir):
2819
      logger.Error("could not remove directory '%s'" % file_storage_dir)
2820
      result = False
2821

    
2822
  return result
2823

    
2824

    
2825
def _ComputeDiskSize(disk_template, disk_size, swap_size):
2826
  """Compute disk size requirements in the volume group
2827

2828
  This is currently hard-coded for the two-drive layout.
2829

2830
  """
2831
  # Required free disk space as a function of disk and swap space
2832
  req_size_dict = {
2833
    constants.DT_DISKLESS: None,
2834
    constants.DT_PLAIN: disk_size + swap_size,
2835
    # 256 MB are added for drbd metadata, 128MB for each drbd device
2836
    constants.DT_DRBD8: disk_size + swap_size + 256,
2837
    constants.DT_FILE: None,
2838
  }
2839

    
2840
  if disk_template not in req_size_dict:
2841
    raise errors.ProgrammerError("Disk template '%s' size requirement"
2842
                                 " is unknown" %  disk_template)
2843

    
2844
  return req_size_dict[disk_template]
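
# Illustrative worked example (not part of this revision): for a drbd8
# instance with a 10240 MiB disk and 4096 MiB of swap the function above
# requires 10240 + 4096 + 256 = 14592 MiB free in the volume group, a plain
# instance needs 14336 MiB, and diskless or file-based instances need no
# volume group space at all.  The helper name is chosen for the example.
def _ExampleDiskSizeDemo():
  """Exercise _ComputeDiskSize with the figures from the comment above.

  """
  assert _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096) == 14592
  assert _ComputeDiskSize(constants.DT_PLAIN, 10240, 4096) == 14336
  assert _ComputeDiskSize(constants.DT_DISKLESS, 10240, 4096) is None
  assert _ComputeDiskSize(constants.DT_FILE, 10240, 4096) is None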
2845

    
2846

    
2847
class LUCreateInstance(LogicalUnit):
2848
  """Create an instance.
2849

2850
  """
2851
  HPATH = "instance-add"
2852
  HTYPE = constants.HTYPE_INSTANCE
2853
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
2854
              "disk_template", "swap_size", "mode", "start", "vcpus",
2855
              "wait_for_sync", "ip_check", "mac"]
2856

    
2857
  def _RunAllocator(self):
2858
    """Run the allocator based on input opcode.
2859

2860
    """
2861
    disks = [{"size": self.op.disk_size, "mode": "w"},
2862
             {"size": self.op.swap_size, "mode": "w"}]
2863
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2864
             "bridge": self.op.bridge}]
2865
    ial = IAllocator(self.cfg, self.sstore,
2866
                     mode=constants.IALLOCATOR_MODE_ALLOC,
2867
                     name=self.op.instance_name,
2868
                     disk_template=self.op.disk_template,
2869
                     tags=[],
2870
                     os=self.op.os_type,
2871
                     vcpus=self.op.vcpus,
2872
                     mem_size=self.op.mem_size,
2873
                     disks=disks,
2874
                     nics=nics,
2875
                     )
2876

    
2877
    ial.Run(self.op.iallocator)
2878

    
2879
    if not ial.success:
2880
      raise errors.OpPrereqError("Can't compute nodes using"
2881
                                 " iallocator '%s': %s" % (self.op.iallocator,
2882
                                                           ial.info))
2883
    if len(ial.nodes) != ial.required_nodes:
2884
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2885
                                 " of nodes (%s), required %s" %
2886
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
2887
    self.op.pnode = ial.nodes[0]
2888
    logger.ToStdout("Selected nodes for the instance: %s" %
2889
                    (", ".join(ial.nodes),))
2890
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2891
                (self.op.instance_name, self.op.iallocator, ial.nodes))
2892
    if ial.required_nodes == 2:
2893
      self.op.snode = ial.nodes[1]
2894

    
2895
  def BuildHooksEnv(self):
2896
    """Build hooks env.
2897

2898
    This runs on master, primary and secondary nodes of the instance.
2899

2900
    """
2901
    env = {
2902
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2903
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2904
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2905
      "INSTANCE_ADD_MODE": self.op.mode,
2906
      }
2907
    if self.op.mode == constants.INSTANCE_IMPORT:
2908
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2909
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2910
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2911

    
2912
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2913
      primary_node=self.op.pnode,
2914
      secondary_nodes=self.secondaries,
2915
      status=self.instance_status,
2916
      os_type=self.op.os_type,
2917
      memory=self.op.mem_size,
2918
      vcpus=self.op.vcpus,
2919
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2920
    ))
2921

    
2922
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2923
          self.secondaries)
2924
    return env, nl, nl
2925

    
2926

    
2927
  def CheckPrereq(self):
2928
    """Check prerequisites.
2929

2930
    """
2931
    # set optional parameters to none if they don't exist
2932
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
2933
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
2934
                 "vnc_bind_address"]:
2935
      if not hasattr(self.op, attr):
2936
        setattr(self.op, attr, None)
2937

    
2938
    if self.op.mode not in (constants.INSTANCE_CREATE,
2939
                            constants.INSTANCE_IMPORT):
2940
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2941
                                 self.op.mode)
2942

    
2943
    if (not self.cfg.GetVGName() and
2944
        self.op.disk_template not in constants.DTS_NOT_LVM):
2945
      raise errors.OpPrereqError("Cluster does not support lvm-based"
2946
                                 " instances")
2947

    
2948
    if self.op.mode == constants.INSTANCE_IMPORT:
2949
      src_node = getattr(self.op, "src_node", None)
2950
      src_path = getattr(self.op, "src_path", None)
2951
      if src_node is None or src_path is None:
2952
        raise errors.OpPrereqError("Importing an instance requires source"
2953
                                   " node and path options")
2954
      src_node_full = self.cfg.ExpandNodeName(src_node)
2955
      if src_node_full is None:
2956
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2957
      self.op.src_node = src_node = src_node_full
2958

    
2959
      if not os.path.isabs(src_path):
2960
        raise errors.OpPrereqError("The source path must be absolute")
2961

    
2962
      export_info = rpc.call_export_info(src_node, src_path)
2963

    
2964
      if not export_info:
2965
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2966

    
2967
      if not export_info.has_section(constants.INISECT_EXP):
2968
        raise errors.ProgrammerError("Corrupted export config")
2969

    
2970
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2971
      if int(ei_version) != constants.EXPORT_VERSION:
2972
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2973
                                   (ei_version, constants.EXPORT_VERSION))
2974

    
2975
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2976
        raise errors.OpPrereqError("Can't import instance with more than"
2977
                                   " one data disk")
2978

    
2979
      # FIXME: are the old OSes, disk sizes, etc. useful?
2980
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2981
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2982
                                                         'disk0_dump'))
2983
      self.src_image = diskimage
2984
    else: # INSTANCE_CREATE
2985
      if getattr(self.op, "os_type", None) is None:
2986
        raise errors.OpPrereqError("No guest OS specified")
2987

    
2988
    #### instance parameters check
2989

    
2990
    # disk template and mirror node verification
2991
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2992
      raise errors.OpPrereqError("Invalid disk template name")
2993

    
2994
    # instance name verification
2995
    hostname1 = utils.HostInfo(self.op.instance_name)
2996

    
2997
    self.op.instance_name = instance_name = hostname1.name
2998
    instance_list = self.cfg.GetInstanceList()
2999
    if instance_name in instance_list:
3000
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3001
                                 instance_name)
3002

    
3003
    # ip validity checks
3004
    ip = getattr(self.op, "ip", None)
3005
    if ip is None or ip.lower() == "none":
3006
      inst_ip = None
3007
    elif ip.lower() == "auto":
3008
      inst_ip = hostname1.ip
3009
    else:
3010
      if not utils.IsValidIP(ip):
3011
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3012
                                   " like a valid IP" % ip)
3013
      inst_ip = ip
3014
    self.inst_ip = self.op.ip = inst_ip
3015

    
3016
    if self.op.start and not self.op.ip_check:
3017
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3018
                                 " adding an instance in start mode")
3019

    
3020
    if self.op.ip_check:
3021
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3022
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3023
                                   (hostname1.ip, instance_name))
3024

    
3025
    # MAC address verification
3026
    if self.op.mac != "auto":
3027
      if not utils.IsValidMac(self.op.mac.lower()):
3028
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3029
                                   self.op.mac)
3030

    
3031
    # bridge verification
3032
    bridge = getattr(self.op, "bridge", None)
3033
    if bridge is None:
3034
      self.op.bridge = self.cfg.GetDefBridge()
3035
    else:
3036
      self.op.bridge = bridge
3037

    
3038
    # boot order verification
3039
    if self.op.hvm_boot_order is not None:
3040
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3041
        raise errors.OpPrereqError("invalid boot order specified,"
3042
                                   " must be one or more of [acdn]")
3043
    # file storage checks
3044
    if (self.op.file_driver and
3045
        not self.op.file_driver in constants.FILE_DRIVER):
3046
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3047
                                 self.op.file_driver)
3048

    
3049
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3050
      raise errors.OpPrereqError("File storage directory not a relative"
3051
                                 " path")
3052
    #### allocator run
3053

    
3054
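    # exactly one of iallocator and primary node may be given: the node is
    # either computed automatically or specified explicitly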
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3055
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3056
                                 " node must be given")
3057

    
3058
    if self.op.iallocator is not None:
3059
      self._RunAllocator()
3060

    
3061
    #### node related checks
3062

    
3063
    # check primary node
3064
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3065
    if pnode is None:
3066
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3067
                                 self.op.pnode)
3068
    self.op.pnode = pnode.name
3069
    self.pnode = pnode
3070
    self.secondaries = []
3071

    
3072
    # mirror node verification
3073
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3074
      if getattr(self.op, "snode", None) is None:
3075
        raise errors.OpPrereqError("The networked disk templates need"
3076
                                   " a mirror node")
3077

    
3078
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3079
      if snode_name is None:
3080
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3081
                                   self.op.snode)
3082
      elif snode_name == pnode.name:
3083
        raise errors.OpPrereqError("The secondary node cannot be"
3084
                                   " the primary node.")
3085
      self.secondaries.append(snode_name)
3086

    
3087
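    # template-dependent LVM space; None disables the free space check below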
    req_size = _ComputeDiskSize(self.op.disk_template,
3088
                                self.op.disk_size, self.op.swap_size)
3089

    
3090
    # Check lv size requirements
3091
    if req_size is not None:
3092
      nodenames = [pnode.name] + self.secondaries
3093
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3094
      for node in nodenames:
3095
        info = nodeinfo.get(node, None)
3096
        if not info:
3097
          raise errors.OpPrereqError("Cannot get current information"
3098
                                     " from node '%s'" % node)
3099
        vg_free = info.get('vg_free', None)
3100
        if not isinstance(vg_free, int):
3101
          raise errors.OpPrereqError("Can't compute free disk space on"
3102
                                     " node %s" % node)
3103
        if req_size > info['vg_free']:
3104
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3105
                                     " %d MB available, %d MB required" %
3106
                                     (node, info['vg_free'], req_size))
3107

    
3108
    # os verification
3109
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3110
    if not os_obj:
3111
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3112
                                 " primary node"  % self.op.os_type)
3113

    
3114
    if self.op.kernel_path == constants.VALUE_NONE:
3115
      raise errors.OpPrereqError("Can't set instance kernel to none")
3116

    
3117

    
3118
    # bridge check on primary node
3119
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3120
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3121
                                 " destination node '%s'" %
3122
                                 (self.op.bridge, pnode.name))
3123

    
3124
    # memory check on primary node
3125
    if self.op.start:
3126
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3127
                           "creating instance %s" % self.op.instance_name,
3128
                           self.op.mem_size)
3129

    
3130
    # hvm_cdrom_image_path verification
3131
    if self.op.hvm_cdrom_image_path is not None:
3132
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3133
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3134
                                   " be an absolute path or None, not %s" %
3135
                                   self.op.hvm_cdrom_image_path)
3136
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3137
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3138
                                   " regular file or a symlink pointing to"
3139
                                   " an existing regular file, not %s" %
3140
                                   self.op.hvm_cdrom_image_path)
3141

    
3142
    # vnc_bind_address verification
3143
    if self.op.vnc_bind_address is not None:
3144
      if not utils.IsValidIP(self.op.vnc_bind_address):
3145
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3146
                                   " like a valid IP address" %
3147
                                   self.op.vnc_bind_address)
3148

    
3149
    if self.op.start:
3150
      self.instance_status = 'up'
3151
    else:
3152
      self.instance_status = 'down'
3153

    
3154
  def Exec(self, feedback_fn):
3155
    """Create and add the instance to the cluster.
3156

3157
    """
3158
    instance = self.op.instance_name
3159
    pnode_name = self.pnode.name
3160

    
3161
    if self.op.mac == "auto":
3162
      mac_address = self.cfg.GenerateMAC()
3163
    else:
3164
      mac_address = self.op.mac
3165

    
3166
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3167
    if self.inst_ip is not None:
3168
      nic.ip = self.inst_ip
3169

    
3170
    ht_kind = self.sstore.GetHypervisorType()
3171
    if ht_kind in constants.HTS_REQ_PORT:
3172
      network_port = self.cfg.AllocatePort()
3173
    else:
3174
      network_port = None
3175

    
3176
    if self.op.vnc_bind_address is None:
3177
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3178

    
3179
    # this is needed because os.path.join does not accept None arguments
3180
    if self.op.file_storage_dir is None:
3181
      string_file_storage_dir = ""
3182
    else:
3183
      string_file_storage_dir = self.op.file_storage_dir
3184

    
3185
    # build the full file storage dir path
3186
    file_storage_dir = os.path.normpath(os.path.join(
3187
                                        self.sstore.GetFileStorageDir(),
3188
                                        string_file_storage_dir, instance))
3189

    
3190

    
3191
    disks = _GenerateDiskTemplate(self.cfg,
3192
                                  self.op.disk_template,
3193
                                  instance, pnode_name,
3194
                                  self.secondaries, self.op.disk_size,
3195
                                  self.op.swap_size,
3196
                                  file_storage_dir,
3197
                                  self.op.file_driver)
3198

    
3199
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3200
                            primary_node=pnode_name,
3201
                            memory=self.op.mem_size,
3202
                            vcpus=self.op.vcpus,
3203
                            nics=[nic], disks=disks,
3204
                            disk_template=self.op.disk_template,
3205
                            status=self.instance_status,
3206
                            network_port=network_port,
3207
                            kernel_path=self.op.kernel_path,
3208
                            initrd_path=self.op.initrd_path,
3209
                            hvm_boot_order=self.op.hvm_boot_order,
3210
                            hvm_acpi=self.op.hvm_acpi,
3211
                            hvm_pae=self.op.hvm_pae,
3212
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3213
                            vnc_bind_address=self.op.vnc_bind_address,
3214
                            )
3215

    
3216
    feedback_fn("* creating instance disks...")
3217
    if not _CreateDisks(self.cfg, iobj):
3218
      _RemoveDisks(iobj, self.cfg)
3219
      raise errors.OpExecError("Device creation failed, reverting...")
3220

    
3221
    feedback_fn("adding instance %s to cluster config" % instance)
3222

    
3223
    self.cfg.AddInstance(iobj)
3224

    
3225
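    # either wait for the disks to fully sync, or (for mirrored templates)
    # just verify that they are not degraded before continuing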
    if self.op.wait_for_sync:
3226
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3227
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3228
      # make sure the disks are not degraded (still sync-ing is ok)
3229
      time.sleep(15)
3230
      feedback_fn("* checking mirrors status")
3231
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3232
    else:
3233
      disk_abort = False
3234

    
3235
    if disk_abort:
3236
      _RemoveDisks(iobj, self.cfg)
3237
      self.cfg.RemoveInstance(iobj.name)
3238
      raise errors.OpExecError("There are some degraded disks for"
3239
                               " this instance")
3240

    
3241
    feedback_fn("creating os for instance %s on node %s" %
3242
                (instance, pnode_name))
3243

    
3244
    if iobj.disk_template != constants.DT_DISKLESS:
3245
      if self.op.mode == constants.INSTANCE_CREATE:
3246
        feedback_fn("* running the instance OS create scripts...")
3247
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3248
          raise errors.OpExecError("could not add os for instance %s"
3249
                                   " on node %s" %
3250
                                   (instance, pnode_name))
3251

    
3252
      elif self.op.mode == constants.INSTANCE_IMPORT:
3253
        feedback_fn("* running the instance OS import scripts...")
3254
        src_node = self.op.src_node
3255
        src_image = self.src_image
3256
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3257
                                                src_node, src_image):
3258
          raise errors.OpExecError("Could not import os for instance"
3259
                                   " %s on node %s" %
3260
                                   (instance, pnode_name))
3261
      else:
3262
        # also checked in the prereq part
3263
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3264
                                     % self.op.mode)
3265

    
3266
    if self.op.start:
3267
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3268
      feedback_fn("* starting instance...")
3269
      if not rpc.call_instance_start(pnode_name, iobj, None):
3270
        raise errors.OpExecError("Could not start instance")
3271

    
3272

    
3273
class LUConnectConsole(NoHooksLU):
3274
  """Connect to an instance's console.
3275

3276
  This is somewhat special in that it returns the command line that
3277
  you need to run on the master node in order to connect to the
3278
  console.
3279

3280
  """
3281
  _OP_REQP = ["instance_name"]
3282

    
3283
  def CheckPrereq(self):
3284
    """Check prerequisites.
3285

3286
    This checks that the instance is in the cluster.
3287

3288
    """
3289
    instance = self.cfg.GetInstanceInfo(
3290
      self.cfg.ExpandInstanceName(self.op.instance_name))
3291
    if instance is None:
3292
      raise errors.OpPrereqError("Instance '%s' not known" %
3293
                                 self.op.instance_name)
3294
    self.instance = instance
3295

    
3296
  def Exec(self, feedback_fn):
3297
    """Connect to the console of an instance
3298

3299
    """
3300
    instance = self.instance
3301
    node = instance.primary_node
3302

    
3303
    node_insts = rpc.call_instance_list([node])[node]
3304
    if node_insts is False:
3305
      raise errors.OpExecError("Can't connect to node %s." % node)
3306

    
3307
    if instance.name not in node_insts:
3308
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3309

    
3310
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3311

    
3312
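    # the hypervisor provides the console command; it is wrapped below into
    # an ssh call to the instance's primary node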
    hyper = hypervisor.GetHypervisor()
3313
    console_cmd = hyper.GetShellCommandForConsole(instance)
3314

    
3315
    # build ssh cmdline
3316
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3317

    
3318

    
3319
class LUReplaceDisks(LogicalUnit):
3320
  """Replace the disks of an instance.
3321

3322
  """
3323
  HPATH = "mirrors-replace"
3324
  HTYPE = constants.HTYPE_INSTANCE
3325
  _OP_REQP = ["instance_name", "mode", "disks"]
3326

    
3327
  def _RunAllocator(self):
3328
    """Compute a new secondary node using an IAllocator.
3329

3330
    """
3331
    ial = IAllocator(self.cfg, self.sstore,
3332
                     mode=constants.IALLOCATOR_MODE_RELOC,
3333
                     name=self.op.instance_name,
3334
                     relocate_from=[self.sec_node])
3335

    
3336
    ial.Run(self.op.iallocator)
3337

    
3338
    if not ial.success:
3339
      raise errors.OpPrereqError("Can't compute nodes using"
3340
                                 " iallocator '%s': %s" % (self.op.iallocator,
3341
                                                           ial.info))
3342
    if len(ial.nodes) != ial.required_nodes:
3343
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3344
                                 " of nodes (%s), required %s" %
3345
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3346
    self.op.remote_node = ial.nodes[0]
3347
    logger.ToStdout("Selected new secondary for the instance: %s" %
3348
                    self.op.remote_node)
3349

    
3350
  def BuildHooksEnv(self):
3351
    """Build hooks env.
3352

3353
    This runs on the master, the primary and all the secondaries.
3354

3355
    """
3356
    env = {
3357
      "MODE": self.op.mode,
3358
      "NEW_SECONDARY": self.op.remote_node,
3359
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3360
      }
3361
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3362
    nl = [
3363
      self.sstore.GetMasterNode(),
3364
      self.instance.primary_node,
3365
      ]
3366
    if self.op.remote_node is not None:
3367
      nl.append(self.op.remote_node)
3368
    return env, nl, nl
3369

    
3370
  def CheckPrereq(self):
3371
    """Check prerequisites.
3372

3373
    This checks that the instance is in the cluster.
3374

3375
    """
3376
    if not hasattr(self.op, "remote_node"):
3377
      self.op.remote_node = None
3378

    
3379
    instance = self.cfg.GetInstanceInfo(
3380
      self.cfg.ExpandInstanceName(self.op.instance_name))
3381
    if instance is None:
3382
      raise errors.OpPrereqError("Instance '%s' not known" %
3383
                                 self.op.instance_name)
3384
    self.instance = instance
3385
    self.op.instance_name = instance.name
3386

    
3387
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3388
      raise errors.OpPrereqError("Instance's disk layout is not"
3389
                                 " network mirrored.")
3390

    
3391
    if len(instance.secondary_nodes) != 1:
3392
      raise errors.OpPrereqError("The instance has a strange layout,"
3393
                                 " expected one secondary but found %d" %
3394
                                 len(instance.secondary_nodes))
3395

    
3396
    self.sec_node = instance.secondary_nodes[0]
3397

    
3398
    ia_name = getattr(self.op, "iallocator", None)
3399
    if ia_name is not None:
3400
      if self.op.remote_node is not None:
3401
        raise errors.OpPrereqError("Give either the iallocator or the new"
3402
                                   " secondary, not both")
3403
      self.op.remote_node = self._RunAllocator()
3404

    
3405
    remote_node = self.op.remote_node
3406
    if remote_node is not None:
3407
      remote_node = self.cfg.ExpandNodeName(remote_node)
3408
      if remote_node is None:
3409
        raise errors.OpPrereqError("Node '%s' not known" %
3410
                                   self.op.remote_node)
3411
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3412
    else:
3413
      self.remote_node_info = None
3414
    if remote_node == instance.primary_node:
3415
      raise errors.OpPrereqError("The specified node is the primary node of"
3416
                                 " the instance.")
3417
    elif remote_node == self.sec_node:
3418
      if self.op.mode == constants.REPLACE_DISK_SEC:
3419
        # this is for DRBD8, where we can't execute the same mode of
3420
        # replacement as for drbd7 (no different port allocated)
3421
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3422
                                   " replacement")
3423
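    # for drbd8, translate the requested mode into the target/other/new node
    # roles used by the Exec() helpers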
    if instance.disk_template == constants.DT_DRBD8:
3424
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3425
          remote_node is not None):
3426
        # switch to replace secondary mode
3427
        self.op.mode = constants.REPLACE_DISK_SEC
3428

    
3429
      if self.op.mode == constants.REPLACE_DISK_ALL:
3430
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3431
                                   " secondary disk replacement, not"
3432
                                   " both at once")
3433
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3434
        if remote_node is not None:
3435
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3436
                                     " the secondary while doing a primary"
3437
                                     " node disk replacement")
3438
        self.tgt_node = instance.primary_node
3439
        self.oth_node = instance.secondary_nodes[0]
3440
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3441
        self.new_node = remote_node # this can be None, in which case
3442
                                    # we don't change the secondary
3443
        self.tgt_node = instance.secondary_nodes[0]
3444
        self.oth_node = instance.primary_node
3445
      else:
3446
        raise errors.ProgrammerError("Unhandled disk replace mode")
3447

    
3448
    for name in self.op.disks:
3449
      if instance.FindDisk(name) is None:
3450
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3451
                                   (name, instance.name))
3452
    self.op.remote_node = remote_node
3453

    
3454
  def _ExecD8DiskOnly(self, feedback_fn):
3455
    """Replace a disk on the primary or secondary for dbrd8.
3456

3457
    The algorithm for replace is quite complicated:
3458
      - for each disk to be replaced:
3459
        - create new LVs on the target node with unique names
3460
        - detach old LVs from the drbd device
3461
        - rename old LVs to name_replaced.<time_t>
3462
        - rename new LVs to old LVs
3463
        - attach the new LVs (with the old names now) to the drbd device
3464
      - wait for sync across all devices
3465
      - for each modified disk:
3466
        - remove old LVs (which have the name name_replaced.<time_t>)
3467

3468
    Failures are not very well handled.
3469

3470
    """
3471
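    # the work is split into six user-visible steps, logged via LogStep below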
    steps_total = 6
3472
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3473
    instance = self.instance
3474
    iv_names = {}
3475
    vgname = self.cfg.GetVGName()
3476
    # start of work
3477
    cfg = self.cfg
3478
    tgt_node = self.tgt_node
3479
    oth_node = self.oth_node
3480

    
3481
    # Step: check device activation
3482
    self.proc.LogStep(1, steps_total, "check device existence")
3483
    info("checking volume groups")
3484
    my_vg = cfg.GetVGName()
3485
    results = rpc.call_vg_list([oth_node, tgt_node])
3486
    if not results:
3487
      raise errors.OpExecError("Can't list volume groups on the nodes")
3488
    for node in oth_node, tgt_node:
3489
      res = results.get(node, False)
3490
      if not res or my_vg not in res:
3491
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3492
                                 (my_vg, node))
3493
    for dev in instance.disks:
3494
      if not dev.iv_name in self.op.disks:
3495
        continue
3496
      for node in tgt_node, oth_node:
3497
        info("checking %s on %s" % (dev.iv_name, node))
3498
        cfg.SetDiskID(dev, node)
3499
        if not rpc.call_blockdev_find(node, dev):
3500
          raise errors.OpExecError("Can't find device %s on node %s" %
3501
                                   (dev.iv_name, node))
3502

    
3503
    # Step: check other node consistency
3504
    self.proc.LogStep(2, steps_total, "check peer consistency")
3505
    for dev in instance.disks:
3506
      if not dev.iv_name in self.op.disks:
3507
        continue
3508
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3509
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3510
                                   oth_node==instance.primary_node):
3511
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3512
                                 " to replace disks on this node (%s)" %
3513
                                 (oth_node, tgt_node))
3514

    
3515
    # Step: create new storage
3516
    self.proc.LogStep(3, steps_total, "allocate new storage")
3517
    for dev in instance.disks:
3518
      if not dev.iv_name in self.op.disks:
3519
        continue
3520
      size = dev.size
3521
      cfg.SetDiskID(dev, tgt_node)
3522
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3523
      names = _GenerateUniqueNames(cfg, lv_names)
3524
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3525
                             logical_id=(vgname, names[0]))
3526
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3527
                             logical_id=(vgname, names[1]))
3528
      new_lvs = [lv_data, lv_meta]
3529
      old_lvs = dev.children
3530
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3531
      info("creating new local storage on %s for %s" %
3532
           (tgt_node, dev.iv_name))
3533
      # since we *always* want to create this LV, we use the
3534
      # _Create...OnPrimary (which forces the creation), even if we
3535
      # are talking about the secondary node
3536
      for new_lv in new_lvs:
3537
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3538
                                        _GetInstanceInfoText(instance)):
3539
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3540
                                   " node '%s'" %
3541
                                   (new_lv.logical_id[1], tgt_node))
3542

    
3543
    # Step: for each lv, detach+rename*2+attach
3544
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3545
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3546
      info("detaching %s drbd from local storage" % dev.iv_name)
3547
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3548
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3549
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3550
      #dev.children = []
3551
      #cfg.Update(instance)
3552

    
3553
      # ok, we created the new LVs, so now we know we have the needed
3554
      # storage; as such, we proceed on the target node to rename
3555
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3556
      # using the assumption that logical_id == physical_id (which in
3557
      # turn is the unique_id on that node)
3558

    
3559
      # FIXME(iustin): use a better name for the replaced LVs
3560
      temp_suffix = int(time.time())
3561
      ren_fn = lambda d, suff: (d.physical_id[0],
3562
                                d.physical_id[1] + "_replaced-%s" % suff)
3563
      # build the rename list based on what LVs exist on the node
3564
      rlist = []
3565
      for to_ren in old_lvs:
3566
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3567
        if find_res is not None: # device exists
3568
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3569

    
3570
      info("renaming the old LVs on the target node")
3571
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3572
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3573
      # now we rename the new LVs to the old LVs
3574
      info("renaming the new LVs on the target node")
3575
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3576
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3577
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3578

    
3579
      for old, new in zip(old_lvs, new_lvs):
3580
        new.logical_id = old.logical_id
3581
        cfg.SetDiskID(new, tgt_node)
3582

    
3583
      for disk in old_lvs:
3584
        disk.logical_id = ren_fn(disk, temp_suffix)
3585
        cfg.SetDiskID(disk, tgt_node)
3586

    
3587
      # now that the new lvs have the old name, we can add them to the device
3588
      info("adding new mirror component on %s" % tgt_node)
3589
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3590
        for new_lv in new_lvs:
3591
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3592
            warning("Can't rollback device %s", hint="manually cleanup unused"
3593
                    " logical volumes")
3594
        raise errors.OpExecError("Can't add local storage to drbd")
3595

    
3596
      dev.children = new_lvs
3597
      cfg.Update(instance)
3598

    
3599
    # Step: wait for sync
3600

    
3601
    # this can fail as the old devices are degraded and _WaitForSync
3602
    # does a combined result over all disks, so we don't check its
3603
    # return value
3604
    self.proc.LogStep(5, steps_total, "sync devices")
3605
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3606

    
3607
    # so check manually all the devices
3608
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3609
      cfg.SetDiskID(dev, instance.primary_node)
3610
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3611
      if is_degr:
3612
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3613

    
3614
    # Step: remove old storage
3615
    self.proc.LogStep(6, steps_total, "removing old storage")
3616
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3617
      info("remove logical volumes for %s" % name)
3618
      for lv in old_lvs:
3619
        cfg.SetDiskID(lv, tgt_node)
3620
        if not rpc.call_blockdev_remove(tgt_node, lv):
3621
          warning("Can't remove old LV", hint="manually remove unused LVs")
3622
          continue
3623

    
3624
  def _ExecD8Secondary(self, feedback_fn):
3625
    """Replace the secondary node for drbd8.
3626

3627
    The algorithm for replace is quite complicated:
3628
      - for all disks of the instance:
3629
        - create new LVs on the new node with same names
3630
        - shutdown the drbd device on the old secondary
3631
        - disconnect the drbd network on the primary
3632
        - create the drbd device on the new secondary
3633
        - network attach the drbd on the primary, using an artifice:
3634
          the drbd code for Attach() will connect to the network if it
3635
          finds a device which is connected to the correct local disks but
3636
          not network enabled
3637
      - wait for sync across all devices
3638
      - remove all disks from the old secondary
3639

3640
    Failures are not very well handled.
3641

3642
    """
3643
    steps_total = 6
3644
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3645
    instance = self.instance
3646
    iv_names = {}
3647
    vgname = self.cfg.GetVGName()
3648
    # start of work
3649
    cfg = self.cfg
3650
    old_node = self.tgt_node
3651
    new_node = self.new_node
3652
    pri_node = instance.primary_node
3653

    
3654
    # Step: check device activation
3655
    self.proc.LogStep(1, steps_total, "check device existence")
3656
    info("checking volume groups")
3657
    my_vg = cfg.GetVGName()
3658
    results = rpc.call_vg_list([pri_node, new_node])
3659
    if not results:
3660
      raise errors.OpExecError("Can't list volume groups on the nodes")
3661
    for node in pri_node, new_node:
3662
      res = results.get(node, False)
3663
      if not res or my_vg not in res:
3664
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3665
                                 (my_vg, node))
3666
    for dev in instance.disks:
3667
      if not dev.iv_name in self.op.disks:
3668
        continue
3669
      info("checking %s on %s" % (dev.iv_name, pri_node))
3670
      cfg.SetDiskID(dev, pri_node)
3671
      if not rpc.call_blockdev_find(pri_node, dev):
3672
        raise errors.OpExecError("Can't find device %s on node %s" %
3673
                                 (dev.iv_name, pri_node))
3674

    
3675
    # Step: check other node consistency
3676
    self.proc.LogStep(2, steps_total, "check peer consistency")
3677
    for dev in instance.disks:
3678
      if not dev.iv_name in self.op.disks:
3679
        continue
3680
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3681
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3682
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3683
                                 " unsafe to replace the secondary" %
3684
                                 pri_node)
3685

    
3686
    # Step: create new storage
3687
    self.proc.LogStep(3, steps_total, "allocate new storage")
3688
    for dev in instance.disks:
3689
      size = dev.size
3690
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3691
      # since we *always* want to create this LV, we use the
3692
      # _Create...OnPrimary (which forces the creation), even if we
3693
      # are talking about the secondary node
3694
      for new_lv in dev.children:
3695
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3696
                                        _GetInstanceInfoText(instance)):
3697
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3698
                                   " node '%s'" %
3699
                                   (new_lv.logical_id[1], new_node))
3700

    
3701
      iv_names[dev.iv_name] = (dev, dev.children)
3702

    
3703
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3704
    for dev in instance.disks:
3705
      size = dev.size
3706
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3707
      # create new devices on new_node
3708
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3709
                              logical_id=(pri_node, new_node,
3710
                                          dev.logical_id[2]),
3711
                              children=dev.children)
3712
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3713
                                        new_drbd, False,
3714
                                        _GetInstanceInfoText(instance)):
3715
        raise errors.OpExecError("Failed to create new DRBD on"
3716
                                 " node '%s'" % new_node)
3717

    
3718
    for dev in instance.disks:
3719
      # we have new devices, shutdown the drbd on the old secondary
3720
      info("shutting down drbd for %s on old node" % dev.iv_name)
3721
      cfg.SetDiskID(dev, old_node)
3722
      if not rpc.call_blockdev_shutdown(old_node, dev):
3723
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3724
                hint="Please cleanup this device manually as soon as possible")
3725

    
3726
    info("detaching primary drbds from the network (=> standalone)")
3727
    done = 0
3728
    for dev in instance.disks:
3729
      cfg.SetDiskID(dev, pri_node)
3730
      # set the physical (unique in bdev terms) id to None, meaning
3731
      # detach from network
3732
      dev.physical_id = (None,) * len(dev.physical_id)
3733
      # and 'find' the device, which will 'fix' it to match the
3734
      # standalone state
3735
      if rpc.call_blockdev_find(pri_node, dev):
3736
        done += 1
3737
      else:
3738
        warning("Failed to detach drbd %s from network, unusual case" %
3739
                dev.iv_name)
3740

    
3741
    if not done:
3742
      # no detaches succeeded (very unlikely)
3743
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3744

    
3745
    # if we managed to detach at least one, we update all the disks of
3746
    # the instance to point to the new secondary
3747
    info("updating instance configuration")
3748
    for dev in instance.disks:
3749
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3750
      cfg.SetDiskID(dev, pri_node)
3751
    cfg.Update(instance)
3752

    
3753
    # and now perform the drbd attach
3754
    info("attaching primary drbds to new secondary (standalone => connected)")
3755
    failures = []
3756
    for dev in instance.disks:
3757
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3758
      # since the attach is smart, it's enough to 'find' the device,
3759
      # it will automatically activate the network, if the physical_id
3760
      # is correct
3761
      cfg.SetDiskID(dev, pri_node)
3762
      if not rpc.call_blockdev_find(pri_node, dev):
3763
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3764
                "please do a gnt-instance info to see the status of disks")
3765

    
3766
    # this can fail as the old devices are degraded and _WaitForSync
3767
    # does a combined result over all disks, so we don't check its
3768
    # return value
3769
    self.proc.LogStep(5, steps_total, "sync devices")
3770
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3771

    
3772
    # so check manually all the devices
3773
    for name, (dev, old_lvs) in iv_names.iteritems():
3774
      cfg.SetDiskID(dev, pri_node)
3775
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3776
      if is_degr:
3777
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3778

    
3779
    self.proc.LogStep(6, steps_total, "removing old storage")
3780
    for name, (dev, old_lvs) in iv_names.iteritems():
3781
      info("remove logical volumes for %s" % name)
3782
      for lv in old_lvs:
3783
        cfg.SetDiskID(lv, old_node)
3784
        if not rpc.call_blockdev_remove(old_node, lv):
3785
          warning("Can't remove LV on old secondary",
3786
                  hint="Cleanup stale volumes by hand")
3787

    
3788
  def Exec(self, feedback_fn):
3789
    """Execute disk replacement.
3790

3791
    This dispatches the disk replacement to the appropriate handler.
3792

3793
    """
3794
    instance = self.instance
3795

    
3796
    # Activate the instance disks if we're replacing them on a down instance
3797
    if instance.status == "down":
3798
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3799
      self.proc.ChainOpCode(op)
3800

    
3801
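    # only drbd8 replacement is implemented; which helper runs depends on
    # whether a new secondary node was requested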
    if instance.disk_template == constants.DT_DRBD8:
3802
      if self.op.remote_node is None:
3803
        fn = self._ExecD8DiskOnly
3804
      else:
3805
        fn = self._ExecD8Secondary
3806
    else:
3807
      raise errors.ProgrammerError("Unhandled disk replacement case")
3808

    
3809
    ret = fn(feedback_fn)
3810

    
3811
    # Deactivate the instance disks if we're replacing them on a down instance
3812
    if instance.status == "down":
3813
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3814
      self.proc.ChainOpCode(op)
3815

    
3816
    return ret
3817

    
3818

    
3819
class LUGrowDisk(LogicalUnit):
3820
  """Grow a disk of an instance.
3821

3822
  """
3823
  HPATH = "disk-grow"
3824
  HTYPE = constants.HTYPE_INSTANCE
3825
  _OP_REQP = ["instance_name", "disk", "amount"]
3826

    
3827
  def BuildHooksEnv(self):
3828
    """Build hooks env.
3829

3830
    This runs on the master, the primary and all the secondaries.
3831

3832
    """
3833
    env = {
3834
      "DISK": self.op.disk,
3835
      "AMOUNT": self.op.amount,
3836
      }
3837
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3838
    nl = [
3839
      self.sstore.GetMasterNode(),
3840
      self.instance.primary_node,
3841
      ]
3842
    return env, nl, nl
3843

    
3844
  def CheckPrereq(self):
3845
    """Check prerequisites.
3846

3847
    This checks that the instance is in the cluster.
3848

3849
    """
3850
    instance = self.cfg.GetInstanceInfo(
3851
      self.cfg.ExpandInstanceName(self.op.instance_name))
3852
    if instance is None:
3853
      raise errors.OpPrereqError("Instance '%s' not known" %
3854
                                 self.op.instance_name)
3855
    self.instance = instance
3856
    self.op.instance_name = instance.name
3857

    
3858
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3859
      raise errors.OpPrereqError("Instance's disk layout does not support"
3860
                                 " growing.")
3861

    
3862
    if instance.FindDisk(self.op.disk) is None:
3863
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3864
                                 (self.op.disk, instance.name))
3865

    
3866
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3867
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3868
    for node in nodenames:
3869
      info = nodeinfo.get(node, None)
3870
      if not info:
3871
        raise errors.OpPrereqError("Cannot get current information"
3872
                                   " from node '%s'" % node)
3873
      vg_free = info.get('vg_free', None)
3874
      if not isinstance(vg_free, int):
3875
        raise errors.OpPrereqError("Can't compute free disk space on"
3876
                                   " node %s" % node)
3877
      if self.op.amount > info['vg_free']:
3878
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
3879
                                   " %d MiB available, %d MiB required" %
3880
                                   (node, info['vg_free'], self.op.amount))
3881

    
3882
  def Exec(self, feedback_fn):
3883
    """Execute disk grow.
3884

3885
    """
3886
    instance = self.instance
3887
    disk = instance.FindDisk(self.op.disk)
3888
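    # grow the device on every node holding it, the primary node last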
    for node in (instance.secondary_nodes + (instance.primary_node,)):
3889
      self.cfg.SetDiskID(disk, node)
3890
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3891
      if not result or not isinstance(result, tuple) or len(result) != 2:
3892
        raise errors.OpExecError("grow request failed to node %s" % node)
3893
      elif not result[0]:
3894
        raise errors.OpExecError("grow request failed to node %s: %s" %
3895
                                 (node, result[1]))
3896
    disk.RecordGrow(self.op.amount)
3897
    self.cfg.Update(instance)
3898
    return
3899

    
3900

    
3901
class LUQueryInstanceData(NoHooksLU):
3902
  """Query runtime instance data.
3903

3904
  """
3905
  _OP_REQP = ["instances"]
3906

    
3907
  def CheckPrereq(self):
3908
    """Check prerequisites.
3909

3910
    This only checks the optional instance list against the existing names.
3911

3912
    """
3913
    if not isinstance(self.op.instances, list):
3914
      raise errors.OpPrereqError("Invalid argument type 'instances'")
3915
    if self.op.instances:
3916
      self.wanted_instances = []
3917
      names = self.op.instances
3918
      for name in names:
3919
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3920
        if instance is None:
3921
          raise errors.OpPrereqError("No such instance name '%s'" % name)
3922
        self.wanted_instances.append(instance)
3923
    else:
3924
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3925
                               in self.cfg.GetInstanceList()]
3926
    return
3927

    
3928

    
3929
  def _ComputeDiskStatus(self, instance, snode, dev):
3930
    """Compute block device status.
3931

3932
    """
3933
    self.cfg.SetDiskID(dev, instance.primary_node)
3934
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3935
    if dev.dev_type in constants.LDS_DRBD:
3936
      # in that case we change the snode (otherwise use the one passed in)
3937
      if dev.logical_id[0] == instance.primary_node:
3938
        snode = dev.logical_id[1]
3939
      else:
3940
        snode = dev.logical_id[0]
3941

    
3942
    if snode:
3943
      self.cfg.SetDiskID(dev, snode)
3944
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
3945
    else:
3946
      dev_sstatus = None
3947

    
3948
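    # recurse into the child devices (e.g. the LVs backing a drbd disk)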
    if dev.children:
3949
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
3950
                      for child in dev.children]
3951
    else:
3952
      dev_children = []
3953

    
3954
    data = {
3955
      "iv_name": dev.iv_name,
3956
      "dev_type": dev.dev_type,
3957
      "logical_id": dev.logical_id,
3958
      "physical_id": dev.physical_id,
3959
      "pstatus": dev_pstatus,
3960
      "sstatus": dev_sstatus,
3961
      "children": dev_children,
3962
      }
3963

    
3964
    return data
3965

    
3966
  def Exec(self, feedback_fn):
3967
    """Gather and return data"""
3968
    result = {}
3969
    for instance in self.wanted_instances:
3970
      remote_info = rpc.call_instance_info(instance.primary_node,
3971
                                                instance.name)
3972
      if remote_info and "state" in remote_info:
3973
        remote_state = "up"
3974
      else:
3975
        remote_state = "down"
3976
      if instance.status == "down":
3977
        config_state = "down"
3978
      else:
3979
        config_state = "up"
3980

    
3981
      disks = [self._ComputeDiskStatus(instance, None, device)
3982
               for device in instance.disks]
3983

    
3984
      idict = {
3985
        "name": instance.name,
3986
        "config_state": config_state,
3987