Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 00fb8246

History | View | Annotate | Download (169.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overriden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    self.__ssh = None

    # every parameter listed in _OP_REQP must be present on the opcode
    for required in self._OP_REQP:
      if getattr(op, required, None) is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   required)

    if not cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Return the SshRunner, creating it lazily on first access.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple: a dict containing
    the environment for the hooks of this LU, the list of node names
    on which the pre-hook should run, and the list of node names for
    the post-hook.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned; the
    same holds for the node lists.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    Called after every hooks phase; the default implementation simply
    passes the previous result through unchanged. LUs that want to
    react to hook output (e.g. cluster-verify) override this.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result
176

    
177

    
178
class NoHooksLU(LogicalUnit):
  """Base class for Logical Units that run no hooks.

  Deriving from this class instead of LogicalUnit saves each hook-less
  LU from redefining the HPATH/HTYPE boilerplate itself.

  """
  HPATH = None
  HTYPE = None
187

    
188

    
189
def _GetWantedNodes(lu, nodes):
  """Return the checked and expanded list of node names, nicely sorted.

  Args:
    nodes: List of nodes (strings) or None for all

  Raises OpPrereqError if the argument is not a list or a name cannot
  be expanded.

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  # empty list means "all nodes in the configuration"
  if not nodes:
    return utils.NiceSort(lu.cfg.GetNodeList())

  expanded = []
  for name in nodes:
    full_name = lu.cfg.ExpandNodeName(name)
    if full_name is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    expanded.append(full_name)
  return utils.NiceSort(expanded)
211

    
212

    
213
def _GetWantedInstances(lu, instances):
  """Return the checked and expanded list of instance names, nicely sorted.

  Args:
    instances: List of instances (strings) or None for all

  Raises OpPrereqError if the argument is not a list or a name cannot
  be expanded.

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  # empty list means "all instances in the configuration"
  if not instances:
    return utils.NiceSort(lu.cfg.GetInstanceList())

  expanded = []
  for name in instances:
    full_name = lu.cfg.ExpandInstanceName(name)
    if full_name is None:
      raise errors.OpPrereqError("No such instance name '%s'" % name)
    expanded.append(full_name)
  return utils.NiceSort(expanded)
235

    
236

    
237
def _CheckOutputFields(static, dynamic, selected):
238
  """Checks whether all selected fields are valid.
239

240
  Args:
241
    static: Static fields
242
    dynamic: Dynamic fields
243

244
  """
245
  static_fields = frozenset(static)
246
  dynamic_fields = frozenset(dynamic)
247

    
248
  all_fields = static_fields | dynamic_fields
249

    
250
  if not all_fields.issuperset(selected):
251
    raise errors.OpPrereqError("Unknown output fields selected: %s"
252
                               % ",".join(frozenset(selected).
253
                                          difference(all_fields)))
254

    
255

    
256
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
257
                          memory, vcpus, nics):
258
  """Builds instance related env variables for hooks from single variables.
259

260
  Args:
261
    secondary_nodes: List of secondary nodes as strings
262
  """
263
  env = {
264
    "OP_TARGET": name,
265
    "INSTANCE_NAME": name,
266
    "INSTANCE_PRIMARY": primary_node,
267
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
268
    "INSTANCE_OS_TYPE": os_type,
269
    "INSTANCE_STATUS": status,
270
    "INSTANCE_MEMORY": memory,
271
    "INSTANCE_VCPUS": vcpus,
272
  }
273

    
274
  if nics:
275
    nic_count = len(nics)
276
    for idx, (ip, bridge, mac) in enumerate(nics):
277
      if ip is None:
278
        ip = ""
279
      env["INSTANCE_NIC%d_IP" % idx] = ip
280
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
281
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
282
  else:
283
    nic_count = 0
284

    
285
  env["INSTANCE_NIC_COUNT"] = nic_count
286

    
287
  return env
288

    
289

    
290
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  Returns the environment dict built by _BuildInstanceHookEnv.

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # Bug fix: this used to read instance.os, which made the
    # INSTANCE_STATUS hook variable carry the OS name instead of the
    # instance's run status.
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
310

    
311

    
312
def _CheckInstanceBridgesExist(instance):
  """Check that the brigdes needed by an instance exist.

  Queries the instance's primary node via rpc and raises OpPrereqError
  if any bridge used by the instance's NICs is missing there.

  """
  required = [nic.bridge for nic in instance.nics]
  pnode = instance.primary_node
  if not rpc.call_bridges_exist(pnode, required):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (required, pnode))
