root / lib / cmdlib.py @ b3989551


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement CheckPrereq which also fills in the opcode instance
53
      with all the fields (even if as None)
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_MASTER: the LU needs to run on the master node
59
        REQ_WSSTORE: the LU needs a writable SimpleStore
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_WSSTORE = False
69

    
70
  def __init__(self, processor, op, cfg, sstore):
71
    """Constructor for LogicalUnit.
72

73
    This needs to be overridden in derived classes in order to check op
74
    validity.
75

76
    """
77
    self.proc = processor
78
    self.op = op
79
    self.cfg = cfg
80
    self.sstore = sstore
81
    self.__ssh = None
82

    
83
    for attr_name in self._OP_REQP:
84
      attr_val = getattr(op, attr_name, None)
85
      if attr_val is None:
86
        raise errors.OpPrereqError("Required parameter '%s' missing" %
87
                                   attr_name)
88

    
89
    if not cfg.IsCluster():
90
      raise errors.OpPrereqError("Cluster not initialized yet,"
91
                                 " use 'gnt-cluster init' first.")
92
    if self.REQ_MASTER:
93
      master = sstore.GetMasterNode()
94
      if master != utils.HostInfo().name:
95
        raise errors.OpPrereqError("Commands must be run on the master"
96
                                   " node %s" % master)
97

    
98
  def __GetSSH(self):
99
    """Returns the SshRunner object
100

101
    """
102
    if not self.__ssh:
103
      self.__ssh = ssh.SshRunner(self.sstore)
104
    return self.__ssh
105

    
106
  ssh = property(fget=__GetSSH)
107

    
108
  def CheckPrereq(self):
109
    """Check prerequisites for this LU.
110

111
    This method should check that the prerequisites for the execution
112
    of this LU are fulfilled. It can do internode communication, but
113
    it should be idempotent - no cluster or system changes are
114
    allowed.
115

116
    The method should raise errors.OpPrereqError in case something is
117
    not fulfilled. Its return value is ignored.
118

119
    This method should also update all the parameters of the opcode to
120
    their canonical form; e.g. a short node name must be fully
121
    expanded after this method has successfully completed (so that
122
    hooks, logging, etc. work correctly).
123

124
    """
125
    raise NotImplementedError
126

    
127
  def Exec(self, feedback_fn):
128
    """Execute the LU.
129

130
    This method should implement the actual work. It should raise
131
    errors.OpExecError for failures that are somewhat dealt with in
132
    code, or expected.
133

134
    """
135
    raise NotImplementedError
136

    
137
  def BuildHooksEnv(self):
138
    """Build hooks environment for this LU.
139

140
    This method should return a three-element tuple consisting of: a dict
141
    containing the environment that will be used for running the
142
    specific hook for this LU, a list of node names on which the hook
143
    should run before the execution, and a list of node names on which
144
    the hook should run after the execution.
145

146
    The keys of the dict must not be prefixed with 'GANETI_', as this will
147
    be handled in the hooks runner. Also note additional keys will be
148
    added by the hooks runner. If the LU doesn't define any
149
    environment, an empty dict (and not None) should be returned.
150

151
    No nodes should be returned as an empty list (and not None).
152

153
    Note that if the HPATH for a LU class is None, this function will
154
    not be called.
155

156
    """
157
    raise NotImplementedError
158

    
159
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
160
    """Notify the LU about the results of its hooks.
161

162
    This method is called every time a hooks phase is executed, and notifies
163
    the Logical Unit about the hooks' result. The LU can then use it to alter
164
    its result based on the hooks.  By default the method does nothing and the
165
    previous result is passed back unchanged, but any LU can override it if it
166
    wants to use the local cluster hook-scripts somehow.
167

168
    Args:
169
      phase: the hooks phase that has just been run
170
      hook_results: the results of the multi-node hooks rpc call
171
      feedback_fn: function to send feedback back to the caller
172
      lu_result: the previous result this LU had, or None in the PRE phase.
173

174
    """
175
    return lu_result
176
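# Illustrative sketch only (not part of this module): a hypothetical minimal
# LogicalUnit subclass, assuming an opcode that carries a single 'node_name'
# field, showing the surface the base class docstring asks for (CheckPrereq,
# Exec, BuildHooksEnv, plus HPATH/HTYPE and _OP_REQP).
#
#   class LUExampleNoop(LogicalUnit):
#     HPATH = "example-noop"
#     HTYPE = constants.HTYPE_NODE
#     _OP_REQP = ["node_name"]
#
#     def CheckPrereq(self):
#       # canonicalize the name, as required by the CheckPrereq contract
#       self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
#       if self.op.node_name is None:
#         raise errors.OpPrereqError("No such node")
#
#     def BuildHooksEnv(self):
#       env = {"OP_TARGET": self.op.node_name}
#       return env, [], [self.op.node_name]
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Doing nothing on %s" % self.op.node_name)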

    
177

    
178
class NoHooksLU(LogicalUnit):
179
  """Simple LU which runs no hooks.
180

181
  This LU is intended as a parent for other LogicalUnits which will
182
  run no hooks, in order to reduce duplicate code.
183

184
  """
185
  HPATH = None
186
  HTYPE = None
187

    
188

    
189
def _GetWantedNodes(lu, nodes):
190
  """Returns list of checked and expanded node names.
191

192
  Args:
193
    nodes: List of nodes (strings) or None for all
194

195
  """
196
  if not isinstance(nodes, list):
197
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
198

    
199
  if nodes:
200
    wanted = []
201

    
202
    for name in nodes:
203
      node = lu.cfg.ExpandNodeName(name)
204
      if node is None:
205
        raise errors.OpPrereqError("No such node name '%s'" % name)
206
      wanted.append(node)
207

    
208
  else:
209
    wanted = lu.cfg.GetNodeList()
210
  return utils.NiceSort(wanted)
211
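# Example (sketch, with hypothetical names): assuming "node1"/"node2" expand to
# "node1.example.com"/"node2.example.com" in the configuration,
#   _GetWantedNodes(lu, ["node2", "node1"]) -> ["node1.example.com", "node2.example.com"]
#   _GetWantedNodes(lu, [])                 -> all configured nodes, nice-sorted
# and anything that is not a list raises errors.OpPrereqError.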

    
212

    
213
def _GetWantedInstances(lu, instances):
214
  """Returns list of checked and expanded instance names.
215

216
  Args:
217
    instances: List of instances (strings) or None for all
218

219
  """
220
  if not isinstance(instances, list):
221
    raise errors.OpPrereqError("Invalid argument type 'instances'")
222

    
223
  if instances:
224
    wanted = []
225

    
226
    for name in instances:
227
      instance = lu.cfg.ExpandInstanceName(name)
228
      if instance is None:
229
        raise errors.OpPrereqError("No such instance name '%s'" % name)
230
      wanted.append(instance)
231

    
232
  else:
233
    wanted = lu.cfg.GetInstanceList()
234
  return utils.NiceSort(wanted)
235

    
236

    
237
def _CheckOutputFields(static, dynamic, selected):
238
  """Checks whether all selected fields are valid.
239

240
  Args:
241
    static: Static fields
242
    dynamic: Dynamic fields
243

244
  """
245
  static_fields = frozenset(static)
246
  dynamic_fields = frozenset(dynamic)
247

    
248
  all_fields = static_fields | dynamic_fields
249

    
250
  if not all_fields.issuperset(selected):
251
    raise errors.OpPrereqError("Unknown output fields selected: %s"
252
                               % ",".join(frozenset(selected).
253
                                          difference(all_fields)))
254
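# Example (sketch): with static=["name"] and dynamic=["mfree"], selecting
# ["name", "mfree"] passes, while selecting ["name", "bogus"] raises
# errors.OpPrereqError("Unknown output fields selected: bogus").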

    
255

    
256
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
257
                          memory, vcpus, nics):
258
  """Builds instance related env variables for hooks from single variables.
259

260
  Args:
261
    secondary_nodes: List of secondary nodes as strings
262
  """
263
  env = {
264
    "OP_TARGET": name,
265
    "INSTANCE_NAME": name,
266
    "INSTANCE_PRIMARY": primary_node,
267
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
268
    "INSTANCE_OS_TYPE": os_type,
269
    "INSTANCE_STATUS": status,
270
    "INSTANCE_MEMORY": memory,
271
    "INSTANCE_VCPUS": vcpus,
272
  }
273

    
274
  if nics:
275
    nic_count = len(nics)
276
    for idx, (ip, bridge, mac) in enumerate(nics):
277
      if ip is None:
278
        ip = ""
279
      env["INSTANCE_NIC%d_IP" % idx] = ip
280
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
281
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
282
  else:
283
    nic_count = 0
284

    
285
  env["INSTANCE_NIC_COUNT"] = nic_count
286

    
287
  return env
288
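# Illustrative sketch of the resulting environment for a hypothetical instance
# with one NIC (all values are made up, not taken from a real cluster):
#   _BuildInstanceHookEnv("inst1", "node1", ["node2"], "debian-etch", "up",
#                         128, 1, [("192.0.2.10", "xen-br0", "aa:00:00:00:00:01")])
#   -> {"OP_TARGET": "inst1", "INSTANCE_NAME": "inst1",
#       "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#       "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#       "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1,
#       "INSTANCE_NIC0_IP": "192.0.2.10", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#       "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01", "INSTANCE_NIC_COUNT": 1}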

    
289

    
290
def _BuildInstanceHookEnvByObject(instance, override=None):
291
  """Builds instance related env variables for hooks from an object.
292

293
  Args:
294
    instance: objects.Instance object of instance
295
    override: dict of values to override
296
  """
297
  args = {
298
    'name': instance.name,
299
    'primary_node': instance.primary_node,
300
    'secondary_nodes': instance.secondary_nodes,
301
    'os_type': instance.os,
302
    'status': instance.status,
303
    'memory': instance.memory,
304
    'vcpus': instance.vcpus,
305
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
306
  }
307
  if override:
308
    args.update(override)
309
  return _BuildInstanceHookEnv(**args)
310
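# Usage sketch: the override dict replaces individual entries of the args
# mapping above before the environment is built, e.g. (hypothetical call)
#   _BuildInstanceHookEnvByObject(instance, override={'status': 'down'})
# reports the instance as down regardless of its current status field.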

    
311

    
312
def _CheckInstanceBridgesExist(instance):
313
  """Check that the brigdes needed by an instance exist.
314

315
  """
316
  # check bridges existence
317
  brlist = [nic.bridge for nic in instance.nics]
318
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
319
    raise errors.OpPrereqError("one or more target bridges %s does not"
320
                               " exist on destination node '%s'" %
321
                               (brlist, instance.primary_node))
322

    
323

    
324
class LUDestroyCluster(NoHooksLU):
325
  """Logical unit for destroying the cluster.
326

327
  """
328
  _OP_REQP = []
329

    
330
  def CheckPrereq(self):
331
    """Check prerequisites.
332

333
    This checks whether the cluster is empty.
334

335
    Any errors are signalled by raising errors.OpPrereqError.
336

337
    """
338
    master = self.sstore.GetMasterNode()
339

    
340
    nodelist = self.cfg.GetNodeList()
341
    if len(nodelist) != 1 or nodelist[0] != master:
342
      raise errors.OpPrereqError("There are still %d node(s) in"
343
                                 " this cluster." % (len(nodelist) - 1))
344
    instancelist = self.cfg.GetInstanceList()
345
    if instancelist:
346
      raise errors.OpPrereqError("There are still %d instance(s) in"
347
                                 " this cluster." % len(instancelist))
348

    
349
  def Exec(self, feedback_fn):
350
    """Destroys the cluster.
351

352
    """
353
    master = self.sstore.GetMasterNode()
354
    if not rpc.call_node_stop_master(master):
355
      raise errors.OpExecError("Could not disable the master role")
356
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
357
    utils.CreateBackup(priv_key)
358
    utils.CreateBackup(pub_key)
359
    rpc.call_node_leave_cluster(master)
360

    
361

    
362
class LUVerifyCluster(LogicalUnit):
363
  """Verifies the cluster status.
364

365
  """
366
  HPATH = "cluster-verify"
367
  HTYPE = constants.HTYPE_CLUSTER
368
  _OP_REQP = ["skip_checks"]
369

    
370
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
371
                  remote_version, feedback_fn):
372
    """Run multiple tests against a node.
373

374
    Test list:
375
      - compares ganeti version
376
      - checks vg existence and size > 20G
377
      - checks config file checksum
378
      - checks ssh to other nodes
379

380
    Args:
381
      node: name of the node to check
382
      file_list: required list of files
383
      local_cksum: dictionary of local files and their checksums
384

385
    """
386
    # compares ganeti version
387
    local_version = constants.PROTOCOL_VERSION
388
    if not remote_version:
389
      feedback_fn("  - ERROR: connection to %s failed" % (node))
390
      return True
391

    
392
    if local_version != remote_version:
393
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
394
                      (local_version, node, remote_version))
395
      return True
396

    
397
    # checks vg existence and size > 20G
398

    
399
    bad = False
400
    if not vglist:
401
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
402
                      (node,))
403
      bad = True
404
    else:
405
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
406
                                            constants.MIN_VG_SIZE)
407
      if vgstatus:
408
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
409
        bad = True
410

    
411
    # checks config file checksum
412
    # checks ssh to any
413

    
414
    if 'filelist' not in node_result:
415
      bad = True
416
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
417
    else:
418
      remote_cksum = node_result['filelist']
419
      for file_name in file_list:
420
        if file_name not in remote_cksum:
421
          bad = True
422
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
423
        elif remote_cksum[file_name] != local_cksum[file_name]:
424
          bad = True
425
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
426

    
427
    if 'nodelist' not in node_result:
428
      bad = True
429
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
430
    else:
431
      if node_result['nodelist']:
432
        bad = True
433
        for node in node_result['nodelist']:
434
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
435
                          (node, node_result['nodelist'][node]))
436
    if 'node-net-test' not in node_result:
437
      bad = True
438
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
439
    else:
440
      if node_result['node-net-test']:
441
        bad = True
442
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
443
        for node in nlist:
444
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
445
                          (node, node_result['node-net-test'][node]))
446

    
447
    hyp_result = node_result.get('hypervisor', None)
448
    if hyp_result is not None:
449
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
450
    return bad
451

    
452
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
453
                      node_instance, feedback_fn):
454
    """Verify an instance.
455

456
    This function checks to see if the required block devices are
457
    available on the instance's node.
458

459
    """
460
    bad = False
461

    
462
    node_current = instanceconfig.primary_node
463

    
464
    node_vol_should = {}
465
    instanceconfig.MapLVsByNode(node_vol_should)
466

    
467
    for node in node_vol_should:
468
      for volume in node_vol_should[node]:
469
        if node not in node_vol_is or volume not in node_vol_is[node]:
470
          feedback_fn("  - ERROR: volume %s missing on node %s" %
471
                          (volume, node))
472
          bad = True
473

    
474
    if instanceconfig.status != 'down':
475
      if (node_current not in node_instance or
476
          not instance in node_instance[node_current]):
477
        feedback_fn("  - ERROR: instance %s not running on node %s" %
478
                        (instance, node_current))
479
        bad = True
480

    
481
    for node in node_instance:
482
      if node != node_current:
483
        if instance in node_instance[node]:
484
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
485
                          (instance, node))
486
          bad = True
487

    
488
    return bad
489

    
490
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
491
    """Verify if there are any unknown volumes in the cluster.
492

493
    The .os, .swap and backup volumes are ignored. All other volumes are
494
    reported as unknown.
495

496
    """
497
    bad = False
498

    
499
    for node in node_vol_is:
500
      for volume in node_vol_is[node]:
501
        if node not in node_vol_should or volume not in node_vol_should[node]:
502
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
503
                      (volume, node))
504
          bad = True
505
    return bad
506

    
507
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
508
    """Verify the list of running instances.
509

510
    This checks what instances are running but unknown to the cluster.
511

512
    """
513
    bad = False
514
    for node in node_instance:
515
      for runninginstance in node_instance[node]:
516
        if runninginstance not in instancelist:
517
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
518
                          (runninginstance, node))
519
          bad = True
520
    return bad
521

    
522
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
523
    """Verify N+1 Memory Resilience.
524

525
    Check that if one single node dies we can still start all the instances it
526
    was primary for.
527

528
    """
529
    bad = False
530

    
531
    for node, nodeinfo in node_info.iteritems():
532
      # This code checks that every node which is now listed as secondary has
533
      # enough memory to host all instances it is supposed to should a single
534
      # other node in the cluster fail.
535
      # FIXME: not ready for failover to an arbitrary node
536
      # FIXME: does not support file-backed instances
537
      # WARNING: we currently take into account down instances as well as up
538
      # ones, considering that even if they're down someone might want to start
539
      # them even in the event of a node failure.
540
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
541
        needed_mem = 0
542
        for instance in instances:
543
          needed_mem += instance_cfg[instance].memory
544
        if nodeinfo['mfree'] < needed_mem:
545
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
546
                      " failovers should node %s fail" % (node, prinode))
547
          bad = True
548
    return bad
549
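  # Worked example (hypothetical numbers): if this node is secondary for two
  # instances of 512MB and 1024MB that share node A as primary, it needs
  # mfree >= 1536MB to pass the check for a failure of node A; instances with
  # a different primary node are summed separately, per primary.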

    
550
  def CheckPrereq(self):
551
    """Check prerequisites.
552

553
    Transform the list of checks we're going to skip into a set and check that
554
    all its members are valid.
555

556
    """
557
    self.skip_set = frozenset(self.op.skip_checks)
558
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
559
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
560

    
561
  def BuildHooksEnv(self):
562
    """Build hooks env.
563

564
    Cluster-Verify hooks are run only in the post phase; their failure is
    logged in the verify output and makes the verification fail.
566

567
    """
568
    all_nodes = self.cfg.GetNodeList()
569
    # TODO: populate the environment with useful information for verify hooks
570
    env = {}
571
    return env, [], all_nodes
572

    
573
  def Exec(self, feedback_fn):
574
    """Verify integrity of cluster, performing various test on nodes.
575

576
    """
577
    bad = False
578
    feedback_fn("* Verifying global settings")
579
    for msg in self.cfg.VerifyConfig():
580
      feedback_fn("  - ERROR: %s" % msg)
581

    
582
    vg_name = self.cfg.GetVGName()
583
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
584
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
585
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
586
    i_non_redundant = [] # Non redundant instances
587
    node_volume = {}
588
    node_instance = {}
589
    node_info = {}
590
    instance_cfg = {}
591

    
592
    # FIXME: verify OS list
593
    # do local checksums
594
    file_names = list(self.sstore.GetFileList())
595
    file_names.append(constants.SSL_CERT_FILE)
596
    file_names.append(constants.CLUSTER_CONF_FILE)
597
    local_checksums = utils.FingerprintFiles(file_names)
598

    
599
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
600
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
601
    all_instanceinfo = rpc.call_instance_list(nodelist)
602
    all_vglist = rpc.call_vg_list(nodelist)
603
    node_verify_param = {
604
      'filelist': file_names,
605
      'nodelist': nodelist,
606
      'hypervisor': None,
607
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
608
                        for node in nodeinfo]
609
      }
610
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
611
    all_rversion = rpc.call_version(nodelist)
612
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
613

    
614
    for node in nodelist:
615
      feedback_fn("* Verifying node %s" % node)
616
      result = self._VerifyNode(node, file_names, local_checksums,
617
                                all_vglist[node], all_nvinfo[node],
618
                                all_rversion[node], feedback_fn)
619
      bad = bad or result
620

    
621
      # node_volume
622
      volumeinfo = all_volumeinfo[node]
623

    
624
      if isinstance(volumeinfo, basestring):
625
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
626
                    (node, volumeinfo[-400:].encode('string_escape')))
627
        bad = True
628
        node_volume[node] = {}
629
      elif not isinstance(volumeinfo, dict):
630
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
631
        bad = True
632
        continue
633
      else:
634
        node_volume[node] = volumeinfo
635

    
636
      # node_instance
637
      nodeinstance = all_instanceinfo[node]
638
      if type(nodeinstance) != list:
639
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
640
        bad = True
641
        continue
642

    
643
      node_instance[node] = nodeinstance
644

    
645
      # node_info
646
      nodeinfo = all_ninfo[node]
647
      if not isinstance(nodeinfo, dict):
648
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
649
        bad = True
650
        continue
651

    
652
      try:
653
        node_info[node] = {
654
          "mfree": int(nodeinfo['memory_free']),
655
          "dfree": int(nodeinfo['vg_free']),
656
          "pinst": [],
657
          "sinst": [],
658
          # dictionary holding all instances this node is secondary for,
659
          # grouped by their primary node. Each key is a cluster node, and each
660
          # value is a list of instances which have the key as primary and the
661
          # current node as secondary.  this is handy to calculate N+1 memory
662
          # availability if you can only failover from a primary to its
663
          # secondary.
664
          "sinst-by-pnode": {},
665
        }
666
      except ValueError:
667
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
668
        bad = True
669
        continue
670

    
671
    node_vol_should = {}
672

    
673
    for instance in instancelist:
674
      feedback_fn("* Verifying instance %s" % instance)
675
      inst_config = self.cfg.GetInstanceInfo(instance)
676
      result =  self._VerifyInstance(instance, inst_config, node_volume,
677
                                     node_instance, feedback_fn)
678
      bad = bad or result
679

    
680
      inst_config.MapLVsByNode(node_vol_should)
681

    
682
      instance_cfg[instance] = inst_config
683

    
684
      pnode = inst_config.primary_node
685
      if pnode in node_info:
686
        node_info[pnode]['pinst'].append(instance)
687
      else:
688
        feedback_fn("  - ERROR: instance %s, connection to primary node"
689
                    " %s failed" % (instance, pnode))
690
        bad = True
691

    
692
      # If the instance is non-redundant we cannot survive losing its primary
693
      # node, so we are not N+1 compliant. On the other hand we have no disk
694
      # templates with more than one secondary so that situation is not well
695
      # supported either.
696
      # FIXME: does not support file-backed instances
697
      if len(inst_config.secondary_nodes) == 0:
698
        i_non_redundant.append(instance)
699
      elif len(inst_config.secondary_nodes) > 1:
700
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
701
                    % instance)
702

    
703
      for snode in inst_config.secondary_nodes:
704
        if snode in node_info:
705
          node_info[snode]['sinst'].append(instance)
706
          if pnode not in node_info[snode]['sinst-by-pnode']:
707
            node_info[snode]['sinst-by-pnode'][pnode] = []
708
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
709
        else:
710
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
711
                      " %s failed" % (instance, snode))
712

    
713
    feedback_fn("* Verifying orphan volumes")
714
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
715
                                       feedback_fn)
716
    bad = bad or result
717

    
718
    feedback_fn("* Verifying remaining instances")
719
    result = self._VerifyOrphanInstances(instancelist, node_instance,
720
                                         feedback_fn)
721
    bad = bad or result
722

    
723
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
724
      feedback_fn("* Verifying N+1 Memory redundancy")
725
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
726
      bad = bad or result
727

    
728
    feedback_fn("* Other Notes")
729
    if i_non_redundant:
730
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
731
                  % len(i_non_redundant))
732

    
733
    return int(bad)
734

    
735
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
736
    """Analize the post-hooks' result, handle it, and send some
737
    nicely-formatted feedback back to the user.
738

739
    Args:
740
      phase: the hooks phase that has just been run
741
      hooks_results: the results of the multi-node hooks rpc call
742
      feedback_fn: function to send feedback back to the caller
743
      lu_result: previous Exec result
744

745
    """
746
    # We only really run POST phase hooks, and are only interested in their results
747
    if phase == constants.HOOKS_PHASE_POST:
748
      # Used to change hooks' output to proper indentation
749
      indent_re = re.compile('^', re.M)
750
      feedback_fn("* Hooks Results")
751
      if not hooks_results:
752
        feedback_fn("  - ERROR: general communication failure")
753
        lu_result = 1
754
      else:
755
        for node_name in hooks_results:
756
          show_node_header = True
757
          res = hooks_results[node_name]
758
          if res is False or not isinstance(res, list):
759
            feedback_fn("    Communication failure")
760
            lu_result = 1
761
            continue
762
          for script, hkr, output in res:
763
            if hkr == constants.HKR_FAIL:
764
              # The node header is only shown once, if there are
765
              # failing hooks on that node
766
              if show_node_header:
767
                feedback_fn("  Node %s:" % node_name)
768
                show_node_header = False
769
              feedback_fn("    ERROR: Script %s failed, output:" % script)
770
              output = indent_re.sub('      ', output)
771
              feedback_fn("%s" % output)
772
              lu_result = 1
773

    
774
      return lu_result
775

    
776

    
777
class LUVerifyDisks(NoHooksLU):
778
  """Verifies the cluster disks status.
779

780
  """
781
  _OP_REQP = []
782

    
783
  def CheckPrereq(self):
784
    """Check prerequisites.
785

786
    This has no prerequisites.
787

788
    """
789
    pass
790

    
791
  def Exec(self, feedback_fn):
792
    """Verify integrity of cluster disks.
793

794
    """
795
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
796

    
797
    vg_name = self.cfg.GetVGName()
798
    nodes = utils.NiceSort(self.cfg.GetNodeList())
799
    instances = [self.cfg.GetInstanceInfo(name)
800
                 for name in self.cfg.GetInstanceList()]
801

    
802
    nv_dict = {}
803
    for inst in instances:
804
      inst_lvs = {}
805
      if (inst.status != "up" or
806
          inst.disk_template not in constants.DTS_NET_MIRROR):
807
        continue
808
      inst.MapLVsByNode(inst_lvs)
809
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
810
      for node, vol_list in inst_lvs.iteritems():
811
        for vol in vol_list:
812
          nv_dict[(node, vol)] = inst
813

    
814
    if not nv_dict:
815
      return result
816

    
817
    node_lvs = rpc.call_volume_list(nodes, vg_name)
818

    
819
    to_act = set()
820
    for node in nodes:
821
      # node_volume
822
      lvs = node_lvs[node]
823

    
824
      if isinstance(lvs, basestring):
825
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
826
        res_nlvm[node] = lvs
827
      elif not isinstance(lvs, dict):
828
        logger.Info("connection to node %s failed or invalid data returned" %
829
                    (node,))
830
        res_nodes.append(node)
831
        continue
832

    
833
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
834
        inst = nv_dict.pop((node, lv_name), None)
835
        if (not lv_online and inst is not None
836
            and inst.name not in res_instances):
837
          res_instances.append(inst.name)
838

    
839
    # any leftover items in nv_dict are missing LVs, let's arrange the
840
    # data better
841
    for key, inst in nv_dict.iteritems():
842
      if inst.name not in res_missing:
843
        res_missing[inst.name] = []
844
      res_missing[inst.name].append(key)
845

    
846
    return result
847

    
848

    
849
class LURenameCluster(LogicalUnit):
850
  """Rename the cluster.
851

852
  """
853
  HPATH = "cluster-rename"
854
  HTYPE = constants.HTYPE_CLUSTER
855
  _OP_REQP = ["name"]
856
  REQ_WSSTORE = True
857

    
858
  def BuildHooksEnv(self):
859
    """Build hooks env.
860

861
    """
862
    env = {
863
      "OP_TARGET": self.sstore.GetClusterName(),
864
      "NEW_NAME": self.op.name,
865
      }
866
    mn = self.sstore.GetMasterNode()
867
    return env, [mn], [mn]
868

    
869
  def CheckPrereq(self):
870
    """Verify that the passed name is a valid one.
871

872
    """
873
    hostname = utils.HostInfo(self.op.name)
874

    
875
    new_name = hostname.name
876
    self.ip = new_ip = hostname.ip
877
    old_name = self.sstore.GetClusterName()
878
    old_ip = self.sstore.GetMasterIP()
879
    if new_name == old_name and new_ip == old_ip:
880
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
881
                                 " cluster has changed")
882
    if new_ip != old_ip:
883
      result = utils.RunCmd(["fping", "-q", new_ip])
884
      if not result.failed:
885
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
886
                                   " reachable on the network. Aborting." %
887
                                   new_ip)
888

    
889
    self.op.name = new_name
890

    
891
  def Exec(self, feedback_fn):
892
    """Rename the cluster.
893

894
    """
895
    clustername = self.op.name
896
    ip = self.ip
897
    ss = self.sstore
898

    
899
    # shutdown the master IP
900
    master = ss.GetMasterNode()
901
    if not rpc.call_node_stop_master(master):
902
      raise errors.OpExecError("Could not disable the master role")
903

    
904
    try:
905
      # modify the sstore
906
      ss.SetKey(ss.SS_MASTER_IP, ip)
907
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
908

    
909
      # Distribute updated ss config to all nodes
910
      myself = self.cfg.GetNodeInfo(master)
911
      dist_nodes = self.cfg.GetNodeList()
912
      if myself.name in dist_nodes:
913
        dist_nodes.remove(myself.name)
914

    
915
      logger.Debug("Copying updated ssconf data to all nodes")
916
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
917
        fname = ss.KeyToFilename(keyname)
918
        result = rpc.call_upload_file(dist_nodes, fname)
919
        for to_node in dist_nodes:
920
          if not result[to_node]:
921
            logger.Error("copy of file %s to node %s failed" %
922
                         (fname, to_node))
923
    finally:
924
      if not rpc.call_node_start_master(master):
925
        logger.Error("Could not re-enable the master role on the master,"
926
                     " please restart manually.")
927

    
928

    
929
def _RecursiveCheckIfLVMBased(disk):
930
  """Check if the given disk or its children are lvm-based.
931

932
  Args:
933
    disk: ganeti.objects.Disk object
934

935
  Returns:
936
    boolean indicating whether a LD_LV dev_type was found or not
937

938
  """
939
  if disk.children:
940
    for chdisk in disk.children:
941
      if _RecursiveCheckIfLVMBased(chdisk):
942
        return True
943
  return disk.dev_type == constants.LD_LV
944
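# Illustrative sketch (hypothetical objects, not a real configuration): a
# mirrored disk whose children are logical volumes is reported as lvm-based,
# because the recursion reaches an LD_LV leaf:
#   lv = objects.Disk(dev_type=constants.LD_LV)
#   mirror = objects.Disk(dev_type="mirror", children=[lv, lv])  # dev_type value is illustrative
#   _RecursiveCheckIfLVMBased(mirror)  -> True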

    
945

    
946
class LUSetClusterParams(LogicalUnit):
947
  """Change the parameters of the cluster.
948

949
  """
950
  HPATH = "cluster-modify"
951
  HTYPE = constants.HTYPE_CLUSTER
952
  _OP_REQP = []
953

    
954
  def BuildHooksEnv(self):
955
    """Build hooks env.
956

957
    """
958
    env = {
959
      "OP_TARGET": self.sstore.GetClusterName(),
960
      "NEW_VG_NAME": self.op.vg_name,
961
      }
962
    mn = self.sstore.GetMasterNode()
963
    return env, [mn], [mn]
964

    
965
  def CheckPrereq(self):
966
    """Check prerequisites.
967

968
    This checks whether the given params don't conflict and
969
    if the given volume group is valid.
970

971
    """
972
    if not self.op.vg_name:
973
      instances = [self.cfg.GetInstanceInfo(name)
974
                   for name in self.cfg.GetInstanceList()]
975
      for inst in instances:
976
        for disk in inst.disks:
977
          if _RecursiveCheckIfLVMBased(disk):
978
            raise errors.OpPrereqError("Cannot disable lvm storage while"
979
                                       " lvm-based instances exist")
980

    
981
    # if vg_name not None, checks given volume group on all nodes
982
    if self.op.vg_name:
983
      node_list = self.cfg.GetNodeList()
984
      vglist = rpc.call_vg_list(node_list)
985
      for node in node_list:
986
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
987
                                              constants.MIN_VG_SIZE)
988
        if vgstatus:
989
          raise errors.OpPrereqError("Error on node '%s': %s" %
990
                                     (node, vgstatus))
991

    
992
  def Exec(self, feedback_fn):
993
    """Change the parameters of the cluster.
994

995
    """
996
    if self.op.vg_name != self.cfg.GetVGName():
997
      self.cfg.SetVGName(self.op.vg_name)
998
    else:
999
      feedback_fn("Cluster LVM configuration already in desired"
1000
                  " state, not changing")
1001

    
1002

    
1003
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1004
  """Sleep and poll for an instance's disk to sync.
1005

1006
  """
1007
  if not instance.disks:
1008
    return True
1009

    
1010
  if not oneshot:
1011
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1012

    
1013
  node = instance.primary_node
1014

    
1015
  for dev in instance.disks:
1016
    cfgw.SetDiskID(dev, node)
1017

    
1018
  retries = 0
1019
  while True:
1020
    max_time = 0
1021
    done = True
1022
    cumul_degraded = False
1023
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1024
    if not rstats:
1025
      proc.LogWarning("Can't get any data from node %s" % node)
1026
      retries += 1
1027
      if retries >= 10:
1028
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1029
                                 " aborting." % node)
1030
      time.sleep(6)
1031
      continue
1032
    retries = 0
1033
    for i in range(len(rstats)):
1034
      mstat = rstats[i]
1035
      if mstat is None:
1036
        proc.LogWarning("Can't compute data for node %s/%s" %
1037
                        (node, instance.disks[i].iv_name))
1038
        continue
1039
      # we ignore the ldisk parameter
1040
      perc_done, est_time, is_degraded, _ = mstat
1041
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1042
      if perc_done is not None:
1043
        done = False
1044
        if est_time is not None:
1045
          rem_time = "%d estimated seconds remaining" % est_time
1046
          max_time = est_time
1047
        else:
1048
          rem_time = "no time estimate"
1049
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1050
                     (instance.disks[i].iv_name, perc_done, rem_time))
1051
    if done or oneshot:
1052
      break
1053

    
1054
    if unlock:
1055
      #utils.Unlock('cmd')
1056
      pass
1057
    try:
1058
      time.sleep(min(60, max_time))
1059
    finally:
1060
      if unlock:
1061
        #utils.Lock('cmd')
1062
        pass
1063

    
1064
  if done:
1065
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1066
  return not cumul_degraded
1067
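# Note on the loop above: each entry returned by
# rpc.call_blockdev_getmirrorstatus is unpacked as a 4-tuple
# (sync percentage, estimated time, is_degraded, ldisk); a sync percentage of
# None is treated as "already in sync", which is why "perc_done is not None"
# keeps the wait loop running.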

    
1068

    
1069
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1070
  """Check that mirrors are not degraded.
1071

1072
  The ldisk parameter, if True, will change the test from the
1073
  is_degraded attribute (which represents overall non-ok status for
1074
  the device(s)) to the ldisk (representing the local storage status).
1075

1076
  """
1077
  cfgw.SetDiskID(dev, node)
1078
  if ldisk:
1079
    idx = 6
1080
  else:
1081
    idx = 5
1082

    
1083
  result = True
1084
  if on_primary or dev.AssembleOnSecondary():
1085
    rstats = rpc.call_blockdev_find(node, dev)
1086
    if not rstats:
1087
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1088
      result = False
1089
    else:
1090
      result = result and (not rstats[idx])
1091
  if dev.children:
1092
    for child in dev.children:
1093
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1094

    
1095
  return result
1096
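# Example (sketch): for a mirrored disk on its primary node,
#   _CheckDiskConsistency(cfg, disk, pnode, True)              # overall is_degraded flag (index 5)
#   _CheckDiskConsistency(cfg, disk, pnode, True, ldisk=True)  # local-storage flag (index 6)
# each returns True only if rpc.call_blockdev_find reports the corresponding
# flag as clear on the node; children are checked recursively as well.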

    
1097

    
1098
class LUDiagnoseOS(NoHooksLU):
1099
  """Logical unit for OS diagnose/query.
1100

1101
  """
1102
  _OP_REQP = ["output_fields", "names"]
1103

    
1104
  def CheckPrereq(self):
1105
    """Check prerequisites.
1106

1107
    This always succeeds, since this is a pure query LU.
1108

1109
    """
1110
    if self.op.names:
1111
      raise errors.OpPrereqError("Selective OS query not supported")
1112

    
1113
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1114
    _CheckOutputFields(static=[],
1115
                       dynamic=self.dynamic_fields,
1116
                       selected=self.op.output_fields)
1117

    
1118
  @staticmethod
1119
  def _DiagnoseByOS(node_list, rlist):
1120
    """Remaps a per-node return list into an a per-os per-node dictionary
1121

1122
      Args:
1123
        node_list: a list with the names of all nodes
1124
        rlist: a map with node names as keys and OS objects as values
1125

1126
      Returns:
1127
        map: a map with osnames as keys and as value another map, with
             nodes as keys and lists of OS objects as values
1130
             e.g. {"debian-etch": {"node1": [<object>,...],
1131
                                   "node2": [<object>,]}
1132
                  }
1133

1134
    """
1135
    all_os = {}
1136
    for node_name, nr in rlist.iteritems():
1137
      if not nr:
1138
        continue
1139
      for os_obj in nr:
1140
        if os_obj.name not in all_os:
1141
          # build a list of nodes for this os containing empty lists
1142
          # for each node in node_list
1143
          all_os[os_obj.name] = {}
1144
          for nname in node_list:
1145
            all_os[os_obj.name][nname] = []
1146
        all_os[os_obj.name][node_name].append(os_obj)
1147
    return all_os
1148

    
1149
  def Exec(self, feedback_fn):
1150
    """Compute the list of OSes.
1151

1152
    """
1153
    node_list = self.cfg.GetNodeList()
1154
    node_data = rpc.call_os_diagnose(node_list)
1155
    if node_data == False:
1156
      raise errors.OpExecError("Can't gather the list of OSes")
1157
    pol = self._DiagnoseByOS(node_list, node_data)
1158
    output = []
1159
    for os_name, os_data in pol.iteritems():
1160
      row = []
1161
      for field in self.op.output_fields:
1162
        if field == "name":
1163
          val = os_name
1164
        elif field == "valid":
1165
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1166
        elif field == "node_status":
1167
          val = {}
1168
          for node_name, nos_list in os_data.iteritems():
1169
            val[node_name] = [(v.status, v.path) for v in nos_list]
1170
        else:
1171
          raise errors.ParameterError(field)
1172
        row.append(val)
1173
      output.append(row)
1174

    
1175
    return output
1176

    
1177

    
1178
class LURemoveNode(LogicalUnit):
1179
  """Logical unit for removing a node.
1180

1181
  """
1182
  HPATH = "node-remove"
1183
  HTYPE = constants.HTYPE_NODE
1184
  _OP_REQP = ["node_name"]
1185

    
1186
  def BuildHooksEnv(self):
1187
    """Build hooks env.
1188

1189
    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.
1191

1192
    """
1193
    env = {
1194
      "OP_TARGET": self.op.node_name,
1195
      "NODE_NAME": self.op.node_name,
1196
      }
1197
    all_nodes = self.cfg.GetNodeList()
1198
    all_nodes.remove(self.op.node_name)
1199
    return env, all_nodes, all_nodes
1200

    
1201
  def CheckPrereq(self):
1202
    """Check prerequisites.
1203

1204
    This checks:
1205
     - the node exists in the configuration
1206
     - it does not have primary or secondary instances
1207
     - it's not the master
1208

1209
    Any errors are signalled by raising errors.OpPrereqError.
1210

1211
    """
1212
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1213
    if node is None:
1214
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1215

    
1216
    instance_list = self.cfg.GetInstanceList()
1217

    
1218
    masternode = self.sstore.GetMasterNode()
1219
    if node.name == masternode:
1220
      raise errors.OpPrereqError("Node is the master node,"
1221
                                 " you need to failover first.")
1222

    
1223
    for instance_name in instance_list:
1224
      instance = self.cfg.GetInstanceInfo(instance_name)
1225
      if node.name == instance.primary_node:
1226
        raise errors.OpPrereqError("Instance %s still running on the node,"
1227
                                   " please remove first." % instance_name)
1228
      if node.name in instance.secondary_nodes:
1229
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1230
                                   " please remove first." % instance_name)
1231
    self.op.node_name = node.name
1232
    self.node = node
1233

    
1234
  def Exec(self, feedback_fn):
1235
    """Removes the node from the cluster.
1236

1237
    """
1238
    node = self.node
1239
    logger.Info("stopping the node daemon and removing configs from node %s" %
1240
                node.name)
1241

    
1242
    rpc.call_node_leave_cluster(node.name)
1243

    
1244
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1245

    
1246
    logger.Info("Removing node %s from config" % node.name)
1247

    
1248
    self.cfg.RemoveNode(node.name)
1249

    
1250
    utils.RemoveHostFromEtcHosts(node.name)
1251

    
1252

    
1253
class LUQueryNodes(NoHooksLU):
1254
  """Logical unit for querying nodes.
1255

1256
  """
1257
  _OP_REQP = ["output_fields", "names"]
1258

    
1259
  def CheckPrereq(self):
1260
    """Check prerequisites.
1261

1262
    This checks that the fields required are valid output fields.
1263

1264
    """
1265
    self.dynamic_fields = frozenset([
1266
      "dtotal", "dfree",
1267
      "mtotal", "mnode", "mfree",
1268
      "bootid",
1269
      "ctotal",
1270
      ])
1271

    
1272
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1273
                               "pinst_list", "sinst_list",
1274
                               "pip", "sip", "tags"],
1275
                       dynamic=self.dynamic_fields,
1276
                       selected=self.op.output_fields)
1277

    
1278
    self.wanted = _GetWantedNodes(self, self.op.names)
1279

    
1280
  def Exec(self, feedback_fn):
1281
    """Computes the list of nodes and their attributes.
1282

1283
    """
1284
    nodenames = self.wanted
1285
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1286

    
1287
    # begin data gathering
1288

    
1289
    if self.dynamic_fields.intersection(self.op.output_fields):
1290
      live_data = {}
1291
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1292
      for name in nodenames:
1293
        nodeinfo = node_data.get(name, None)
1294
        if nodeinfo:
1295
          live_data[name] = {
1296
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1297
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1298
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1299
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1300
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1301
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1302
            "bootid": nodeinfo['bootid'],
1303
            }
1304
        else:
1305
          live_data[name] = {}
1306
    else:
1307
      live_data = dict.fromkeys(nodenames, {})
1308

    
1309
    node_to_primary = dict([(name, set()) for name in nodenames])
1310
    node_to_secondary = dict([(name, set()) for name in nodenames])
1311

    
1312
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1313
                             "sinst_cnt", "sinst_list"))
1314
    if inst_fields & frozenset(self.op.output_fields):
1315
      instancelist = self.cfg.GetInstanceList()
1316

    
1317
      for instance_name in instancelist:
1318
        inst = self.cfg.GetInstanceInfo(instance_name)
1319
        if inst.primary_node in node_to_primary:
1320
          node_to_primary[inst.primary_node].add(inst.name)
1321
        for secnode in inst.secondary_nodes:
1322
          if secnode in node_to_secondary:
1323
            node_to_secondary[secnode].add(inst.name)
1324

    
1325
    # end data gathering
1326

    
1327
    output = []
1328
    for node in nodelist:
1329
      node_output = []
1330
      for field in self.op.output_fields:
1331
        if field == "name":
1332
          val = node.name
1333
        elif field == "pinst_list":
1334
          val = list(node_to_primary[node.name])
1335
        elif field == "sinst_list":
1336
          val = list(node_to_secondary[node.name])
1337
        elif field == "pinst_cnt":
1338
          val = len(node_to_primary[node.name])
1339
        elif field == "sinst_cnt":
1340
          val = len(node_to_secondary[node.name])
1341
        elif field == "pip":
1342
          val = node.primary_ip
1343
        elif field == "sip":
1344
          val = node.secondary_ip
1345
        elif field == "tags":
1346
          val = list(node.GetTags())
1347
        elif field in self.dynamic_fields:
1348
          val = live_data[node.name].get(field, None)
1349
        else:
1350
          raise errors.ParameterError(field)
1351
        node_output.append(val)
1352
      output.append(node_output)
1353

    
1354
    return output
1355

    
1356

    
1357
class LUQueryNodeVolumes(NoHooksLU):
1358
  """Logical unit for getting volumes on node(s).
1359

1360
  """
1361
  _OP_REQP = ["nodes", "output_fields"]
1362

    
1363
  def CheckPrereq(self):
1364
    """Check prerequisites.
1365

1366
    This checks that the fields required are valid output fields.
1367

1368
    """
1369
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1370

    
1371
    _CheckOutputFields(static=["node"],
1372
                       dynamic=["phys", "vg", "name", "size", "instance"],
1373
                       selected=self.op.output_fields)
1374

    
1375

    
1376
  def Exec(self, feedback_fn):
1377
    """Computes the list of nodes and their attributes.
1378

1379
    """
1380
    nodenames = self.nodes
1381
    volumes = rpc.call_node_volumes(nodenames)
1382

    
1383
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1384
             in self.cfg.GetInstanceList()]
1385

    
1386
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1387

    
1388
    output = []
1389
    for node in nodenames:
1390
      if node not in volumes or not volumes[node]:
1391
        continue
1392

    
1393
      node_vols = volumes[node][:]
1394
      node_vols.sort(key=lambda vol: vol['dev'])
1395

    
1396
      for vol in node_vols:
1397
        node_output = []
1398
        for field in self.op.output_fields:
1399
          if field == "node":
1400
            val = node
1401
          elif field == "phys":
1402
            val = vol['dev']
1403
          elif field == "vg":
1404
            val = vol['vg']
1405
          elif field == "name":
1406
            val = vol['name']
1407
          elif field == "size":
1408
            val = int(float(vol['size']))
1409
          elif field == "instance":
1410
            for inst in ilist:
1411
              if node not in lv_by_node[inst]:
1412
                continue
1413
              if vol['name'] in lv_by_node[inst][node]:
1414
                val = inst.name
1415
                break
1416
            else:
1417
              val = '-'
1418
          else:
1419
            raise errors.ParameterError(field)
1420
          node_output.append(str(val))
1421

    
1422
        output.append(node_output)
1423

    
1424
    return output
1425

    
1426

    
1427
class LUAddNode(LogicalUnit):
1428
  """Logical unit for adding node to the cluster.
1429

1430
  """
1431
  HPATH = "node-add"
1432
  HTYPE = constants.HTYPE_NODE
1433
  _OP_REQP = ["node_name"]
1434

    
1435
  def BuildHooksEnv(self):
1436
    """Build hooks env.
1437

1438
    This will run on all nodes before, and on all nodes + the new node after.
1439

1440
    """
1441
    env = {
1442
      "OP_TARGET": self.op.node_name,
1443
      "NODE_NAME": self.op.node_name,
1444
      "NODE_PIP": self.op.primary_ip,
1445
      "NODE_SIP": self.op.secondary_ip,
1446
      }
1447
    nodes_0 = self.cfg.GetNodeList()
1448
    nodes_1 = nodes_0 + [self.op.node_name, ]
1449
    return env, nodes_0, nodes_1
1450

    
1451
  def CheckPrereq(self):
1452
    """Check prerequisites.
1453

1454
    This checks:
1455
     - the new node is not already in the config
1456
     - it is resolvable
1457
     - its parameters (single/dual homed) match the cluster
1458

1459
    Any errors are signalled by raising errors.OpPrereqError.
1460

1461
    """
1462
    node_name = self.op.node_name
1463
    cfg = self.cfg
1464

    
1465
    dns_data = utils.HostInfo(node_name)
1466

    
1467
    node = dns_data.name
1468
    primary_ip = self.op.primary_ip = dns_data.ip
1469
    secondary_ip = getattr(self.op, "secondary_ip", None)
1470
    if secondary_ip is None:
1471
      secondary_ip = primary_ip
1472
    if not utils.IsValidIP(secondary_ip):
1473
      raise errors.OpPrereqError("Invalid secondary IP given")
1474
    self.op.secondary_ip = secondary_ip
1475

    
1476
    node_list = cfg.GetNodeList()
1477
    if not self.op.readd and node in node_list:
1478
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1479
                                 node)
1480
    elif self.op.readd and node not in node_list:
1481
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1482

    
1483
    for existing_node_name in node_list:
1484
      existing_node = cfg.GetNodeInfo(existing_node_name)
1485

    
1486
      if self.op.readd and node == existing_node_name:
1487
        if (existing_node.primary_ip != primary_ip or
1488
            existing_node.secondary_ip != secondary_ip):
1489
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1490
                                     " address configuration as before")
1491
        continue
1492

    
1493
      if (existing_node.primary_ip == primary_ip or
1494
          existing_node.secondary_ip == primary_ip or
1495
          existing_node.primary_ip == secondary_ip or
1496
          existing_node.secondary_ip == secondary_ip):
1497
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1498
                                   " existing node %s" % existing_node.name)
1499

    
1500
    # check that the type of the node (single versus dual homed) is the
1501
    # same as for the master
1502
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1503
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1504
    newbie_singlehomed = secondary_ip == primary_ip
1505
    if master_singlehomed != newbie_singlehomed:
1506
      if master_singlehomed:
1507
        raise errors.OpPrereqError("The master has no private ip but the"
1508
                                   " new node has one")
1509
      else:
1510
        raise errors.OpPrereqError("The master has a private ip but the"
1511
                                   " new node doesn't have one")
1512

    
1513
    # checks reachability
1514
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1515
      raise errors.OpPrereqError("Node not reachable by ping")
1516

    
1517
    if not newbie_singlehomed:
1518
      # check reachability from my secondary ip to newbie's secondary ip
1519
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1520
                           source=myself.secondary_ip):
1521
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1522
                                   " based ping to noded port")
1523

    
1524
    self.new_node = objects.Node(name=node,
1525
                                 primary_ip=primary_ip,
1526
                                 secondary_ip=secondary_ip)
1527

    
1528
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1529
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1530
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1531
                                   constants.VNC_PASSWORD_FILE)
1532

    
1533
  def Exec(self, feedback_fn):
1534
    """Adds the new node to the cluster.
1535

1536
    """
1537
    new_node = self.new_node
1538
    node = new_node.name
1539

    
1540
    # set up inter-node password and certificate and restart the node daemon
1541
    gntpass = self.sstore.GetNodeDaemonPassword()
1542
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1543
      raise errors.OpExecError("ganeti password corruption detected")
1544
    f = open(constants.SSL_CERT_FILE)
1545
    try:
1546
      gntpem = f.read(8192)
1547
    finally:
1548
      f.close()
1549
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copying cluster pass to %s and starting the node daemon" %
                node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))
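    # For illustration only, the assembled remote command has roughly
    # this shape (placeholders stand for the constants and sanitized
    # values filled in above):
    #   umask 077 &&
    #   echo '<password>' > '<noded password file>' &&
    #   cat > '<ssl cert file>' << '!EOF.' &&
    #   <PEM data>!EOF.
    #   <node init script> restart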
1570

    
1571
    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1572
    if result.failed:
1573
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1574
                               " output: %s" %
1575
                               (node, result.fail_reason, result.output))
1576

    
1577
    # check connectivity
1578
    time.sleep(4)
1579

    
1580
    result = rpc.call_version([node])[node]
1581
    if result:
1582
      if constants.PROTOCOL_VERSION == result:
1583
        logger.Info("communication to node %s fine, sw version %s match" %
1584
                    (node, result))
1585
      else:
1586
        raise errors.OpExecError("Version mismatch master version %s,"
1587
                                 " node version %s" %
1588
                                 (constants.PROTOCOL_VERSION, result))
1589
    else:
1590
      raise errors.OpExecError("Cannot get version from the new node")
1591

    
1592
    # setup ssh on node
1593
    logger.Info("copy ssh key to node %s" % node)
1594
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1595
    keyarray = []
1596
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1597
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1598
                priv_key, pub_key]
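    # the cluster's SSH host keys (DSA and RSA) plus the ganeti run-as
    # user's key pair are read here and pushed to the new node via the
    # node_add RPC below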
1599

    
1600
    for i in keyfiles:
1601
      f = open(i, 'r')
1602
      try:
1603
        keyarray.append(f.read())
1604
      finally:
1605
        f.close()
1606

    
1607
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1608
                               keyarray[3], keyarray[4], keyarray[5])
1609

    
1610
    if not result:
1611
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1612

    
1613
    # Add node to our /etc/hosts, and add key to known_hosts
1614
    utils.AddHostToEtcHosts(new_node.name)
1615

    
1616
    if new_node.secondary_ip != new_node.primary_ip:
1617
      if not rpc.call_node_tcp_ping(new_node.name,
1618
                                    constants.LOCALHOST_IP_ADDRESS,
1619
                                    new_node.secondary_ip,
1620
                                    constants.DEFAULT_NODED_PORT,
1621
                                    10, False):
1622
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1623
                                 " you gave (%s). Please fix and re-run this"
1624
                                 " command." % new_node.secondary_ip)
1625

    
1626
    success, msg = self.ssh.VerifyNodeHostname(node)
1627
    if not success:
1628
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1629
                               " than the one the resolver gives: %s."
1630
                               " Please fix and re-run this command." %
1631
                               (node, msg))
1632

    
1633
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1634
    # including the node just added
1635
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1636
    dist_nodes = self.cfg.GetNodeList()
1637
    if not self.op.readd:
1638
      dist_nodes.append(node)
1639
    if myself.name in dist_nodes:
1640
      dist_nodes.remove(myself.name)
1641

    
1642
    logger.Debug("Copying hosts and known_hosts to all nodes")
1643
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1644
      result = rpc.call_upload_file(dist_nodes, fname)
1645
      for to_node in dist_nodes:
1646
        if not result[to_node]:
1647
          logger.Error("copy of file %s to node %s failed" %
1648
                       (fname, to_node))
1649

    
1650
    to_copy = ss.GetFileList()
1651
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1652
      to_copy.append(constants.VNC_PASSWORD_FILE)
1653
    for fname in to_copy:
1654
      if not self.ssh.CopyFileToNode(node, fname):
1655
        logger.Error("could not copy file %s to node %s" % (fname, node))
1656

    
1657
    if not self.op.readd:
1658
      logger.Info("adding node %s to cluster.conf" % node)
1659
      self.cfg.AddNode(new_node)
1660

    
1661

    
1662
class LUMasterFailover(LogicalUnit):
1663
  """Failover the master node to the current node.
1664

1665
  This is a special LU in that it must run on a non-master node.
1666

1667
  """
1668
  HPATH = "master-failover"
1669
  HTYPE = constants.HTYPE_CLUSTER
1670
  REQ_MASTER = False
1671
  REQ_WSSTORE = True
1672
  _OP_REQP = []
1673

    
1674
  def BuildHooksEnv(self):
1675
    """Build hooks env.
1676

1677
    This will run on the new master only in the pre phase, and on all
1678
    the nodes in the post phase.
1679

1680
    """
1681
    env = {
1682
      "OP_TARGET": self.new_master,
1683
      "NEW_MASTER": self.new_master,
1684
      "OLD_MASTER": self.old_master,
1685
      }
1686
    return env, [self.new_master], self.cfg.GetNodeList()
1687

    
1688
  def CheckPrereq(self):
1689
    """Check prerequisites.
1690

1691
    This checks that we are not already the master.
1692

1693
    """
1694
    self.new_master = utils.HostInfo().name
1695
    self.old_master = self.sstore.GetMasterNode()
1696

    
1697
    if self.old_master == self.new_master:
1698
      raise errors.OpPrereqError("This commands must be run on the node"
1699
                                 " where you want the new master to be."
1700
                                 " %s is already the master" %
1701
                                 self.old_master)
1702

    
1703
  def Exec(self, feedback_fn):
1704
    """Failover the master node.
1705

1706
    This command, when run on a non-master node, will cause the current
1707
    master to cease being master, and the non-master to become the new
    master.
1709

1710
    """
1711
    #TODO: do not rely on gethostname returning the FQDN
1712
    logger.Info("setting master to %s, old master: %s" %
1713
                (self.new_master, self.old_master))
1714

    
1715
    if not rpc.call_node_stop_master(self.old_master):
1716
      logger.Error("could disable the master role on the old master"
1717
                   " %s, please disable manually" % self.old_master)
1718

    
1719
    ss = self.sstore
1720
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1721
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1722
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1723
      logger.Error("could not distribute the new simple store master file"
1724
                   " to the other nodes, please check.")
1725

    
1726
    if not rpc.call_node_start_master(self.new_master):
1727
      logger.Error("could not start the master role on the new master"
1728
                   " %s, please check" % self.new_master)
1729
      feedback_fn("Error in activating the master IP on the new master,"
1730
                  " please fix manually.")
1731

    
1732

    
1733

    
1734
class LUQueryClusterInfo(NoHooksLU):
1735
  """Query cluster configuration.
1736

1737
  """
1738
  _OP_REQP = []
1739
  REQ_MASTER = False
1740

    
1741
  def CheckPrereq(self):
1742
    """No prerequsites needed for this LU.
1743

1744
    """
1745
    pass
1746

    
1747
  def Exec(self, feedback_fn):
1748
    """Return cluster config.
1749

1750
    """
1751
    result = {
1752
      "name": self.sstore.GetClusterName(),
1753
      "software_version": constants.RELEASE_VERSION,
1754
      "protocol_version": constants.PROTOCOL_VERSION,
1755
      "config_version": constants.CONFIG_VERSION,
1756
      "os_api_version": constants.OS_API_VERSION,
1757
      "export_version": constants.EXPORT_VERSION,
1758
      "master": self.sstore.GetMasterNode(),
1759
      "architecture": (platform.architecture()[0], platform.machine()),
1760
      "hypervisor_type": self.sstore.GetHypervisorType(),
1761
      }
1762

    
1763
    return result
1764

    
1765

    
1766
class LUDumpClusterConfig(NoHooksLU):
1767
  """Return a text-representation of the cluster-config.
1768

1769
  """
1770
  _OP_REQP = []
1771

    
1772
  def CheckPrereq(self):
1773
    """No prerequisites.
1774

1775
    """
1776
    pass
1777

    
1778
  def Exec(self, feedback_fn):
1779
    """Dump a representation of the cluster config to the standard output.
1780

1781
    """
1782
    return self.cfg.DumpConfig()
1783

    
1784

    
1785
class LUActivateInstanceDisks(NoHooksLU):
1786
  """Bring up an instance's disks.
1787

1788
  """
1789
  _OP_REQP = ["instance_name"]
1790

    
1791
  def CheckPrereq(self):
1792
    """Check prerequisites.
1793

1794
    This checks that the instance is in the cluster.
1795

1796
    """
1797
    instance = self.cfg.GetInstanceInfo(
1798
      self.cfg.ExpandInstanceName(self.op.instance_name))
1799
    if instance is None:
1800
      raise errors.OpPrereqError("Instance '%s' not known" %
1801
                                 self.op.instance_name)
1802
    self.instance = instance
1803

    
1804

    
1805
  def Exec(self, feedback_fn):
1806
    """Activate the disks.
1807

1808
    """
1809
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1810
    if not disks_ok:
1811
      raise errors.OpExecError("Cannot activate block devices")
1812

    
1813
    return disks_info
1814

    
1815

    
1816
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1817
  """Prepare the block devices for an instance.
1818

1819
  This sets up the block devices on all nodes.
1820

1821
  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded, with the mapping from node devices to instance devices
  """
1831
  device_info = []
1832
  disks_ok = True
1833
  iname = instance.name
1834
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before the handshake occurred, but we do not eliminate it
1837

    
1838
  # The proper fix would be to wait (with some limits) until the
1839
  # connection has been made and drbd transitions from WFConnection
1840
  # into any other network-connected state (Connected, SyncTarget,
1841
  # SyncSource, etc.)
1842

    
1843
  # 1st pass, assemble on all nodes in secondary mode
1844
  for inst_disk in instance.disks:
1845
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1846
      cfg.SetDiskID(node_disk, node)
1847
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1848
      if not result:
1849
        logger.Error("could not prepare block device %s on node %s"
1850
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1851
        if not ignore_secondaries:
1852
          disks_ok = False
1853

    
1854
  # FIXME: race condition on drbd migration to primary
1855

    
1856
  # 2nd pass, do only the primary node
1857
  for inst_disk in instance.disks:
1858
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1859
      if node != instance.primary_node:
1860
        continue
1861
      cfg.SetDiskID(node_disk, node)
1862
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1863
      if not result:
1864
        logger.Error("could not prepare block device %s on node %s"
1865
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1866
        disks_ok = False
1867
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
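    # each entry maps the instance-visible device name to the assembly
    # result reported by the primary node:
    # (primary_node, iv_name, result_of_blockdev_assemble)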
1868

    
1869
  # leave the disks configured for the primary node
1870
  # this is a workaround that would be fixed better by
1871
  # improving the logical/physical id handling
1872
  for disk in instance.disks:
1873
    cfg.SetDiskID(disk, instance.primary_node)
1874

    
1875
  return disks_ok, device_info
1876

    
1877

    
1878
def _StartInstanceDisks(cfg, instance, force):
1879
  """Start the disks of an instance.
1880

1881
  """
1882
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1883
                                           ignore_secondaries=force)
1884
  if not disks_ok:
1885
    _ShutdownInstanceDisks(instance, cfg)
1886
    if force is not None and not force:
1887
      logger.Error("If the message above refers to a secondary node,"
1888
                   " you can retry the operation using '--force'.")
1889
    raise errors.OpExecError("Disk consistency error")
1890

    
1891

    
1892
class LUDeactivateInstanceDisks(NoHooksLU):
1893
  """Shutdown an instance's disks.
1894

1895
  """
1896
  _OP_REQP = ["instance_name"]
1897

    
1898
  def CheckPrereq(self):
1899
    """Check prerequisites.
1900

1901
    This checks that the instance is in the cluster.
1902

1903
    """
1904
    instance = self.cfg.GetInstanceInfo(
1905
      self.cfg.ExpandInstanceName(self.op.instance_name))
1906
    if instance is None:
1907
      raise errors.OpPrereqError("Instance '%s' not known" %
1908
                                 self.op.instance_name)
1909
    self.instance = instance
1910

    
1911
  def Exec(self, feedback_fn):
1912
    """Deactivate the disks
1913

1914
    """
1915
    instance = self.instance
1916
    ins_l = rpc.call_instance_list([instance.primary_node])
1917
    ins_l = ins_l[instance.primary_node]
1918
    if not type(ins_l) is list:
1919
      raise errors.OpExecError("Can't contact node '%s'" %
1920
                               instance.primary_node)
1921

    
1922
    if self.instance.name in ins_l:
1923
      raise errors.OpExecError("Instance is running, can't shutdown"
1924
                               " block devices.")
1925

    
1926
    _ShutdownInstanceDisks(instance, self.cfg)
1927

    
1928

    
1929
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1930
  """Shutdown block devices of an instance.
1931

1932
  This does the shutdown on all nodes of the instance.
1933

1934
  If ignore_primary is true, errors on the primary node are ignored;
  errors on any other node always make the function return failure.
1936

1937
  """
1938
  result = True
1939
  for disk in instance.disks:
1940
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1941
      cfg.SetDiskID(top_disk, node)
1942
      if not rpc.call_blockdev_shutdown(node, top_disk):
1943
        logger.Error("could not shutdown block device %s on node %s" %
1944
                     (disk.iv_name, node))
1945
        if not ignore_primary or node != instance.primary_node:
1946
          result = False
1947
  return result
1948

    
1949

    
1950
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))
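# Example call, as used by LUStartupInstance.CheckPrereq below:
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)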
1978

    
1979

    
1980
class LUStartupInstance(LogicalUnit):
1981
  """Starts an instance.
1982

1983
  """
1984
  HPATH = "instance-start"
1985
  HTYPE = constants.HTYPE_INSTANCE
1986
  _OP_REQP = ["instance_name", "force"]
1987

    
1988
  def BuildHooksEnv(self):
1989
    """Build hooks env.
1990

1991
    This runs on master, primary and secondary nodes of the instance.
1992

1993
    """
1994
    env = {
1995
      "FORCE": self.op.force,
1996
      }
1997
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1998
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1999
          list(self.instance.secondary_nodes))
2000
    return env, nl, nl
2001

    
2002
  def CheckPrereq(self):
2003
    """Check prerequisites.
2004

2005
    This checks that the instance is in the cluster.
2006

2007
    """
2008
    instance = self.cfg.GetInstanceInfo(
2009
      self.cfg.ExpandInstanceName(self.op.instance_name))
2010
    if instance is None:
2011
      raise errors.OpPrereqError("Instance '%s' not known" %
2012
                                 self.op.instance_name)
2013

    
2014
    # check bridge existence
    _CheckInstanceBridgesExist(instance)
2016

    
2017
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2018
                         "starting instance %s" % instance.name,
2019
                         instance.memory)
2020

    
2021
    self.instance = instance
2022
    self.op.instance_name = instance.name
2023

    
2024
  def Exec(self, feedback_fn):
2025
    """Start the instance.
2026

2027
    """
2028
    instance = self.instance
2029
    force = self.op.force
2030
    extra_args = getattr(self.op, "extra_args", "")
2031

    
2032
    self.cfg.MarkInstanceUp(instance.name)
2033

    
2034
    node_current = instance.primary_node
2035

    
2036
    _StartInstanceDisks(self.cfg, instance, force)
2037

    
2038
    if not rpc.call_instance_start(node_current, instance, extra_args):
2039
      _ShutdownInstanceDisks(instance, self.cfg)
2040
      raise errors.OpExecError("Could not start instance")
2041

    
2042

    
2043
class LURebootInstance(LogicalUnit):
2044
  """Reboot an instance.
2045

2046
  """
2047
  HPATH = "instance-reboot"
2048
  HTYPE = constants.HTYPE_INSTANCE
2049
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2050

    
2051
  def BuildHooksEnv(self):
2052
    """Build hooks env.
2053

2054
    This runs on master, primary and secondary nodes of the instance.
2055

2056
    """
2057
    env = {
2058
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2059
      }
2060
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2061
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2062
          list(self.instance.secondary_nodes))
2063
    return env, nl, nl
2064

    
2065
  def CheckPrereq(self):
2066
    """Check prerequisites.
2067

2068
    This checks that the instance is in the cluster.
2069

2070
    """
2071
    instance = self.cfg.GetInstanceInfo(
2072
      self.cfg.ExpandInstanceName(self.op.instance_name))
2073
    if instance is None:
2074
      raise errors.OpPrereqError("Instance '%s' not known" %
2075
                                 self.op.instance_name)
2076

    
2077
    # check bridge existence
    _CheckInstanceBridgesExist(instance)
2079

    
2080
    self.instance = instance
2081
    self.op.instance_name = instance.name
2082

    
2083
  def Exec(self, feedback_fn):
2084
    """Reboot the instance.
2085

2086
    """
2087
    instance = self.instance
2088
    ignore_secondaries = self.op.ignore_secondaries
2089
    reboot_type = self.op.reboot_type
2090
    extra_args = getattr(self.op, "extra_args", "")
2091

    
2092
    node_current = instance.primary_node
2093

    
2094
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2095
                           constants.INSTANCE_REBOOT_HARD,
2096
                           constants.INSTANCE_REBOOT_FULL]:
2097
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2098
                                  (constants.INSTANCE_REBOOT_SOFT,
2099
                                   constants.INSTANCE_REBOOT_HARD,
2100
                                   constants.INSTANCE_REBOOT_FULL))
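    # soft/hard reboots are delegated to the hypervisor on the primary
    # node; a full reboot is emulated below via shutdown, disk
    # re-assembly and a fresh start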
2101

    
2102
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2103
                       constants.INSTANCE_REBOOT_HARD]:
2104
      if not rpc.call_instance_reboot(node_current, instance,
2105
                                      reboot_type, extra_args):
2106
        raise errors.OpExecError("Could not reboot instance")
2107
    else:
2108
      if not rpc.call_instance_shutdown(node_current, instance):
2109
        raise errors.OpExecError("could not shutdown instance for full reboot")
2110
      _ShutdownInstanceDisks(instance, self.cfg)
2111
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2112
      if not rpc.call_instance_start(node_current, instance, extra_args):
2113
        _ShutdownInstanceDisks(instance, self.cfg)
2114
        raise errors.OpExecError("Could not start instance for full reboot")
2115

    
2116
    self.cfg.MarkInstanceUp(instance.name)
2117

    
2118

    
2119
class LUShutdownInstance(LogicalUnit):
2120
  """Shutdown an instance.
2121

2122
  """
2123
  HPATH = "instance-stop"
2124
  HTYPE = constants.HTYPE_INSTANCE
2125
  _OP_REQP = ["instance_name"]
2126

    
2127
  def BuildHooksEnv(self):
2128
    """Build hooks env.
2129

2130
    This runs on master, primary and secondary nodes of the instance.
2131

2132
    """
2133
    env = _BuildInstanceHookEnvByObject(self.instance)
2134
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2135
          list(self.instance.secondary_nodes))
2136
    return env, nl, nl
2137

    
2138
  def CheckPrereq(self):
2139
    """Check prerequisites.
2140

2141
    This checks that the instance is in the cluster.
2142

2143
    """
2144
    instance = self.cfg.GetInstanceInfo(
2145
      self.cfg.ExpandInstanceName(self.op.instance_name))
2146
    if instance is None:
2147
      raise errors.OpPrereqError("Instance '%s' not known" %
2148
                                 self.op.instance_name)
2149
    self.instance = instance
2150

    
2151
  def Exec(self, feedback_fn):
2152
    """Shutdown the instance.
2153

2154
    """
2155
    instance = self.instance
2156
    node_current = instance.primary_node
2157
    self.cfg.MarkInstanceDown(instance.name)
2158
    if not rpc.call_instance_shutdown(node_current, instance):
2159
      logger.Error("could not shutdown instance")
2160

    
2161
    _ShutdownInstanceDisks(instance, self.cfg)
2162

    
2163

    
2164
class LUReinstallInstance(LogicalUnit):
2165
  """Reinstall an instance.
2166

2167
  """
2168
  HPATH = "instance-reinstall"
2169
  HTYPE = constants.HTYPE_INSTANCE
2170
  _OP_REQP = ["instance_name"]
2171

    
2172
  def BuildHooksEnv(self):
2173
    """Build hooks env.
2174

2175
    This runs on master, primary and secondary nodes of the instance.
2176

2177
    """
2178
    env = _BuildInstanceHookEnvByObject(self.instance)
2179
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2180
          list(self.instance.secondary_nodes))
2181
    return env, nl, nl
2182

    
2183
  def CheckPrereq(self):
2184
    """Check prerequisites.
2185

2186
    This checks that the instance is in the cluster and is not running.
2187

2188
    """
2189
    instance = self.cfg.GetInstanceInfo(
2190
      self.cfg.ExpandInstanceName(self.op.instance_name))
2191
    if instance is None:
2192
      raise errors.OpPrereqError("Instance '%s' not known" %
2193
                                 self.op.instance_name)
2194
    if instance.disk_template == constants.DT_DISKLESS:
2195
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2196
                                 self.op.instance_name)
2197
    if instance.status != "down":
2198
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2199
                                 self.op.instance_name)
2200
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2201
    if remote_info:
2202
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2203
                                 (self.op.instance_name,
2204
                                  instance.primary_node))
2205

    
2206
    self.op.os_type = getattr(self.op, "os_type", None)
2207
    if self.op.os_type is not None:
2208
      # OS verification
2209
      pnode = self.cfg.GetNodeInfo(
2210
        self.cfg.ExpandNodeName(instance.primary_node))
2211
      if pnode is None:
2212
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2213
                                   self.op.pnode)
2214
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2215
      if not os_obj:
2216
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2217
                                   " primary node"  % self.op.os_type)
2218

    
2219
    self.instance = instance
2220

    
2221
  def Exec(self, feedback_fn):
2222
    """Reinstall the instance.
2223

2224
    """
2225
    inst = self.instance
2226

    
2227
    if self.op.os_type is not None:
2228
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2229
      inst.os = self.op.os_type
2230
      self.cfg.AddInstance(inst)
2231

    
2232
    _StartInstanceDisks(self.cfg, inst, None)
2233
    try:
2234
      feedback_fn("Running the instance OS create scripts...")
2235
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2236
        raise errors.OpExecError("Could not install OS for instance %s"
2237
                                 " on node %s" %
2238
                                 (inst.name, inst.primary_node))
2239
    finally:
2240
      _ShutdownInstanceDisks(inst, self.cfg)
2241

    
2242

    
2243
class LURenameInstance(LogicalUnit):
2244
  """Rename an instance.
2245

2246
  """
2247
  HPATH = "instance-rename"
2248
  HTYPE = constants.HTYPE_INSTANCE
2249
  _OP_REQP = ["instance_name", "new_name"]
2250

    
2251
  def BuildHooksEnv(self):
2252
    """Build hooks env.
2253

2254
    This runs on master, primary and secondary nodes of the instance.
2255

2256
    """
2257
    env = _BuildInstanceHookEnvByObject(self.instance)
2258
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2259
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2260
          list(self.instance.secondary_nodes))
2261
    return env, nl, nl
2262

    
2263
  def CheckPrereq(self):
2264
    """Check prerequisites.
2265

2266
    This checks that the instance is in the cluster and is not running.
2267

2268
    """
2269
    instance = self.cfg.GetInstanceInfo(
2270
      self.cfg.ExpandInstanceName(self.op.instance_name))
2271
    if instance is None:
2272
      raise errors.OpPrereqError("Instance '%s' not known" %
2273
                                 self.op.instance_name)
2274
    if instance.status != "down":
2275
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2276
                                 self.op.instance_name)
2277
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2278
    if remote_info:
2279
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2280
                                 (self.op.instance_name,
2281
                                  instance.primary_node))
2282
    self.instance = instance
2283

    
2284
    # new name verification
2285
    name_info = utils.HostInfo(self.op.new_name)
2286

    
2287
    self.op.new_name = new_name = name_info.name
2288
    instance_list = self.cfg.GetInstanceList()
2289
    if new_name in instance_list:
2290
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2291
                                 new_name)
2292

    
2293
    if not getattr(self.op, "ignore_ip", False):
2294
      command = ["fping", "-q", name_info.ip]
2295
      result = utils.RunCmd(command)
2296
      if not result.failed:
2297
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2298
                                   (name_info.ip, new_name))
2299

    
2300

    
2301
  def Exec(self, feedback_fn):
2302
    """Reinstall the instance.
2303

2304
    """
2305
    inst = self.instance
2306
    old_name = inst.name
2307

    
2308
    if inst.disk_template == constants.DT_FILE:
2309
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
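      # file-based instances keep their disks under a directory derived
      # from the first disk's logical_id; this directory has to be
      # renamed on the primary node after the config rename below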
2310

    
2311
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2312

    
2313
    # re-read the instance from the configuration after rename
2314
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2315

    
2316
    if inst.disk_template == constants.DT_FILE:
2317
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2318
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2319
                                                old_file_storage_dir,
2320
                                                new_file_storage_dir)
2321

    
2322
      if not result:
2323
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2324
                                 " directory '%s' to '%s' (but the instance"
2325
                                 " has been renamed in Ganeti)" % (
2326
                                 inst.primary_node, old_file_storage_dir,
2327
                                 new_file_storage_dir))
2328

    
2329
      if not result[0]:
2330
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2331
                                 " (but the instance has been renamed in"
2332
                                 " Ganeti)" % (old_file_storage_dir,
2333
                                               new_file_storage_dir))
2334

    
2335
    _StartInstanceDisks(self.cfg, inst, None)
2336
    try:
2337
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2338
                                          "sda", "sdb"):
2339
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2340
               " instance has been renamed in Ganeti)" %
2341
               (inst.name, inst.primary_node))
2342
        logger.Error(msg)
2343
    finally:
2344
      _ShutdownInstanceDisks(inst, self.cfg)
2345

    
2346

    
2347
class LURemoveInstance(LogicalUnit):
2348
  """Remove an instance.
2349

2350
  """
2351
  HPATH = "instance-remove"
2352
  HTYPE = constants.HTYPE_INSTANCE
2353
  _OP_REQP = ["instance_name", "ignore_failures"]
2354

    
2355
  def BuildHooksEnv(self):
2356
    """Build hooks env.
2357

2358
    This runs on master, primary and secondary nodes of the instance.
2359

2360
    """
2361
    env = _BuildInstanceHookEnvByObject(self.instance)
2362
    nl = [self.sstore.GetMasterNode()]
2363
    return env, nl, nl
2364

    
2365
  def CheckPrereq(self):
2366
    """Check prerequisites.
2367

2368
    This checks that the instance is in the cluster.
2369

2370
    """
2371
    instance = self.cfg.GetInstanceInfo(
2372
      self.cfg.ExpandInstanceName(self.op.instance_name))
2373
    if instance is None:
2374
      raise errors.OpPrereqError("Instance '%s' not known" %
2375
                                 self.op.instance_name)
2376
    self.instance = instance
2377

    
2378
  def Exec(self, feedback_fn):
2379
    """Remove the instance.
2380

2381
    """
2382
    instance = self.instance
2383
    logger.Info("shutting down instance %s on node %s" %
2384
                (instance.name, instance.primary_node))
2385

    
2386
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2387
      if self.op.ignore_failures:
2388
        feedback_fn("Warning: can't shutdown instance")
2389
      else:
2390
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2391
                                 (instance.name, instance.primary_node))
2392

    
2393
    logger.Info("removing block devices for instance %s" % instance.name)
2394

    
2395
    if not _RemoveDisks(instance, self.cfg):
2396
      if self.op.ignore_failures:
2397
        feedback_fn("Warning: can't remove instance's disks")
2398
      else:
2399
        raise errors.OpExecError("Can't remove instance's disks")
2400

    
2401
    logger.Info("removing instance %s out of cluster config" % instance.name)
2402

    
2403
    self.cfg.RemoveInstance(instance.name)
2404

    
2405

    
2406
class LUQueryInstances(NoHooksLU):
2407
  """Logical unit for querying instances.
2408

2409
  """
2410
  _OP_REQP = ["output_fields", "names"]
2411

    
2412
  def CheckPrereq(self):
2413
    """Check prerequisites.
2414

2415
    This checks that the fields required are valid output fields.
2416

2417
    """
2418
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2419
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2420
                               "admin_state", "admin_ram",
2421
                               "disk_template", "ip", "mac", "bridge",
2422
                               "sda_size", "sdb_size", "vcpus", "tags"],
2423
                       dynamic=self.dynamic_fields,
2424
                       selected=self.op.output_fields)
2425

    
2426
    self.wanted = _GetWantedInstances(self, self.op.names)
2427

    
2428
  def Exec(self, feedback_fn):
2429
    """Computes the list of nodes and their attributes.
2430

2431
    """
2432
    instance_names = self.wanted
2433
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2434
                     in instance_names]
2435

    
2436
    # begin data gathering
2437

    
2438
    nodes = frozenset([inst.primary_node for inst in instance_list])
2439

    
2440
    bad_nodes = []
2441
    if self.dynamic_fields.intersection(self.op.output_fields):
2442
      live_data = {}
2443
      node_data = rpc.call_all_instances_info(nodes)
2444
      for name in nodes:
2445
        result = node_data[name]
2446
        if result:
2447
          live_data.update(result)
2448
        elif result == False:
2449
          bad_nodes.append(name)
2450
        # else no instance is alive
2451
    else:
2452
      live_data = dict([(name, {}) for name in instance_names])
2453

    
2454
    # end data gathering
2455

    
2456
    output = []
2457
    for instance in instance_list:
2458
      iout = []
2459
      for field in self.op.output_fields:
2460
        if field == "name":
2461
          val = instance.name
2462
        elif field == "os":
2463
          val = instance.os
2464
        elif field == "pnode":
2465
          val = instance.primary_node
2466
        elif field == "snodes":
2467
          val = list(instance.secondary_nodes)
2468
        elif field == "admin_state":
2469
          val = (instance.status != "down")
2470
        elif field == "oper_state":
2471
          if instance.primary_node in bad_nodes:
2472
            val = None
2473
          else:
2474
            val = bool(live_data.get(instance.name))
2475
        elif field == "status":
2476
          if instance.primary_node in bad_nodes:
2477
            val = "ERROR_nodedown"
2478
          else:
2479
            running = bool(live_data.get(instance.name))
2480
            if running:
2481
              if instance.status != "down":
2482
                val = "running"
2483
              else:
2484
                val = "ERROR_up"
2485
            else:
2486
              if instance.status != "down":
2487
                val = "ERROR_down"
2488
              else:
2489
                val = "ADMIN_down"
2490
        elif field == "admin_ram":
2491
          val = instance.memory
2492
        elif field == "oper_ram":
2493
          if instance.primary_node in bad_nodes:
2494
            val = None
2495
          elif instance.name in live_data:
2496
            val = live_data[instance.name].get("memory", "?")
2497
          else:
2498
            val = "-"
2499
        elif field == "disk_template":
2500
          val = instance.disk_template
2501
        elif field == "ip":
2502
          val = instance.nics[0].ip
2503
        elif field == "bridge":
2504
          val = instance.nics[0].bridge
2505
        elif field == "mac":
2506
          val = instance.nics[0].mac
2507
        elif field == "sda_size" or field == "sdb_size":
2508
          disk = instance.FindDisk(field[:3])
2509
          if disk is None:
2510
            val = None
2511
          else:
2512
            val = disk.size
2513
        elif field == "vcpus":
2514
          val = instance.vcpus
2515
        elif field == "tags":
2516
          val = list(instance.GetTags())
2517
        else:
2518
          raise errors.ParameterError(field)
2519
        iout.append(val)
2520
      output.append(iout)
2521

    
2522
    return output
2523

    
2524

    
2525
class LUFailoverInstance(LogicalUnit):
2526
  """Failover an instance.
2527

2528
  """
2529
  HPATH = "instance-failover"
2530
  HTYPE = constants.HTYPE_INSTANCE
2531
  _OP_REQP = ["instance_name", "ignore_consistency"]
2532

    
2533
  def BuildHooksEnv(self):
2534
    """Build hooks env.
2535

2536
    This runs on master, primary and secondary nodes of the instance.
2537

2538
    """
2539
    env = {
2540
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2541
      }
2542
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2543
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2544
    return env, nl, nl
2545

    
2546
  def CheckPrereq(self):
2547
    """Check prerequisites.
2548

2549
    This checks that the instance is in the cluster.
2550

2551
    """
2552
    instance = self.cfg.GetInstanceInfo(
2553
      self.cfg.ExpandInstanceName(self.op.instance_name))
2554
    if instance is None:
2555
      raise errors.OpPrereqError("Instance '%s' not known" %
2556
                                 self.op.instance_name)
2557

    
2558
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2559
      raise errors.OpPrereqError("Instance's disk layout is not"
2560
                                 " network mirrored, cannot failover.")
2561

    
2562
    secondary_nodes = instance.secondary_nodes
2563
    if not secondary_nodes:
2564
      raise errors.ProgrammerError("no secondary node but using "
2565
                                   "a mirrored disk template")
2566

    
2567
    target_node = secondary_nodes[0]
2568
    # check memory requirements on the secondary node
2569
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2570
                         instance.name, instance.memory)
2571

    
2572
    # check bridge existence
2573
    brlist = [nic.bridge for nic in instance.nics]
2574
    if not rpc.call_bridges_exist(target_node, brlist):
2575
      raise errors.OpPrereqError("One or more target bridges %s does not"
2576
                                 " exist on destination node '%s'" %
2577
                                 (brlist, target_node))
2578

    
2579
    self.instance = instance
2580

    
2581
  def Exec(self, feedback_fn):
2582
    """Failover an instance.
2583

2584
    The failover is done by shutting it down on its present node and
2585
    starting it on the secondary.
2586

2587
    """
2588
    instance = self.instance
2589

    
2590
    source_node = instance.primary_node
2591
    target_node = instance.secondary_nodes[0]
2592

    
2593
    feedback_fn("* checking disk consistency between source and target")
2594
    for dev in instance.disks:
2595
      # for drbd, these are drbd over lvm
2596
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2597
        if instance.status == "up" and not self.op.ignore_consistency:
2598
          raise errors.OpExecError("Disk %s is degraded on target node,"
2599
                                   " aborting failover." % dev.iv_name)
2600

    
2601
    feedback_fn("* shutting down instance on source node")
2602
    logger.Info("Shutting down instance %s on node %s" %
2603
                (instance.name, source_node))
2604

    
2605
    if not rpc.call_instance_shutdown(source_node, instance):
2606
      if self.op.ignore_consistency:
2607
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2608
                     " anyway. Please make sure node %s is down"  %
2609
                     (instance.name, source_node, source_node))
2610
      else:
2611
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2612
                                 (instance.name, source_node))
2613

    
2614
    feedback_fn("* deactivating the instance's disks on source node")
2615
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2616
      raise errors.OpExecError("Can't shut down the instance's disks.")
2617

    
2618
    instance.primary_node = target_node
2619
    # distribute new instance config to the other nodes
2620
    self.cfg.Update(instance)
2621

    
2622
    # Only start the instance if it's marked as up
2623
    if instance.status == "up":
2624
      feedback_fn("* activating the instance's disks on target node")
2625
      logger.Info("Starting instance %s on node %s" %
2626
                  (instance.name, target_node))
2627

    
2628
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2629
                                               ignore_secondaries=True)
2630
      if not disks_ok:
2631
        _ShutdownInstanceDisks(instance, self.cfg)
2632
        raise errors.OpExecError("Can't activate the instance's disks")
2633

    
2634
      feedback_fn("* starting the instance on the target node")
2635
      if not rpc.call_instance_start(target_node, instance, None):
2636
        _ShutdownInstanceDisks(instance, self.cfg)
2637
        raise errors.OpExecError("Could not start instance %s on node %s." %
2638
                                 (instance.name, target_node))
2639

    
2640

    
2641
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2642
  """Create a tree of block devices on the primary node.
2643

2644
  This always creates all devices.
2645

2646
  """
2647
  if device.children:
2648
    for child in device.children:
2649
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2650
        return False
2651

    
2652
  cfg.SetDiskID(device, node)
2653
  new_id = rpc.call_blockdev_create(node, device, device.size,
2654
                                    instance.name, True, info)
2655
  if not new_id:
2656
    return False
2657
  if device.physical_id is None:
2658
    device.physical_id = new_id
2659
  return True
2660

    
2661

    
2662
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2663
  """Create a tree of block devices on a secondary node.
2664

2665
  If this device type has to be created on secondaries, create it and
2666
  all its children.
2667

2668
  If not, just recurse to children keeping the same 'force' value.
2669

2670
  """
2671
  if device.CreateOnSecondary():
2672
    force = True
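  # if this device type must live on secondaries, all of its children
  # must be created there too, so the children are recursed into with
  # force=True; otherwise the caller's force flag is simply propagated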
2673
  if device.children:
2674
    for child in device.children:
2675
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2676
                                        child, force, info):
2677
        return False
2678

    
2679
  if not force:
2680
    return True
2681
  cfg.SetDiskID(device, node)
2682
  new_id = rpc.call_blockdev_create(node, device, device.size,
2683
                                    instance.name, False, info)
2684
  if not new_id:
2685
    return False
2686
  if device.physical_id is None:
2687
    device.physical_id = new_id
2688
  return True
2689

    
2690

    
2691
def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate unique logical volume names, one for each of the
  given extensions.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
2702

    
2703

    
2704
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2705
  """Generate a drbd device complete with its children.
2706

2707
  """
2708
  port = cfg.AllocatePort()
2709
  vgname = cfg.GetVGName()
2710
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2711
                          logical_id=(vgname, names[0]))
2712
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2713
                          logical_id=(vgname, names[1]))
2714
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2715
                          logical_id = (primary, secondary, port),
2716
                          children = [dev_data, dev_meta])
2717
  return drbd_dev
2718

    
2719

    
2720
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2721
  """Generate a drbd8 device complete with its children.
2722

2723
  """
2724
  port = cfg.AllocatePort()
2725
  vgname = cfg.GetVGName()
2726
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2727
                          logical_id=(vgname, names[0]))
2728
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2729
                          logical_id=(vgname, names[1]))
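  # the DRBD8 device mirrors dev_data between the primary and secondary
  # nodes over the allocated port, with dev_meta (128 MB) holding the
  # DRBD metadata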
2730
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2731
                          logical_id = (primary, secondary, port),
2732
                          children = [dev_data, dev_meta],
2733
                          iv_name=iv_name)
2734
  return drbd_dev
2735

    
2736

    
2737
def _GenerateDiskTemplate(cfg, template_name,
2738
                          instance_name, primary_node,
2739
                          secondary_nodes, disk_sz, swap_sz,
2740
                          file_storage_dir, file_driver):
2741
  """Generate the entire disk layout for a given template type.
2742

2743
  """
2744
  #TODO: compute space requirements
2745

    
2746
  vgname = cfg.GetVGName()
2747
  if template_name == constants.DT_DISKLESS:
2748
    disks = []
2749
  elif template_name == constants.DT_PLAIN:
2750
    if len(secondary_nodes) != 0:
2751
      raise errors.ProgrammerError("Wrong template configuration")
2752

    
2753
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2754
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2755
                           logical_id=(vgname, names[0]),
2756
                           iv_name = "sda")
2757
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2758
                           logical_id=(vgname, names[1]),
2759
                           iv_name = "sdb")
2760
    disks = [sda_dev, sdb_dev]
2761
  elif template_name == constants.DT_DRBD8:
2762
    if len(secondary_nodes) != 1:
2763
      raise errors.ProgrammerError("Wrong template configuration")
2764
    remote_node = secondary_nodes[0]
2765
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2766
                                       ".sdb_data", ".sdb_meta"])
2767
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2768
                                         disk_sz, names[0:2], "sda")
2769
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2770
                                         swap_sz, names[2:4], "sdb")
2771
    disks = [drbd_sda_dev, drbd_sdb_dev]
2772
  elif template_name == constants.DT_FILE:
2773
    if len(secondary_nodes) != 0:
2774
      raise errors.ProgrammerError("Wrong template configuration")
2775

    
2776
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2777
                                iv_name="sda", logical_id=(file_driver,
2778
                                "%s/sda" % file_storage_dir))
2779
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2780
                                iv_name="sdb", logical_id=(file_driver,
2781
                                "%s/sdb" % file_storage_dir))
2782
    disks = [file_sda_dev, file_sdb_dev]
2783
  else:
2784
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2785
  return disks
2786

    
2787

    
2788
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name
2793

    
2794

    
2795
def _CreateDisks(cfg, instance):
2796
  """Create all disks for an instance.
2797

2798
  This abstracts away some work from AddInstance.
2799

2800
  Args:
2801
    instance: the instance object
2802

2803
  Returns:
2804
    True or False showing the success of the creation process
2805

2806
  """
2807
  info = _GetInstanceInfoText(instance)
2808

    
2809
  if instance.disk_template == constants.DT_FILE:
2810
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2811
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2812
                                              file_storage_dir)
2813

    
2814
    if not result:
2815
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2816
      return False
2817

    
2818
    if not result[0]:
2819
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2820
      return False
2821

    
2822
  for device in instance.disks:
2823
    logger.Info("creating volume %s for instance %s" %
2824
                (device.iv_name, instance.name))
2825
    #HARDCODE
2826
    for secondary_node in instance.secondary_nodes:
2827
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2828
                                        device, False, info):
2829
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2830
                     (device.iv_name, device, secondary_node))
2831
        return False
2832
    #HARDCODE
2833
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2834
                                    instance, device, info):
2835
      logger.Error("failed to create volume %s on primary!" %
2836
                   device.iv_name)
2837
      return False
2838

    
2839
  return True
2840

    
2841

    
2842
def _RemoveDisks(instance, cfg):
2843
  """Remove all disks for an instance.
2844

2845
  This abstracts away some work from `AddInstance()` and
2846
  `RemoveInstance()`. Note that in case some of the devices couldn't
2847
  be removed, the removal will continue with the other ones (compare
2848
  with `_CreateDisks()`).
2849

2850
  Args:
2851
    instance: the instance object
2852

2853
  Returns:
2854
    True or False showing the success of the removal process
2855

2856
  """
2857
  logger.Info("removing block devices for instance %s" % instance.name)
2858

    
2859
  result = True
2860
  for device in instance.disks:
2861
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2862
      cfg.SetDiskID(disk, node)
2863
      if not rpc.call_blockdev_remove(node, disk):
2864
        logger.Error("could not remove block device %s on node %s,"
2865
                     " continuing anyway" %
2866
                     (device.iv_name, node))
2867
        result = False
2868

    
2869
  if instance.disk_template == constants.DT_FILE:
2870
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2871
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
2872
                                            file_storage_dir):
2873
      logger.Error("could not remove directory '%s'" % file_storage_dir)
2874
      result = False
2875

    
2876
  return result
2877

    
2878

    
2879
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
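# Example: a DT_DRBD8 instance with disk_size=10240 and swap_size=4096
# (MiB) needs 10240 + 4096 + 256 = 14592 MiB of free space in the
# volume group; DT_DISKLESS and DT_FILE instances need none (None).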
2899

    
2900

    
2901
class LUCreateInstance(LogicalUnit):
2902
  """Create an instance.
2903

2904
  """
2905
  HPATH = "instance-add"
2906
  HTYPE = constants.HTYPE_INSTANCE
2907
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
2908
              "disk_template", "swap_size", "mode", "start", "vcpus",
2909
              "wait_for_sync", "ip_check", "mac"]
2910

    
2911
  def _RunAllocator(self):
2912
    """Run the allocator based on input opcode.
2913

2914
    """
2915
    disks = [{"size": self.op.disk_size, "mode": "w"},
2916
             {"size": self.op.swap_size, "mode": "w"}]
2917
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2918
             "bridge": self.op.bridge}]
2919
    ial = IAllocator(self.cfg, self.sstore,
2920
                     mode=constants.IALLOCATOR_MODE_ALLOC,
2921
                     name=self.op.instance_name,
2922
                     disk_template=self.op.disk_template,
2923
                     tags=[],
2924
                     os=self.op.os_type,
2925
                     vcpus=self.op.vcpus,
2926
                     mem_size=self.op.mem_size,
2927
                     disks=disks,
2928
                     nics=nics,
2929
                     )
2930

    
2931
    ial.Run(self.op.iallocator)
2932

    
2933
    if not ial.success:
2934
      raise errors.OpPrereqError("Can't compute nodes using"
2935
                                 " iallocator '%s': %s" % (self.op.iallocator,
2936
                                                           ial.info))
2937
    if len(ial.nodes) != ial.required_nodes:
2938
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2939
                                 " of nodes (%s), required %s" %
2940
                                 (len(ial.nodes), ial.required_nodes))
2941
    self.op.pnode = ial.nodes[0]
2942
    logger.ToStdout("Selected nodes for the instance: %s" %
2943
                    (", ".join(ial.nodes),))
2944
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2945
                (self.op.instance_name, self.op.iallocator, ial.nodes))
2946
    if ial.required_nodes == 2:
2947
      self.op.snode = ial.nodes[1]
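    # at this point self.op.pnode (and self.op.snode for templates that
    # need two nodes) have been filled in from the allocator's answer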

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
2982
    """Check prerequisites.
2983

2984
    """
2985
    # set optional parameters to none if they don't exist
2986
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
2987
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
2988
                 "vnc_bind_address"]:
2989
      if not hasattr(self.op, attr):
2990
        setattr(self.op, attr, None)
2991

    
2992
    if self.op.mode not in (constants.INSTANCE_CREATE,
2993
                            constants.INSTANCE_IMPORT):
2994
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2995
                                 self.op.mode)
2996

    
2997
    if (not self.cfg.GetVGName() and
2998
        self.op.disk_template not in constants.DTS_NOT_LVM):
2999
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3000
                                 " instances")
3001

    
3002
    if self.op.mode == constants.INSTANCE_IMPORT:
3003
      src_node = getattr(self.op, "src_node", None)
3004
      src_path = getattr(self.op, "src_path", None)
3005
      if src_node is None or src_path is None:
3006
        raise errors.OpPrereqError("Importing an instance requires source"
3007
                                   " node and path options")
3008
      src_node_full = self.cfg.ExpandNodeName(src_node)
3009
      if src_node_full is None:
3010
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3011
      self.op.src_node = src_node = src_node_full
3012

    
3013
      if not os.path.isabs(src_path):
3014
        raise errors.OpPrereqError("The source path must be absolute")
3015

    
3016
      export_info = rpc.call_export_info(src_node, src_path)
3017

    
3018
      if not export_info:
3019
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3020

    
3021
      if not export_info.has_section(constants.INISECT_EXP):
3022
        raise errors.ProgrammerError("Corrupted export config")
3023

    
3024
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3025
      if (int(ei_version) != constants.EXPORT_VERSION):
3026
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3027
                                   (ei_version, constants.EXPORT_VERSION))
3028

    
3029
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3030
        raise errors.OpPrereqError("Can't import instance with more than"
3031
                                   " one data disk")
3032

    
3033
      # FIXME: are the old os-es, disk sizes, etc. useful?
3034
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3035
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3036
                                                         'disk0_dump'))
3037
      self.src_image = diskimage
3038
    else: # INSTANCE_CREATE
3039
      if getattr(self.op, "os_type", None) is None:
3040
        raise errors.OpPrereqError("No guest OS specified")
3041

    
3042
    #### instance parameters check
3043

    
3044
    # disk template and mirror node verification
3045
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3046
      raise errors.OpPrereqError("Invalid disk template name")
3047

    
3048
    # instance name verification
3049
    hostname1 = utils.HostInfo(self.op.instance_name)
3050

    
3051
    self.op.instance_name = instance_name = hostname1.name
3052
    instance_list = self.cfg.GetInstanceList()
3053
    if instance_name in instance_list:
3054
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3055
                                 instance_name)
3056

    
3057
    # ip validity checks
3058
    ip = getattr(self.op, "ip", None)
3059
    if ip is None or ip.lower() == "none":
3060
      inst_ip = None
3061
    elif ip.lower() == "auto":
3062
      inst_ip = hostname1.ip
3063
    else:
3064
      if not utils.IsValidIP(ip):
3065
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3066
                                   " like a valid IP" % ip)
3067
      inst_ip = ip
3068
    self.inst_ip = self.op.ip = inst_ip
3069

    
3070
    if self.op.start and not self.op.ip_check:
3071
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3072
                                 " adding an instance in start mode")
3073

    
3074
    if self.op.ip_check:
3075
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3076
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3077
                                   (hostname1.ip, instance_name))
3078

    
3079
    # MAC address verification
3080
    if self.op.mac != "auto":
3081
      if not utils.IsValidMac(self.op.mac.lower()):
3082
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3083
                                   self.op.mac)
3084

    
3085
    # bridge verification
3086
    bridge = getattr(self.op, "bridge", None)
3087
    if bridge is None:
3088
      self.op.bridge = self.cfg.GetDefBridge()
3089
    else:
3090
      self.op.bridge = bridge
3091

    
3092
    # boot order verification
3093
    if self.op.hvm_boot_order is not None:
3094
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3095
        raise errors.OpPrereqError("invalid boot order specified,"
3096
                                   " must be one or more of [acdn]")
3097
    # file storage checks
3098
    if (self.op.file_driver and
3099
        not self.op.file_driver in constants.FILE_DRIVER):
3100
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3101
                                 self.op.file_driver)
3102

    
3103
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3104
      raise errors.OpPrereqError("File storage directory not a relative"
3105
                                 " path")
3106
    #### allocator run
3107

    
3108
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3109
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3110
                                 " node must be given")
3111

    
3112
    if self.op.iallocator is not None:
3113
      self._RunAllocator()
3114

    
3115
    #### node related checks
3116

    
3117
    # check primary node
3118
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3119
    if pnode is None:
3120
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3121
                                 self.op.pnode)
3122
    self.op.pnode = pnode.name
3123
    self.pnode = pnode
3124
    self.secondaries = []
3125

    
3126
    # mirror node verification
3127
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3128
      if getattr(self.op, "snode", None) is None:
3129
        raise errors.OpPrereqError("The networked disk templates need"
3130
                                   " a mirror node")
3131

    
3132
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3133
      if snode_name is None:
3134
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3135
                                   self.op.snode)
3136
      elif snode_name == pnode.name:
3137
        raise errors.OpPrereqError("The secondary node cannot be"
3138
                                   " the primary node.")
3139
      self.secondaries.append(snode_name)
3140

    
3141
    req_size = _ComputeDiskSize(self.op.disk_template,
3142
                                self.op.disk_size, self.op.swap_size)
3143

    
3144
    # Check lv size requirements
3145
    if req_size is not None:
3146
      nodenames = [pnode.name] + self.secondaries
3147
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3148
      for node in nodenames:
3149
        info = nodeinfo.get(node, None)
3150
        if not info:
3151
          raise errors.OpPrereqError("Cannot get current information"
3152
                                     " from node '%s'" % node)
3153
        vg_free = info.get('vg_free', None)
3154
        if not isinstance(vg_free, int):
3155
          raise errors.OpPrereqError("Can't compute free disk space on"
3156
                                     " node %s" % node)
3157
        if req_size > info['vg_free']:
3158
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3159
                                     " %d MB available, %d MB required" %
3160
                                     (node, info['vg_free'], req_size))
3161

    
3162
    # os verification
3163
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3164
    if not os_obj:
3165
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3166
                                 " primary node"  % self.op.os_type)
3167

    
3168
    if self.op.kernel_path == constants.VALUE_NONE:
3169
      raise errors.OpPrereqError("Can't set instance kernel to none")
3170

    
3171

    
3172
    # bridge check on primary node
3173
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3174
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3175
                                 " destination node '%s'" %
3176
                                 (self.op.bridge, pnode.name))
3177

    
3178
    # memory check on primary node
3179
    if self.op.start:
3180
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3181
                           "creating instance %s" % self.op.instance_name,
3182
                           self.op.mem_size)
3183

    
3184
    # hvm_cdrom_image_path verification
3185
    if self.op.hvm_cdrom_image_path is not None:
3186
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3187
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3188
                                   " be an absolute path or None, not %s" %
3189
                                   self.op.hvm_cdrom_image_path)
3190
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3191
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3192
                                   " regular file or a symlink pointing to"
3193
                                   " an existing regular file, not %s" %
3194
                                   self.op.hvm_cdrom_image_path)
3195

    
3196
    # vnc_bind_address verification
3197
    if self.op.vnc_bind_address is not None:
3198
      if not utils.IsValidIP(self.op.vnc_bind_address):
3199
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3200
                                   " like a valid IP address" %
3201
                                   self.op.vnc_bind_address)
3202

    
3203
    if self.op.start:
3204
      self.instance_status = 'up'
3205
    else:
3206
      self.instance_status = 'down'
3207

    
3208
  def Exec(self, feedback_fn):
3209
    """Create and add the instance to the cluster.
3210

3211
    """
3212
    instance = self.op.instance_name
3213
    pnode_name = self.pnode.name
3214

    
3215
    if self.op.mac == "auto":
3216
      mac_address = self.cfg.GenerateMAC()
3217
    else:
3218
      mac_address = self.op.mac
3219

    
3220
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3221
    if self.inst_ip is not None:
3222
      nic.ip = self.inst_ip
3223

    
3224
    ht_kind = self.sstore.GetHypervisorType()
3225
    if ht_kind in constants.HTS_REQ_PORT:
3226
      network_port = self.cfg.AllocatePort()
3227
    else:
3228
      network_port = None
3229

    
3230
    if self.op.vnc_bind_address is None:
3231
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3232

    
3233
    # this is needed because os.path.join does not accept None arguments
3234
    if self.op.file_storage_dir is None:
3235
      string_file_storage_dir = ""
3236
    else:
3237
      string_file_storage_dir = self.op.file_storage_dir
3238

    
3239
    # build the full file storage dir path
3240
    file_storage_dir = os.path.normpath(os.path.join(
3241
                                        self.sstore.GetFileStorageDir(),
3242
                                        string_file_storage_dir, instance))
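    # e.g. with a cluster file storage dir of /srv/ganeti/file-storage
    # (illustrative value), a user-supplied subdir of "mysubdir" and instance
    # "inst1.example.com", this yields
    # /srv/ganeti/file-storage/mysubdir/inst1.example.com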
3243

    
3244

    
3245
    disks = _GenerateDiskTemplate(self.cfg,
3246
                                  self.op.disk_template,
3247
                                  instance, pnode_name,
3248
                                  self.secondaries, self.op.disk_size,
3249
                                  self.op.swap_size,
3250
                                  file_storage_dir,
3251
                                  self.op.file_driver)
3252

    
3253
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3254
                            primary_node=pnode_name,
3255
                            memory=self.op.mem_size,
3256
                            vcpus=self.op.vcpus,
3257
                            nics=[nic], disks=disks,
3258
                            disk_template=self.op.disk_template,
3259
                            status=self.instance_status,
3260
                            network_port=network_port,
3261
                            kernel_path=self.op.kernel_path,
3262
                            initrd_path=self.op.initrd_path,
3263
                            hvm_boot_order=self.op.hvm_boot_order,
3264
                            hvm_acpi=self.op.hvm_acpi,
3265
                            hvm_pae=self.op.hvm_pae,
3266
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3267
                            vnc_bind_address=self.op.vnc_bind_address,
3268
                            )
3269

    
3270
    feedback_fn("* creating instance disks...")
3271
    if not _CreateDisks(self.cfg, iobj):
3272
      _RemoveDisks(iobj, self.cfg)
3273
      raise errors.OpExecError("Device creation failed, reverting...")
3274

    
3275
    feedback_fn("adding instance %s to cluster config" % instance)
3276

    
3277
    self.cfg.AddInstance(iobj)
3278

    
3279
    if self.op.wait_for_sync:
3280
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3281
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3282
      # make sure the disks are not degraded (still sync-ing is ok)
3283
      time.sleep(15)
3284
      feedback_fn("* checking mirrors status")
3285
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3286
    else:
3287
      disk_abort = False
3288

    
3289
    if disk_abort:
3290
      _RemoveDisks(iobj, self.cfg)
3291
      self.cfg.RemoveInstance(iobj.name)
3292
      raise errors.OpExecError("There are some degraded disks for"
3293
                               " this instance")
3294

    
3295
    feedback_fn("creating os for instance %s on node %s" %
3296
                (instance, pnode_name))
3297

    
3298
    if iobj.disk_template != constants.DT_DISKLESS:
3299
      if self.op.mode == constants.INSTANCE_CREATE:
3300
        feedback_fn("* running the instance OS create scripts...")
3301
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3302
          raise errors.OpExecError("could not add os for instance %s"
3303
                                   " on node %s" %
3304
                                   (instance, pnode_name))
3305

    
3306
      elif self.op.mode == constants.INSTANCE_IMPORT:
3307
        feedback_fn("* running the instance OS import scripts...")
3308
        src_node = self.op.src_node
3309
        src_image = self.src_image
3310
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3311
                                                src_node, src_image):
3312
          raise errors.OpExecError("Could not import os for instance"
3313
                                   " %s on node %s" %
3314
                                   (instance, pnode_name))
3315
      else:
3316
        # also checked in the prereq part
3317
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3318
                                     % self.op.mode)
3319

    
3320
    if self.op.start:
3321
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3322
      feedback_fn("* starting instance...")
3323
      if not rpc.call_instance_start(pnode_name, iobj, None):
3324
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance.

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
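    # (the returned command is run by the caller on the master node; it is an
    # ssh invocation to the instance's primary node that executes the
    # hypervisor's console command, e.g. "xm console <instance>" under Xen)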


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)
    # CheckPrereq assigns our return value back to self.op.remote_node, so
    # return the selected node instead of the implicit None
    return self.op.remote_node
3403

    
3404
  def BuildHooksEnv(self):
3405
    """Build hooks env.
3406

3407
    This runs on the master, the primary and all the secondaries.
3408

3409
    """
3410
    env = {
3411
      "MODE": self.op.mode,
3412
      "NEW_SECONDARY": self.op.remote_node,
3413
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3414
      }
3415
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3416
    nl = [
3417
      self.sstore.GetMasterNode(),
3418
      self.instance.primary_node,
3419
      ]
3420
    if self.op.remote_node is not None:
3421
      nl.append(self.op.remote_node)
3422
    return env, nl, nl
3423

    
3424
  def CheckPrereq(self):
3425
    """Check prerequisites.
3426

3427
    This checks that the instance is in the cluster.
3428

3429
    """
3430
    if not hasattr(self.op, "remote_node"):
3431
      self.op.remote_node = None
3432

    
3433
    instance = self.cfg.GetInstanceInfo(
3434
      self.cfg.ExpandInstanceName(self.op.instance_name))
3435
    if instance is None:
3436
      raise errors.OpPrereqError("Instance '%s' not known" %
3437
                                 self.op.instance_name)
3438
    self.instance = instance
3439
    self.op.instance_name = instance.name
3440

    
3441
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3442
      raise errors.OpPrereqError("Instance's disk layout is not"
3443
                                 " network mirrored.")
3444

    
3445
    if len(instance.secondary_nodes) != 1:
3446
      raise errors.OpPrereqError("The instance has a strange layout,"
3447
                                 " expected one secondary but found %d" %
3448
                                 len(instance.secondary_nodes))
3449

    
3450
    self.sec_node = instance.secondary_nodes[0]
3451

    
3452
    ia_name = getattr(self.op, "iallocator", None)
3453
    if ia_name is not None:
3454
      if self.op.remote_node is not None:
3455
        raise errors.OpPrereqError("Give either the iallocator or the new"
3456
                                   " secondary, not both")
3457
      self.op.remote_node = self._RunAllocator()
3458

    
3459
    remote_node = self.op.remote_node
3460
    if remote_node is not None:
3461
      remote_node = self.cfg.ExpandNodeName(remote_node)
3462
      if remote_node is None:
3463
        raise errors.OpPrereqError("Node '%s' not known" %
3464
                                   self.op.remote_node)
3465
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3466
    else:
3467
      self.remote_node_info = None
3468
    if remote_node == instance.primary_node:
3469
      raise errors.OpPrereqError("The specified node is the primary node of"
3470
                                 " the instance.")
3471
    elif remote_node == self.sec_node:
3472
      if self.op.mode == constants.REPLACE_DISK_SEC:
3473
        # this is for DRBD8, where we can't execute the same mode of
3474
        # replacement as for drbd7 (no different port allocated)
3475
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3476
                                   " replacement")
3477
    if instance.disk_template == constants.DT_DRBD8:
3478
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3479
          remote_node is not None):
3480
        # switch to replace secondary mode
3481
        self.op.mode = constants.REPLACE_DISK_SEC
3482

    
3483
      if self.op.mode == constants.REPLACE_DISK_ALL:
3484
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3485
                                   " secondary disk replacement, not"
3486
                                   " both at once")
3487
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3488
        if remote_node is not None:
3489
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3490
                                     " the secondary while doing a primary"
3491
                                     " node disk replacement")
3492
        self.tgt_node = instance.primary_node
3493
        self.oth_node = instance.secondary_nodes[0]
3494
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3495
        self.new_node = remote_node # this can be None, in which case
3496
                                    # we don't change the secondary
3497
        self.tgt_node = instance.secondary_nodes[0]
3498
        self.oth_node = instance.primary_node
3499
      else:
3500
        raise errors.ProgrammerError("Unhandled disk replace mode")
3501

    
3502
    for name in self.op.disks:
3503
      if instance.FindDisk(name) is None:
3504
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3505
                                   (name, instance.name))
3506
    self.op.remote_node = remote_node
3507

    
3508
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
3525
    steps_total = 6
3526
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3527
    instance = self.instance
3528
    iv_names = {}
3529
    vgname = self.cfg.GetVGName()
3530
    # start of work
3531
    cfg = self.cfg
3532
    tgt_node = self.tgt_node
3533
    oth_node = self.oth_node
3534

    
3535
    # Step: check device activation
3536
    self.proc.LogStep(1, steps_total, "check device existence")
3537
    info("checking volume groups")
3538
    my_vg = cfg.GetVGName()
3539
    results = rpc.call_vg_list([oth_node, tgt_node])
3540
    if not results:
3541
      raise errors.OpExecError("Can't list volume groups on the nodes")
3542
    for node in oth_node, tgt_node:
3543
      res = results.get(node, False)
3544
      if not res or my_vg not in res:
3545
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3546
                                 (my_vg, node))
3547
    for dev in instance.disks:
3548
      if not dev.iv_name in self.op.disks:
3549
        continue
3550
      for node in tgt_node, oth_node:
3551
        info("checking %s on %s" % (dev.iv_name, node))
3552
        cfg.SetDiskID(dev, node)
3553
        if not rpc.call_blockdev_find(node, dev):
3554
          raise errors.OpExecError("Can't find device %s on node %s" %
3555
                                   (dev.iv_name, node))
3556

    
3557
    # Step: check other node consistency
3558
    self.proc.LogStep(2, steps_total, "check peer consistency")
3559
    for dev in instance.disks:
3560
      if not dev.iv_name in self.op.disks:
3561
        continue
3562
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3563
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3564
                                   oth_node==instance.primary_node):
3565
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3566
                                 " to replace disks on this node (%s)" %
3567
                                 (oth_node, tgt_node))
3568

    
3569
    # Step: create new storage
3570
    self.proc.LogStep(3, steps_total, "allocate new storage")
3571
    for dev in instance.disks:
3572
      if not dev.iv_name in self.op.disks:
3573
        continue
3574
      size = dev.size
3575
      cfg.SetDiskID(dev, tgt_node)
3576
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3577
      names = _GenerateUniqueNames(cfg, lv_names)
3578
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3579
                             logical_id=(vgname, names[0]))
3580
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3581
                             logical_id=(vgname, names[1]))
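      # the 128 MB meta LV per DRBD device is the same overhead that
      # _ComputeDiskSize budgets for (2 x 128 MB for the two devices)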
3582
      new_lvs = [lv_data, lv_meta]
3583
      old_lvs = dev.children
3584
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3585
      info("creating new local storage on %s for %s" %
3586
           (tgt_node, dev.iv_name))
3587
      # since we *always* want to create this LV, we use the
3588
      # _Create...OnPrimary (which forces the creation), even if we
3589
      # are talking about the secondary node
3590
      for new_lv in new_lvs:
3591
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3592
                                        _GetInstanceInfoText(instance)):
3593
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3594
                                   " node '%s'" %
3595
                                   (new_lv.logical_id[1], tgt_node))
3596

    
3597
    # Step: for each lv, detach+rename*2+attach
3598
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3599
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3600
      info("detaching %s drbd from local storage" % dev.iv_name)
3601
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3602
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3603
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3604
      #dev.children = []
3605
      #cfg.Update(instance)
3606

    
3607
      # ok, we created the new LVs, so now we know we have the needed
3608
      # storage; as such, we proceed on the target node to rename
3609
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3610
      # using the assumption that logical_id == physical_id (which in
3611
      # turn is the unique_id on that node)
3612

    
3613
      # FIXME(iustin): use a better name for the replaced LVs
3614
      temp_suffix = int(time.time())
3615
      ren_fn = lambda d, suff: (d.physical_id[0],
3616
                                d.physical_id[1] + "_replaced-%s" % suff)
3617
      # build the rename list based on what LVs exist on the node
3618
      rlist = []
3619
      for to_ren in old_lvs:
3620
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3621
        if find_res is not None: # device exists
3622
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3623

    
3624
      info("renaming the old LVs on the target node")
3625
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3626
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3627
      # now we rename the new LVs to the old LVs
3628
      info("renaming the new LVs on the target node")
3629
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3630
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3631
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3632

    
3633
      for old, new in zip(old_lvs, new_lvs):
3634
        new.logical_id = old.logical_id
3635
        cfg.SetDiskID(new, tgt_node)
3636

    
3637
      for disk in old_lvs:
3638
        disk.logical_id = ren_fn(disk, temp_suffix)
3639
        cfg.SetDiskID(disk, tgt_node)
3640

    
3641
      # now that the new lvs have the old name, we can add them to the device
3642
      info("adding new mirror component on %s" % tgt_node)
3643
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3644
        for new_lv in new_lvs:
3645
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3646
            warning("Can't rollback device %s", hint="manually cleanup unused"
3647
                    " logical volumes")
3648
        raise errors.OpExecError("Can't add local storage to drbd")
3649

    
3650
      dev.children = new_lvs
3651
      cfg.Update(instance)
3652

    
3653
    # Step: wait for sync
3654

    
3655
    # this can fail as the old devices are degraded and _WaitForSync
3656
    # does a combined result over all disks, so we don't check its
3657
    # return value
3658
    self.proc.LogStep(5, steps_total, "sync devices")
3659
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3660

    
3661
    # so check manually all the devices
3662
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3663
      cfg.SetDiskID(dev, instance.primary_node)
3664
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3665
      if is_degr:
3666
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3667

    
3668
    # Step: remove old storage
3669
    self.proc.LogStep(6, steps_total, "removing old storage")
3670
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3671
      info("remove logical volumes for %s" % name)
3672
      for lv in old_lvs:
3673
        cfg.SetDiskID(lv, tgt_node)
3674
        if not rpc.call_blockdev_remove(tgt_node, lv):
3675
          warning("Can't remove old LV", hint="manually remove unused LVs")
3676
          continue
3677

    
3678
  def _ExecD8Secondary(self, feedback_fn):
3679
    """Replace the secondary node for drbd8.
3680

3681
    The algorithm for replace is quite complicated:
3682
      - for all disks of the instance:
3683
        - create new LVs on the new node with same names
3684
        - shutdown the drbd device on the old secondary
3685
        - disconnect the drbd network on the primary
3686
        - create the drbd device on the new secondary
3687
        - network attach the drbd on the primary, using an artifice:
3688
          the drbd code for Attach() will connect to the network if it
3689
          finds a device which is connected to the good local disks but
3690
          not network enabled
3691
      - wait for sync across all devices
3692
      - remove all disks from the old secondary
3693

3694
    Failures are not very well handled.
3695

3696
    """
3697
    steps_total = 6
3698
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3699
    instance = self.instance
3700
    iv_names = {}
3701
    vgname = self.cfg.GetVGName()
3702
    # start of work
3703
    cfg = self.cfg
3704
    old_node = self.tgt_node
3705
    new_node = self.new_node
3706
    pri_node = instance.primary_node
3707

    
3708
    # Step: check device activation
3709
    self.proc.LogStep(1, steps_total, "check device existence")
3710
    info("checking volume groups")
3711
    my_vg = cfg.GetVGName()
3712
    results = rpc.call_vg_list([pri_node, new_node])
3713
    if not results:
3714
      raise errors.OpExecError("Can't list volume groups on the nodes")
3715
    for node in pri_node, new_node:
3716
      res = results.get(node, False)
3717
      if not res or my_vg not in res:
3718
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3719
                                 (my_vg, node))
3720
    for dev in instance.disks:
3721
      if not dev.iv_name in self.op.disks:
3722
        continue
3723
      info("checking %s on %s" % (dev.iv_name, pri_node))
3724
      cfg.SetDiskID(dev, pri_node)
3725
      if not rpc.call_blockdev_find(pri_node, dev):
3726
        raise errors.OpExecError("Can't find device %s on node %s" %
3727
                                 (dev.iv_name, pri_node))
3728

    
3729
    # Step: check other node consistency
3730
    self.proc.LogStep(2, steps_total, "check peer consistency")
3731
    for dev in instance.disks:
3732
      if not dev.iv_name in self.op.disks:
3733
        continue
3734
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3735
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3736
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3737
                                 " unsafe to replace the secondary" %
3738
                                 pri_node)
3739

    
3740
    # Step: create new storage
3741
    self.proc.LogStep(3, steps_total, "allocate new storage")
3742
    for dev in instance.disks:
3743
      size = dev.size
3744
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3745
      # since we *always* want to create this LV, we use the
3746
      # _Create...OnPrimary (which forces the creation), even if we
3747
      # are talking about the secondary node
3748
      for new_lv in dev.children:
3749
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3750
                                        _GetInstanceInfoText(instance)):
3751
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3752
                                   " node '%s'" %
3753
                                   (new_lv.logical_id[1], new_node))
3754

    
3755
      iv_names[dev.iv_name] = (dev, dev.children)
3756

    
3757
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3758
    for dev in instance.disks:
3759
      size = dev.size
3760
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3761
      # create new devices on new_node
3762
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3763
                              logical_id=(pri_node, new_node,
3764
                                          dev.logical_id[2]),
3765
                              children=dev.children)
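      # dev.logical_id[2] (the DRBD port in this layout) is carried over
      # unchanged, so only the node pair differs on the new device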
3766
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3767
                                        new_drbd, False,
3768
                                      _GetInstanceInfoText(instance)):
3769
        raise errors.OpExecError("Failed to create new DRBD on"
3770
                                 " node '%s'" % new_node)
3771

    
3772
    for dev in instance.disks:
3773
      # we have new devices, shutdown the drbd on the old secondary
3774
      info("shutting down drbd for %s on old node" % dev.iv_name)
3775
      cfg.SetDiskID(dev, old_node)
3776
      if not rpc.call_blockdev_shutdown(old_node, dev):
3777
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3778
                hint="Please cleanup this device manually as soon as possible")
3779

    
3780
    info("detaching primary drbds from the network (=> standalone)")
3781
    done = 0
3782
    for dev in instance.disks:
3783
      cfg.SetDiskID(dev, pri_node)
3784
      # set the physical (unique in bdev terms) id to None, meaning
3785
      # detach from network
3786
      dev.physical_id = (None,) * len(dev.physical_id)
3787
      # and 'find' the device, which will 'fix' it to match the
3788
      # standalone state
3789
      if rpc.call_blockdev_find(pri_node, dev):
3790
        done += 1
3791
      else:
3792
        warning("Failed to detach drbd %s from network, unusual case" %
3793
                dev.iv_name)
3794

    
3795
    if not done:
3796
      # no detaches succeeded (very unlikely)
3797
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3798

    
3799
    # if we managed to detach at least one, we update all the disks of
3800
    # the instance to point to the new secondary
3801
    info("updating instance configuration")
3802
    for dev in instance.disks:
3803
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3804
      cfg.SetDiskID(dev, pri_node)
3805
    cfg.Update(instance)
3806

    
3807
    # and now perform the drbd attach
3808
    info("attaching primary drbds to new secondary (standalone => connected)")
3809
    failures = []
3810
    for dev in instance.disks:
3811
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3812
      # since the attach is smart, it's enough to 'find' the device,
3813
      # it will automatically activate the network, if the physical_id
3814
      # is correct
3815
      cfg.SetDiskID(dev, pri_node)
3816
      if not rpc.call_blockdev_find(pri_node, dev):
3817
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3818
                "please do a gnt-instance info to see the status of disks")
3819

    
3820
    # this can fail as the old devices are degraded and _WaitForSync
3821
    # does a combined result over all disks, so we don't check its
3822
    # return value
3823
    self.proc.LogStep(5, steps_total, "sync devices")
3824
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3825

    
3826
    # so check manually all the devices
3827
    for name, (dev, old_lvs) in iv_names.iteritems():
3828
      cfg.SetDiskID(dev, pri_node)
3829
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3830
      if is_degr:
3831
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3832

    
3833
    self.proc.LogStep(6, steps_total, "removing old storage")
3834
    for name, (dev, old_lvs) in iv_names.iteritems():
3835
      info("remove logical volumes for %s" % name)
3836
      for lv in old_lvs:
3837
        cfg.SetDiskID(lv, old_node)
3838
        if not rpc.call_blockdev_remove(old_node, lv):
3839
          warning("Can't remove LV on old secondary",
3840
                  hint="Cleanup stale volumes by hand")
3841

    
3842
  def Exec(self, feedback_fn):
3843
    """Execute disk replacement.
3844

3845
    This dispatches the disk replacement to the appropriate handler.
3846

3847
    """
3848
    instance = self.instance
3849

    
3850
    # Activate the instance disks if we're replacing them on a down instance
3851
    if instance.status == "down":
3852
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3853
      self.proc.ChainOpCode(op)
3854

    
3855
    if instance.disk_template == constants.DT_DRBD8:
3856
      if self.op.remote_node is None:
3857
        fn = self._ExecD8DiskOnly
3858
      else:
3859
        fn = self._ExecD8Secondary
3860
    else:
3861
      raise errors.ProgrammerError("Unhandled disk replacement case")
3862

    
3863
    ret = fn(feedback_fn)
3864

    
3865
    # Deactivate the instance disks if we're replacing them on a down instance
3866
    if instance.status == "down":
3867
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3868
      self.proc.ChainOpCode(op)
3869

    
3870
    return ret
3871

    
3872

    
3873
class LUGrowDisk(LogicalUnit):
3874
  """Grow a disk of an instance.
3875

3876
  """
3877
  HPATH = "disk-grow"
3878
  HTYPE = constants.HTYPE_INSTANCE
3879
  _OP_REQP = ["instance_name", "disk", "amount"]
3880

    
3881
  def BuildHooksEnv(self):
3882
    """Build hooks env.
3883

3884
    This runs on the master, the primary and all the secondaries.
3885

3886
    """
3887
    env = {
3888
      "DISK": self.op.disk,
3889
      "AMOUNT": self.op.amount,
3890
      }
3891
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3892
    nl = [
3893
      self.sstore.GetMasterNode(),
3894
      self.instance.primary_node,
3895
      ]
3896
    return env, nl, nl
3897

    
3898
  def CheckPrereq(self):
3899
    """Check prerequisites.
3900

3901
    This checks that the instance is in the cluster.
3902

3903
    """
3904
    instance = self.cfg.GetInstanceInfo(
3905
      self.cfg.ExpandInstanceName(self.op.instance_name))
3906
    if instance is None:
3907
      raise errors.OpPrereqError("Instance '%s' not known" %
3908
                                 self.op.instance_name)
3909
    self.instance = instance
3910
    self.op.instance_name = instance.name
3911

    
3912
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3913
      raise errors.OpPrereqError("Instance's disk layout does not support"
3914
                                 " growing.")
3915

    
3916
    if instance.FindDisk(self.op.disk) is None:
3917
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3918
                                 (self.op.disk, instance.name))
3919

    
3920
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3921
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3922
    for node in nodenames:
3923
      info = nodeinfo.get(node, None)
3924
      if not info:
3925
        raise errors.OpPrereqError("Cannot get current information"
3926
                                   " from node '%s'" % node)
3927
      vg_free = info.get('vg_free', None)
3928
      if not isinstance(vg_free, int):
3929
        raise errors.OpPrereqError("Can't compute free disk space on"
3930
                                   " node %s" % node)
3931
      if self.op.amount > info['vg_free']:
3932
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
3933
                                   " %d MiB available, %d MiB required" %
3934
                                   (node, info['vg_free'], self.op.amount))
3935

    
3936
  def Exec(self, feedback_fn):
3937
    """Execute disk grow.
3938

3939
    """
3940
    instance = self.instance
3941
    disk = instance.FindDisk(self.op.disk)
3942
    for node in (instance.secondary_nodes + (instance.primary_node,)):
3943
      self.cfg.SetDiskID(disk, node)
3944
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3945
      if not result or not isinstance(result, tuple) or len(result) != 2:
3946
        raise errors.OpExecError("grow request failed to node %s" % node)
3947
      elif not result[0]:
3948
        raise errors.OpExecError("grow request failed to node %s: %s" %
3949
                                 (node, result[1]))
3950
    disk.RecordGrow(self.op.amount)
3951
    self.cfg.Update(instance)
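    # at this point every node has already grown the device; the config
    # update above only records the new size for future operations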
3952
    return
3953

    
3954

    
3955
class LUQueryInstanceData(NoHooksLU):
3956
  """Query runtime instance data.
3957

3958
  """
3959
  _OP_REQP = ["instances"]
3960

    
3961
  def CheckPrereq(self):
3962
    """Check prerequisites.