322

    
323

    
324
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    Destruction is only allowed once every node except the master and
    every instance have been removed.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodes = self.cfg.GetNodeList()
    if len(nodes) != 1 or nodes[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodes) - 1))
    remaining = self.cfg.GetInstanceList()
    if remaining:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(remaining))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    Stops the master role, backs up the cluster ssh keys and tells the
    master node to leave the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    # keep a copy of the ssh key pair before the node cleans it up
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)
360

    
361

    
362
class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existance and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    Returns True if any problem was found, False otherwise.

    """
    # an empty remote version means we could not reach the node at all
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if constants.PROTOCOL_VERSION != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (constants.PROTOCOL_VERSION, node, remote_version))
      return True

    # volume group existence and minimum size
    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # config file checksums
    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for fname in file_list:
        if fname not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % fname)
        elif remote_cksum[fname] != local_cksum[fname]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % fname)

    # ssh connectivity to the other nodes
    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    elif node_result['nodelist']:
      bad = True
      for peer in node_result['nodelist']:
        feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                        (peer, node_result['nodelist'][peer]))

    # tcp connectivity to the other nodes
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    elif node_result['node-net-test']:
      bad = True
      for peer in utils.NiceSort(node_result['node-net-test'].keys()):
        feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                        (peer, node_result['node-net-test'][peer]))

    # a non-None hypervisor entry is the error text from the node
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad
451

    
452
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
453
                      node_instance, feedback_fn):
454
    """Verify an instance.
455

456
    This function checks to see if the required block devices are
457
    available on the instance's node.
458

459
    """
460
    bad = False
461

    
462
    node_current = instanceconfig.primary_node
463

    
464
    node_vol_should = {}
465
    instanceconfig.MapLVsByNode(node_vol_should)
466

    
467
    for node in node_vol_should:
468
      for volume in node_vol_should[node]:
469
        if node not in node_vol_is or volume not in node_vol_is[node]:
470
          feedback_fn("  - ERROR: volume %s missing on node %s" %
471
                          (volume, node))
472
          bad = True
473

    
474
    if not instanceconfig.status == 'down':
475
      if (node_current not in node_instance or
476
          not instance in node_instance[node_current]):
477
        feedback_fn("  - ERROR: instance %s not running on node %s" %
478
                        (instance, node_current))
479
        bad = True
480

    
481
    for node in node_instance:
482
      if (not node == node_current):
483
        if instance in node_instance[node]:
484
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
485
                          (instance, node))
486
          bad = True
487

    
488
    return bad
489

    
490
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
491
    """Verify if there are any unknown volumes in the cluster.
492

493
    The .os, .swap and backup volumes are ignored. All other volumes are
494
    reported as unknown.
495

496
    """
497
    bad = False
498

    
499
    for node in node_vol_is:
500
      for volume in node_vol_is[node]:
501
        if node not in node_vol_should or volume not in node_vol_should[node]:
502
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
503
                      (volume, node))
504
          bad = True
505
    return bad
506

    
507
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
508
    """Verify the list of running instances.
509

510
    This checks what instances are running but unknown to the cluster.
511

512
    """
513
    bad = False
514
    for node in node_instance:
515
      for runninginstance in node_instance[node]:
516
        if runninginstance not in instancelist:
517
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
518
                          (runninginstance, node))
519
          bad = True
520
    return bad
521

    
522
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
523
    """Verify N+1 Memory Resilience.
524

525
    Check that if one single node dies we can still start all the instances it
526
    was primary for.
527

528
    """
529
    bad = False
530

    
531
    for node, nodeinfo in node_info.iteritems():
532
      # This code checks that every node which is now listed as secondary has
533
      # enough memory to host all instances it is supposed to should a single
534
      # other node in the cluster fail.
535
      # FIXME: not ready for failover to an arbitrary node
536
      # FIXME: does not support file-backed instances
537
      # WARNING: we currently take into account down instances as well as up
538
      # ones, considering that even if they're down someone might want to start
539
      # them even in the event of a node failure.
540
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
541
        needed_mem = 0
542
        for instance in instances:
543
          needed_mem += instance_cfg[instance].memory
544
        if nodeinfo['mfree'] < needed_mem:
545
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
546
                      " failovers should node %s fail" % (node, prinode))
547
          bad = True
548
    return bad
549

    
550
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    # anything outside the optional-checks set is a user error
    if self.skip_set - constants.VERIFY_OPTIONAL_CHECKS:
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
560

    
561
  def BuildHooksEnv(self):
562
    """Build hooks env.
563

564
    Cluster-Verify hooks just rone in the post phase and their failure makes
565
    the output be logged in the verify output and the verification to fail.
566

567
    """
568
    all_nodes = self.cfg.GetNodeList()
569
    # TODO: populate the environment with useful information for verify hooks
570
    env = {}
571
    return env, [], all_nodes
572

    
573
  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    Gathers per-node data via rpc, then runs the per-node, per-instance,
    orphan and (optionally) N+1 memory checks, reporting everything
    through feedback_fn. Returns int(bad): 0 if all checks passed, 1 if
    any problem was found.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    # one multi-node rpc call per data type; results are indexed by node
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      # a string result is an LVM error message from the node
      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      # NOTE(review): this rebinds 'nodeinfo' (until here the list of
      # node objects) to the per-node info dict; the list is no longer
      # used past this point, so the shadowing is harmless but confusing.
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      # accumulate the expected LVs of all instances for the orphan check
      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          # NOTE(review): unlike the primary-node branch above, this
          # reports an ERROR without setting 'bad' — possibly an
          # oversight; confirm before relying on the exit code here.
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)
734

    
735
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    Returns the (possibly updated) lu_result for the POST phase, None
    for any other phase.

    """
    # Only POST phase hooks are really run, so that is the only phase
    # whose results are interesting here.
    if phase != constants.HOOKS_PHASE_POST:
      return None

    feedback_fn("* Hooks Results")
    if not hooks_results:
      feedback_fn("  - ERROR: general communication failure")
      return 1

    # used to shift hooks' output to proper indentation
    indent_re = re.compile('^', re.M)
    for node_name in hooks_results:
      res = hooks_results[node_name]
      if res is False or not isinstance(res, list):
        feedback_fn("    Communication failure")
        lu_result = 1
        continue
      header_pending = True
      for script, hkr, output in res:
        if hkr != constants.HKR_FAIL:
          continue
        # the node header is only shown once, and only if some hook on
        # that node actually failed
        if header_pending:
          feedback_fn("  Node %s:" % node_name)
          header_pending = False
        feedback_fn("    ERROR: Script %s failed, output:" % script)
        feedback_fn("%s" % indent_re.sub('      ', output))
        lu_result = 1

    return lu_result
775

    
776

    
777
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Returns the 4-tuple (unreachable nodes, per-node LVM error strings,
    instances with at least one offline LV, per-instance lists of
    missing (node, volume) pairs).

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # map of (node, volume) -> owning instance, restricted to running,
    # network-mirrored instances
    nv_dict = {}
    for inst in instances:
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst_lvs = {}
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.items():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        # Bug fix: without this continue the string result fell through
        # to the lvs.iteritems() loop below and raised AttributeError.
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.items():
      res_missing.setdefault(inst.name, []).append(key)

    return result
847

    
848

    
849
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  The new name is resolved in CheckPrereq; Exec then stops the master
  role, rewrites the cluster name/IP keys in the SimpleStore and pushes
  the updated files to all other nodes before restoring the master role.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True  # we rewrite SimpleStore keys, so need it writable

  def BuildHooksEnv(self):
    """Build hooks env.

    Runs only on the master node, both pre and post.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    Resolves the new name, requires that either the name or the IP
    actually changes, and refuses a new IP that already answers on the
    network.  Stores the resolved IP in self.ip for Exec.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      # fping exits successfully when the target answers, so a
      # non-failed run means the address is already in use
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          # per-node copy failures are logged but not fatal; the node
          # stays out of sync until the file is redistributed
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      # always try to restore the master role, even if the update failed
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
927

    
928

    
929
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  # recurse into the children first (if any); a single LD_LV anywhere in
  # the device tree makes the whole disk count as lvm-based
  for chdisk in (disk.children or []):
    if _RecursiveCheckIfLVMBased(chdisk):
      return True
  return disk.dev_type == constants.LD_LV
944

    
945

    
946
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    Runs only on the master node, both pre and post.

    """
    master = self.sstore.GetMasterNode()
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    return env, [master], [master]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      # disabling lvm storage: refuse while any instance still has an
      # lvm-based disk anywhere in its device tree
      for iname in self.cfg.GetInstanceList():
        instance = self.cfg.GetInstanceInfo(iname)
        for disk in instance.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        problem = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                             constants.MIN_VG_SIZE)
        if problem:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, problem))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name == self.cfg.GetVGName():
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
    else:
      self.cfg.SetVGName(self.op.vg_name)
1001

    
1002

    
1003
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Args:
    cfgw: config writer object (used for SetDiskID)
    instance: the instance whose disks are polled on its primary node
    proc: object providing LogInfo/LogWarning for progress reporting
    oneshot: if True, poll exactly once instead of waiting for full sync
    unlock: currently a no-op (the Unlock/Lock calls are commented out)

  Returns:
    True if the disks are in sync and not degraded, False otherwise.

  Raises:
    errors.RemoteError if the primary node cannot be contacted for
    mirror data 10 times in a row.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      # transient RPC failure: retry up to 10 times with a 6s pause
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      # a degraded device with no sync percentage counts as overall
      # degradation (nothing is resyncing it)
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        # a non-None percentage means this device is still syncing
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      #utils.Unlock('cmd')
      pass
    try:
      # NOTE(review): if no device reported an estimated time, max_time
      # stays 0 and this is time.sleep(0) — effectively a busy poll
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        #utils.Lock('cmd')
        pass

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
1067

    
1068

    
1069
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  Returns:
    True if the device (and all its children) looks consistent,
    False if the device is degraded or the node cannot be queried.

  """
  cfgw.SetDiskID(dev, node)
  # index into the tuple returned by rpc.call_blockdev_find:
  # presumably 5 is the is_degraded flag and 6 the ldisk status
  # (matches the docstring) — confirm against the backend
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      # no answer at all counts as inconsistent
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      # NOTE(review): ldisk is not propagated to the children here, so
      # they are always checked with the default (is_degraded) test —
      # confirm whether this is intentional
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result
1096

    
1097

    
1098
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    # rpc.call_os_diagnose apparently signals total failure by returning
    # False (grounded by this check) — per-node failures show up as
    # empty/false entries handled in _DiagnoseByOS
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          # an OS is valid only if every node reported at least one
          # entry and the first entry is truthy
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
1176

    
1177

    
1178
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  The node must exist in the configuration, must not be the master and
  must not host any instance (as primary or secondary).

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # fixed: was the deprecated "raise Class, args" statement form,
      # inconsistent with every other raise in this module
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    # store the canonical (expanded) name and the node object for Exec
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    # stop the node daemon via ssh (the node is leaving, so RPC may
    # no longer be available afterwards)
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    utils.RemoveHostFromEtcHosts(node.name)
1251

    
1252

    
1253
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # fields whose values require a live RPC query to the nodes
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      # at least one live field was requested: query all wanted nodes
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          # unreachable node: dynamic fields will all resolve to None
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    # maps node name -> set of instance names using it as primary/secondary
    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
1355

    
1356

    
1357
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    wanted_nodes = self.nodes
    volumes = rpc.call_node_volumes(wanted_nodes)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    # pre-compute, per instance, the LVs it owns on each node
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in wanted_nodes:
      node_vol_data = volumes.get(node, None)
      if not node_vol_data:
        # node unreachable or reported no volumes: skip it entirely
        continue

      node_vols = node_vol_data[:]
      node_vols.sort(key=lambda entry: entry['dev'])

      for vol in node_vols:
        row = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # find the instance owning this LV on this node, if any
            owner = '-'
            for inst in ilist:
              node_lvs = lv_by_node[inst].get(node, None)
              if node_lvs is not None and vol['name'] in node_lvs:
                owner = inst.name
                break
            val = owner
          else:
            raise errors.ParameterError(field)
          row.append(str(val))

        output.append(row)

    return output
1425

    
1426

    
1427
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  CheckPrereq resolves and validates the node's addresses; Exec pushes
  the node daemon password and SSL certificate over ssh, verifies
  connectivity and protocol version, distributes ssh keys and cluster
  files, and finally registers the node in the configuration.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    # primary_ip/secondary_ip are filled in by CheckPrereq, which runs
    # before the pre-hooks
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    # with no explicit secondary IP the node is single-homed
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    # NOTE(review): self.op.readd is read below but is not listed in
    # _OP_REQP — presumably the opcode provides a default; confirm
    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        # a readded node must keep its previous address configuration
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      # neither of the new node's addresses may clash with any address
      # of an existing node
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    # HVM clusters need the VNC password file present so it can be
    # copied to the new node
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    # the password is embedded in a shell command below, so restrict it
    # to a safe character set
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    # give the freshly restarted daemon a moment to come up
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    # keyarray entries are in the same order as keyfiles above
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      # ask the new node itself to confirm it answers on the secondary ip
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = self.ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        # distribution failures are logged but not fatal
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not self.ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)
1660

    
1661

    
1662
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  REQ_WSSTORE = True
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      # FIX: message previously read "This commands must be run"
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      # FIX: the original message read "could disable", dropping "not"
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    # record the new master in the simple store and push the updated
    # file to every node so they all agree on who the master is
    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return the cluster configuration as a dict.

    """
    # version information comes from constants, runtime information
    # from the simple store and the platform module
    arch_bits, arch_machine = platform.architecture()[0], platform.machine()
    info = {}
    info["name"] = self.sstore.GetClusterName()
    info["software_version"] = constants.RELEASE_VERSION
    info["protocol_version"] = constants.PROTOCOL_VERSION
    info["config_version"] = constants.CONFIG_VERSION
    info["os_api_version"] = constants.OS_API_VERSION
    info["export_version"] = constants.EXPORT_VERSION
    info["master"] = self.sstore.GetMasterNode()
    info["architecture"] = (arch_bits, arch_machine)
    info["hypervisor_type"] = self.sstore.GetHypervisorType()
    return info
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Nothing to verify for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    # the heavy lifting is delegated to the config writer
    return self.cfg.DumpConfig()
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    inst = self.cfg.GetInstanceInfo(full_name)
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst

  def Exec(self, feedback_fn):
    """Activate the disks.

    Returns the device mapping computed by _AssembleInstanceDisks.

    """
    assembled, dev_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not assembled:
      raise errors.OpExecError("Cannot activate block devices")
    return dev_info
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         suceeded with the mapping from node devices to instance devices
  """
  dev_map = []
  all_ok = True
  iname = instance.name

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # pass one: assemble on every node of the tree in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          all_ok = False

  # FIXME: race condition on drbd migration to primary

  # pass two: re-assemble only on the primary node, in primary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        all_ok = False
    # NOTE: 'result' is the value from the primary-node call above
    dev_map.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return all_ok, dev_map
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  On failure the partially-assembled disks are shut down again and an
  OpExecError is raised; 'force' is forwarded to _AssembleInstanceDisks
  as ignore_secondaries.

  """
  assembled, _ = _AssembleInstanceDisks(instance, cfg,
                                        ignore_secondaries=force)
  if not assembled:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      # hint the operator at the --force escape hatch
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    inst = self.cfg.GetInstanceInfo(full_name)
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    Refuses to shut down the block devices while the instance is
    reported as running on its primary node.

    """
    instance = self.instance
    pnode = instance.primary_node
    running = rpc.call_instance_list([pnode])[pnode]
    # a non-list answer means the node could not be reached
    if type(running) is not list:
      raise errors.OpExecError("Can't contact node '%s'" % pnode)

    if instance.name in running:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, failures on the primary node do not affect
  the return value (the shutdown is still attempted on every node).

  """
  all_ok = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if rpc.call_blockdev_shutdown(node, top_disk):
        continue
      logger.Error("could not shutdown block device %s on node %s" %
                   (disk.iv_name, node))
      # primary-node failures only count when ignore_primary is unset
      if node != instance.primary_node or not ignore_primary:
        all_ok = False
  return all_ok
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function check if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raise an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  node_info = rpc.call_node_info([node], cfg.GetVGName())
  if not node_info or not isinstance(node_info, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free = node_info[node].get('memory_free')
  # a non-integer answer means the node did not report usable data
  if not isinstance(free, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free))
  if requested > free:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free))
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {"FORCE": self.op.force}
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nodes = [self.sstore.GetMasterNode(), self.instance.primary_node]
    nodes.extend(self.instance.secondary_nodes)
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its bridges
    exist and that the primary node has enough free memory for it.

    """
    inst = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(inst)

    _CheckNodeFreeMemory(self.cfg, inst.primary_node,
                         "starting instance %s" % inst.name,
                         inst.memory)

    self.instance = inst
    # normalize to the fully-expanded name
    self.op.instance_name = inst.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    inst = self.instance
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(inst.name)

    _StartInstanceDisks(self.cfg, inst, self.op.force)

    if not rpc.call_instance_start(inst.primary_node, inst, extra_args):
      _ShutdownInstanceDisks(inst, self.cfg)
      raise errors.OpExecError("Could not start instance")
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {"IGNORE_SECONDARIES": self.op.ignore_secondaries}
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nodes = [self.sstore.GetMasterNode(), self.instance.primary_node]
    nodes.extend(self.instance.secondary_nodes)
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    inst = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(inst)

    self.instance = inst
    # normalize to the fully-expanded name
    self.op.instance_name = inst.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    inst = self.instance
    primary = inst.primary_node
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    valid_types = (constants.INSTANCE_REBOOT_SOFT,
                   constants.INSTANCE_REBOOT_HARD,
                   constants.INSTANCE_REBOOT_FULL)
    if reboot_type not in valid_types:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  valid_types)

    if reboot_type == constants.INSTANCE_REBOOT_FULL:
      # full reboot: stop the instance, cycle its disks, start it again
      if not rpc.call_instance_shutdown(primary, inst):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(inst, self.cfg)
      _StartInstanceDisks(self.cfg, inst, self.op.ignore_secondaries)
      if not rpc.call_instance_start(primary, inst, extra_args):
        _ShutdownInstanceDisks(inst, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")
    else:
      # soft/hard reboots are handled by the hypervisor on the node
      if not rpc.call_instance_reboot(primary, inst,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")

    self.cfg.MarkInstanceUp(inst.name)
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nodes = [self.sstore.GetMasterNode(), self.instance.primary_node]
    nodes.extend(self.instance.secondary_nodes)
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    inst = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    The instance is marked down in the configuration first, so a
    failed node-side shutdown only produces a log message.

    """
    inst = self.instance
    self.cfg.MarkInstanceDown(inst.name)
    if not rpc.call_instance_shutdown(inst.primary_node, inst):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(inst, self.cfg)
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # double-check against the node itself: config might be stale
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # FIX: this message used self.op.pnode, which does not exist on
        # this opcode and would have raised AttributeError instead of
        # the intended OpPrereqError
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      # always release the disks, even when the OS install failed
      _ShutdownInstanceDisks(inst, self.cfg)
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running,
    and that the new name resolves and is not already taken.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # double-check against the node itself: config might be stale
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      # a successful ping means the address is already in use
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    # for file-based disks the storage directory embeds the instance
    # name, so it has to be renamed on the node as well
    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        # FIX: the original message read "Could run", dropping "not"
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    The hooks for instance removal run on the master node only.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    inst = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst

  def Exec(self, feedback_fn):
    """Remove the instance.

    Shuts down the instance, removes its disks, then drops it from the
    configuration; with ignore_failures the first two steps only warn.

    """
    inst = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (inst.name, inst.primary_node))

    if not rpc.call_instance_shutdown(inst.primary_node, inst):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (inst.name, inst.primary_node))
      feedback_fn("Warning: can't shutdown instance")

    logger.Info("removing block devices for instance %s" % inst.name)

    if not _RemoveDisks(inst, self.cfg):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Can't remove instance's disks")
      feedback_fn("Warning: can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % inst.name)

    self.cfg.RemoveInstance(inst.name)
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    static_fields = ["name", "os", "pnode", "snodes",
                     "admin_state", "admin_ram",
                     "disk_template", "ip", "mac", "bridge",
                     "sda_size", "sdb_size", "vcpus", "tags"]
    _CheckOutputFields(static=static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    names = self.wanted
    instances = [self.cfg.GetInstanceInfo(name) for name in names]

    # data gathering: live status is only queried when at least one
    # runtime ("dynamic") field was requested
    pnodes = frozenset([inst.primary_node for inst in instances])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(pnodes)
      for name in pnodes:
        answer = node_data[name]
        if answer:
          live_data.update(answer)
        elif answer == False:
          bad_nodes.append(name)
        # else no instance is alive on that node
    else:
      live_data = dict([(name, {}) for name in names])

    # build one output row per instance, one column per requested field
    output = []
    for inst in instances:
      row = []
      for field in self.op.output_fields:
        if field == "name":
          value = inst.name
        elif field == "os":
          value = inst.os
        elif field == "pnode":
          value = inst.primary_node
        elif field == "snodes":
          value = list(inst.secondary_nodes)
        elif field == "admin_state":
          value = (inst.status != "down")
        elif field == "oper_state":
          if inst.primary_node in bad_nodes:
            value = None
          else:
            value = bool(live_data.get(inst.name))
        elif field == "status":
          if inst.primary_node in bad_nodes:
            value = "ERROR_nodedown"
          else:
            running = bool(live_data.get(inst.name))
            admin_up = (inst.status != "down")
            if running and admin_up:
              value = "running"
            elif running:
              value = "ERROR_up"
            elif admin_up:
              value = "ERROR_down"
            else:
              value = "ADMIN_down"
        elif field == "admin_ram":
          value = inst.memory
        elif field == "oper_ram":
          if inst.primary_node in bad_nodes:
            value = None
          elif inst.name in live_data:
            value = live_data[inst.name].get("memory", "?")
          else:
            value = "-"
        elif field == "disk_template":
          value = inst.disk_template
        elif field == "ip":
          value = inst.nics[0].ip
        elif field == "bridge":
          value = inst.nics[0].bridge
        elif field == "mac":
          value = inst.nics[0].mac
        elif field in ("sda_size", "sdb_size"):
          disk = inst.FindDisk(field[:3])
          if disk is None:
            value = None
          else:
            value = disk.size
        elif field == "vcpus":
          value = inst.vcpus
        elif field == "tags":
          value = list(inst.GetTags())
        else:
          raise errors.ParameterError(field)
        row.append(value)
      output.append(row)

    return output
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  Shuts the instance down on its current primary node and brings it up
  on its (single) secondary node; only valid for instances whose disk
  template is network mirrored.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    # hooks run on the master node plus all of the instance's secondaries
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its disk
    template supports failover, and that the target (secondary) node
    has enough memory and the required bridges.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # failover only makes sense when the disks are mirrored over the network
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      # a net-mirrored template without secondaries is a config corruption
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        # a degraded disk only blocks failover of a running instance,
        # and even then only when the user did not ask to ignore it
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        # best-effort: proceed anyway, the source node may already be dead
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        # roll back disk activation before aborting
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
2639

    
2640

    
2641
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Recursively create a block device tree on the primary node.

  Every device in the tree is created unconditionally, children first
  (depth-first).  Returns True on full success, False as soon as any
  creation fails.

  """
  # depth-first: all children must exist before the parent device
  for child in (device.children or []):
    if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
      return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  # record the backend identifier on first creation only
  if device.physical_id is None:
    device.physical_id = new_id
  return True
2660

    
2661

    
2662
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Recursively create a block device tree on a secondary node.

  A device is created when 'force' is set or when its type must exist
  on secondaries; otherwise only the recursion continues, passing the
  same flag down.  Returns True on success, False on first failure.

  """
  must_create = force or device.CreateOnSecondary()

  for child in (device.children or []):
    if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                      child, must_create, info):
      return False

  if not must_create:
    # nothing to do at this level; children were already handled above
    return True

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  # record the backend identifier on first creation only
  if device.physical_id is None:
    device.physical_id = new_id
  return True
2689

    
2690

    
2691
def _GenerateUniqueNames(cfg, exts):
2692
  """Generate a suitable LV name.
2693

2694
  This will generate a logical volume name for the given instance.
2695

2696
  """
2697
  results = []
2698
  for val in exts:
2699
    new_id = cfg.GenerateUniqueID()
2700
    results.append("%s%s" % (new_id, val))
2701
  return results
2702

    
2703

    
2704
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Build one complete drbd8 disk object, including its backing LVs.

  names[0] becomes the data LV (of the requested size) and names[1]
  the 128 MB metadata LV; the drbd device itself is identified by the
  two node names plus a freshly allocated cluster port.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  data_lv = objects.Disk(dev_type=constants.LD_LV, size=size,
                         logical_id=(vgname, names[0]))
  meta_lv = objects.Disk(dev_type=constants.LD_LV, size=128,
                         logical_id=(vgname, names[1]))
  return objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                      logical_id=(primary, secondary, port),
                      children=[data_lv, meta_lv],
                      iv_name=iv_name)
2719

    
2720

    
2721
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  Returns the list of top-level Disk objects (sda/sdb) appropriate for
  'template_name'; raises ProgrammerError for a node count that does
  not match the template or for an unknown template.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()

  if template_name == constants.DT_DISKLESS:
    return []

  if template_name == constants.DT_PLAIN:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")
    lv_names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    return [objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                         logical_id=(vgname, lv_names[0]),
                         iv_name="sda"),
            objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                         logical_id=(vgname, lv_names[1]),
                         iv_name="sdb")]

  if template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    lv_names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                          ".sdb_data", ".sdb_meta"])
    return [_GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                 disk_sz, lv_names[0:2], "sda"),
            _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                 swap_sz, lv_names[2:4], "sdb")]

  if template_name == constants.DT_FILE:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")
    return [objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                         iv_name="sda",
                         logical_id=(file_driver,
                                     "%s/sda" % file_storage_dir)),
            objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                         iv_name="sdb",
                         logical_id=(file_driver,
                                     "%s/sdb" % file_storage_dir))]

  raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2770

    
2771

    
2772
def _GetInstanceInfoText(instance):
  """Compute the owner text stored in a disk's metadata.

  The text records which instance a block device belongs to.

  """
  return "originstname+" + instance.name
2777

    
2778

    
2779
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  # owner text stored in each device's metadata
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    # the storage directory is derived from the first disk's file path
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    # a false-ish result means the RPC itself failed (node unreachable)
    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    # result[0] carries the remote operation's success flag
    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    # secondaries first, without forcing: the device type itself decides
    # whether anything needs to be created there
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True
2824

    
2825

    
2826
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  Companion of `_CreateDisks()`, used by `AddInstance()` (for rollback)
  and `RemoveInstance()`.  Removal is best-effort: when one device
  cannot be removed the others are still attempted.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal proces

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  all_ok = True
  for device in instance.disks:
    # walk every (node, device) pair of this disk's tree
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      removed = rpc.call_blockdev_remove(node, disk)
      if not removed:
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        all_ok = False

  if instance.disk_template == constants.DT_FILE:
    # also drop the directory that held the file-based disks
    storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            storage_dir):
      logger.Error("could not remove directory '%s'" % storage_dir)
      all_ok = False

  return all_ok
2861

    
2862

    
2863
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.  Returns the
  required free space in MB, or None for templates that do not use the
  volume group.

  """
  # Required free disk space as a function of disk and swap space
  requirements = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  try:
    return requirements[disk_template]
  except KeyError:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)
2883

    
2884

    
2885
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    On success this fills in self.op.pnode (and self.op.snode when two
    nodes are required) from the allocator's answer.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # FIX: the format string has three placeholders but only two
      # arguments were supplied, which made raising this error crash
      # with a TypeError; the iallocator name is now passed as well
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    Validates all the creation parameters (mode, template, name, IP,
    MAC, nodes, disk space, OS, HVM settings) and fills in the derived
    attributes used by BuildHooksEnv and Exec.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      # "auto" means use the IP resolved from the instance name
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      # a reachable IP means something else already uses it
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    # exactly one of the two node-selection methods must be used
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")


    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    Creates the disks, registers the instance in the configuration,
    waits for the mirrors to sync, runs the OS create/import scripts
    and finally (optionally) starts the instance.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      # roll back any devices that were successfully created
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      # full rollback: remove the disks and the config entry
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
3309

    
3310

    
3311
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    Returns the ssh command line (as built by self.ssh.BuildCmd) that
    the caller must run on the master node to reach the console.

    """
    instance = self.instance
    node = instance.primary_node

    # ask the primary node which instances it is currently running;
    # a False result means the node itself could not be contacted
    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    # the hypervisor knows the actual console-attach command for this
    # instance type
    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3357
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  Supports DRBD8 instances only; dispatches either to an in-place
  local-storage replacement (_ExecD8DiskOnly) or to a secondary-node
  change (_ExecD8Secondary).

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    On success this sets self.op.remote_node to the selected node; it
    does not return a value.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # FIX: the format string has three placeholders but the original
      # code supplied only two arguments, turning this error path into
      # a TypeError; supply the iallocator name as the first argument
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its disk
    layout is network-mirrored, and computes the target/other/new
    nodes for the requested replacement mode.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      # FIX: _RunAllocator() sets self.op.remote_node itself and returns
      # None; the original assigned its return value back to
      # self.op.remote_node, discarding the allocator's choice
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for dbrd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    return ret
3857
class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its disk
    template supports growing, and that every involved node has
    enough free space in the volume group.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    # every node holding a copy of the disk must have room for the growth
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    Grows the disk on the secondaries first, then on the primary, and
    finally records the new size in the configuration.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
      # the rpc returns a (success, payload) tuple; anything else means
      # the call itself failed
      if not result or not isinstance(result, tuple) or len(result) != 2:
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
3939
class LUQueryInstanceData(NoHooksLU):
3940
  """Query runtime instance data.
3941

3942
  """
3943
  _OP_REQP = ["instances"]
3944

    
3945
  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    Populates self.wanted_instances with the instance objects to query;
    an empty op.instances list means "all instances in the cluster".

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]

    
3966

    
3967
  def _ComputeDiskStatus(self, instance, snode, dev):
3968
    """Compute block device status.
3969

3970
    """
3971
    self.cfg.SetDiskID(dev, instance.primary_node)
3972
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3973
    if dev.dev_type in constants.LDS_DRBD:
3974
      # we change the snode then (otherwise we use the one passed in)
3975
      if dev.logical_id[0] == instance.primary_node:
3976
        snode = dev.logical_id[1]
3977
      else:
3978
        snode = dev.logical_id[0]
3979

    
3980
    if snode:
3981
      self.cfg.SetDiskID(dev, snode)
3982
      dev_sstatu