Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ d4fa5c23

History | View | Annotate | Download (170 kB)

<
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46
from ganeti import serializer
47

    
48

    
49
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overriden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    self.needed_locks = None
    self.__ssh = None

    # Every attribute named in _OP_REQP must be present on the opcode.
    for required in self._OP_REQP:
      if getattr(op, required, None) is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   required)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object.

    The runner is created lazily on first access and cached afterwards.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, ecc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use None as a value
        (this reflects what LockSet does, and will be replaced before
        CheckPrereq with the full list of nodes that have been locked)

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: None,
      locking.LEVEL_INSTANCES: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # Implementing this method is mandatory only for concurrent LUs, so that
    # old (big-lock) LUs don't all have to be converted at the same time.
    if not self.REQ_BGL:
      raise NotImplementedError
    self.needed_locks = {} # Exclusive LUs don't need locks.

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result
219

    
220

    
221
class NoHooksLU(LogicalUnit):
  """Convenience base class for logical units that run no hooks.

  Deriving from this class instead of LogicalUnit keeps hook-less LUs
  from having to repeat the empty HPATH/HTYPE definitions themselves.

  """
  HPATH = None
  HTYPE = None
230

    
231

    
232
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  # An empty list means "all nodes currently known to the config".
  if not nodes:
    return utils.NiceSort(lu.cfg.GetNodeList())

  expanded = []
  for name in nodes:
    full_name = lu.cfg.ExpandNodeName(name)
    if full_name is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    expanded.append(full_name)
  return utils.NiceSort(expanded)
254

    
255

    
256
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  # An empty list means "all instances currently known to the config".
  if not instances:
    return utils.NiceSort(lu.cfg.GetInstanceList())

  expanded = []
  for name in instances:
    full_name = lu.cfg.ExpandInstanceName(name)
    if full_name is None:
      raise errors.OpPrereqError("No such instance name '%s'" % name)
    expanded.append(full_name)
  return utils.NiceSort(expanded)
278

    
279

    
280
def _CheckOutputFields(static, dynamic, selected):
281
  """Checks whether all selected fields are valid.
282

283
  Args:
284
    static: Static fields
285
    dynamic: Dynamic fields
286

287
  """
288
  static_fields = frozenset(static)
289
  dynamic_fields = frozenset(dynamic)
290

    
291
  all_fields = static_fields | dynamic_fields
292

    
293
  if not all_fields.issuperset(selected):
294
    raise errors.OpPrereqError("Unknown output fields selected: %s"
295
                               % ",".join(frozenset(selected).
296
                                          difference(all_fields)))
297

    
298

    
299
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
300
                          memory, vcpus, nics):
301
  """Builds instance related env variables for hooks from single variables.
302

303
  Args:
304
    secondary_nodes: List of secondary nodes as strings
305
  """
306
  env = {
307
    "OP_TARGET": name,
308
    "INSTANCE_NAME": name,
309
    "INSTANCE_PRIMARY": primary_node,
310
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
311
    "INSTANCE_OS_TYPE": os_type,
312
    "INSTANCE_STATUS": status,
313
    "INSTANCE_MEMORY": memory,
314
    "INSTANCE_VCPUS": vcpus,
315
  }
316

    
317
  if nics:
318
    nic_count = len(nics)
319
    for idx, (ip, bridge, mac) in enumerate(nics):
320
      if ip is None:
321
        ip = ""
322
      env["INSTANCE_NIC%d_IP" % idx] = ip
323
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
324
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
325
  else:
326
    nic_count = 0
327

    
328
  env["INSTANCE_NIC_COUNT"] = nic_count
329

    
330
  return env
331

    
332

    
333
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  Returns the environment dict produced by _BuildInstanceHookEnv.

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # Bug fix: this used to pass instance.os, which exported the OS name as
    # INSTANCE_STATUS; the run status is what the hooks should see (the OS
    # name is already exported via 'os_type' above).
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
353

    
354

    
355
def _CheckInstanceBridgesExist(instance):
  """Check that the brigdes needed by an instance exist.

  Queries the instance's primary node over RPC and raises
  errors.OpPrereqError if any required bridge is missing there.

  """
  # check bridges existance
  bridges = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, bridges):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (bridges, instance.primary_node))
365

    
366

    
367
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    # The master itself is the only node allowed to remain.
    remaining_nodes = self.cfg.GetNodeList()
    if len(remaining_nodes) != 1 or remaining_nodes[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(remaining_nodes) - 1))
    remaining_instances = self.cfg.GetInstanceList()
    if remaining_instances:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(remaining_instances))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    # Back up the ssh keys of the ganeti run-as user before the node
    # leaves the cluster and they become unreachable.
    private_key, public_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(private_key)
    utils.CreateBackup(public_key)
    rpc.call_node_leave_cluster(master)
403

    
404

    
405
class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existance and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    Returns:
      True if any check failed, False if the node verified cleanly.

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existance and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        # Fix: the loop variable used to be named 'node', clobbering the
        # 'node' parameter for the rest of the method
        for failed_node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (failed_node, node_result['nodelist'][failed_node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for failed_node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (failed_node,
                           node_result['node-net-test'][failed_node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the instance is running
    on (exactly) its primary node.

    Returns:
      True if any check failed, False otherwise.

    """
    bad = False

    node_current = instanceconfig.primary_node

    # Volumes the instance's disks say should exist, grouped by node.
    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    # An instance that is not administratively down must be running on
    # its primary node.
    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    # ... and must not be running anywhere else.
    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    Returns 0 when everything verified cleanly, 1 otherwise (suitable as
    an exit code).

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      # Fix: this used to be assigned to 'nodeinfo', silently shadowing the
      # list of node objects built before the loop
      nresult = all_ninfo[node]
      if not isinstance(nresult, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nresult['memory_free']),
          "dfree": int(nresult['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

    # Fix: the return used to sit inside the POST branch, so the PRE phase
    # fell off the end and returned None. In the PRE phase lu_result is
    # documented to be None anyway, so this is behavior-preserving but no
    # longer relies on the implicit fall-through.
    return lu_result
818

    
819

    
820
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Returns a 4-tuple (and the same tuple via `result`):
      - list of unreachable nodes
      - dict of node name -> LVM enumeration error message
      - list of instance names with offline (not online) volumes
      - dict of instance name -> list of missing (node, volume) pairs

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # build the (node, volume) -> instance map, restricted to running,
    # network-mirrored instances (only those have remotely-checked LVs)
    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        # FIX: a string result is an error message, not a volume map;
        # without this continue the iteritems() call below would fail
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
890

    
891

    
892
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    master = self.sstore.GetMasterNode()
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    return env, [master], [master]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostinfo = utils.HostInfo(self.op.name)

    new_name = hostinfo.name
    new_ip = hostinfo.ip
    self.ip = new_ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if (new_name, new_ip) == (old_name, old_ip):
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    # a changed IP must not already be live on the network
    if new_ip != old_ip and utils.TcpPing(new_ip,
                                          constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                 " reachable on the network. Aborting." %
                                 new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    new_name = self.op.name
    new_ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # update the cluster name and IP in the simple store
      ss.SetKey(ss.SS_MASTER_IP, new_ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, new_name)

      # Distribute updated ss config to all nodes but ourselves
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in (ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP):
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      # the master role is restarted even if the distribution failed
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
969

    
970

    
971
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  # first recurse into the children, if any; a single lvm-based
  # descendant is enough
  for chdisk in (disk.children or []):
    if _RecursiveCheckIfLVMBased(chdisk):
      return True
  return disk.dev_type == constants.LD_LV
986

    
987

    
988
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    master = self.sstore.GetMasterNode()
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    return env, [master], [master]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      # disabling LVM storage is only possible if no instance uses it
      for name in self.cfg.GetInstanceList():
        inst = self.cfg.GetInstanceInfo(name)
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name == self.cfg.GetVGName():
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
    else:
      self.cfg.SetVGName(self.op.vg_name)
1043

    
1044

    
1045
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Args:
    cfgw: configuration object, used to set the disks' physical IDs
    instance: the instance whose disks are polled on its primary node
    proc: processor-like object providing LogInfo/LogWarning feedback
    oneshot: if True, poll and report only once instead of waiting
             until the sync finishes
    unlock: unused here; kept for interface compatibility with callers

  Returns:
    True if no disk ended up cumulatively degraded, False otherwise.

  """
  if not instance.disks:
    # no disks means nothing to synchronize
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  # physical IDs must be set before querying the node for mirror status
  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      # transient rpc failure: retry up to 10 times before aborting
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      # degraded without a progress percentage counts as really degraded
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        # a percentage means this device is still syncing
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    # sleep until the estimated completion, but at most one minute per poll
    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
1101

    
1102

    
1103
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  # index 5 in the rpc result is is_degraded, index 6 is the ldisk status
  if ldisk:
    status_idx = 6
  else:
    status_idx = 5

  healthy = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      healthy = False
    else:
      healthy = healthy and (not rstats[status_idx])

  # a device is only consistent if all its children are too
  if dev.children:
    for child in dev.children:
      healthy = healthy and _CheckDiskConsistency(cfgw, child, node,
                                                  on_primary)

  return healthy
1130

    
1131

    
1132
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into an a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # first time we see this OS: seed an empty list for every
          # node in node_list so absent nodes are still represented
          all_os[os_obj.name] = dict([(nname, []) for nname in node_list])
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          field_val = os_name
        elif field == "valid":
          # valid only if every node returned at least one valid entry
          field_val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          field_val = {}
          for node_name, nos_list in os_data.iteritems():
            field_val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(field_val)
      output.append(row)

    return output
1210

    
1211

    
1212
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # FIX: use the call form of raise, consistent with the rest of the
      # module, instead of the deprecated "raise Class, args" syntax
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)
    # Remove the node from the Ganeti Lock Manager
    self.context.glm.remove(locking.LEVEL_NODE, node.name)

    utils.RemoveHostFromEtcHosts(node.name)
1285

    
1286

    
1287
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # fields that need a live rpc call to the nodes
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    static_fields = ["name", "pinst_cnt", "sinst_cnt",
                     "pinst_list", "sinst_list",
                     "pip", "sip", "tags"]
    _CheckOutputFields(static=static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      # at least one live field was requested, so query the nodes
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if not nodeinfo:
          live_data[name] = {}
          continue
        live_data[name] = {
          "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
          "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
          "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
          "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
          "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
          "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
          "bootid": nodeinfo['bootid'],
          }
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      # instance relations are needed, so walk the instance list once
      for instance_name in self.cfg.GetInstanceList():
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      row = []
      for field in self.op.output_fields:
        if field == "name":
          field_val = node.name
        elif field == "pinst_list":
          field_val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          field_val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          field_val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          field_val = len(node_to_secondary[node.name])
        elif field == "pip":
          field_val = node.primary_ip
        elif field == "sip":
          field_val = node.secondary_ip
        elif field == "tags":
          field_val = list(node.GetTags())
        elif field in self.dynamic_fields:
          field_val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        row.append(field_val)
      output.append(row)

    return output
1389

    
1390

    
1391
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    volumes = rpc.call_node_volumes(self.nodes)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    # per-instance map of node -> logical volumes, computed only once
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in self.nodes:
      node_vols = volumes.get(node)
      if not node_vols:
        # missing or empty answer for this node: skip it
        continue

      node_vols = node_vols[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        row = []
        for field in self.op.output_fields:
          if field == "node":
            field_val = node
          elif field == "phys":
            field_val = vol['dev']
          elif field == "vg":
            field_val = vol['vg']
          elif field == "name":
            field_val = vol['name']
          elif field == "size":
            field_val = int(float(vol['size']))
          elif field == "instance":
            # find the instance owning this volume, if any
            field_val = '-'
            for inst in ilist:
              inst_lvs = lv_by_node[inst]
              if node in inst_lvs and vol['name'] in inst_lvs[node]:
                field_val = inst.name
                break
          else:
            raise errors.ParameterError(field)
          row.append(str(field_val))

        output.append(row)

    return output
1459

    
1460

    
1461
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    # resolve the new node's name; also stores the primary IP on self.op
    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    # the secondary IP defaults to the primary one (single-homed node)
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    # membership checks depend on whether this is an add or a readd
    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        # a readded node must keep its previous IP configuration
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      # neither of the new node's IPs may clash with any existing node's
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    # node object used by Exec; not added to the config until Exec succeeds
    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    # the host keys and the cluster user's key pair are pushed verbatim
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    # a dual-homed node must actually own the secondary IP it claims
    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    # ask the master node to verify ssh/hostname connectivity to the newbie
    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    # the ssconf files (and the VNC password, if HVM) go to the new node only
    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)
      # Add the new node to the Ganeti Lock Manager
      self.context.glm.add(locking.LEVEL_NODE, node)
1663

    
1664

    
1665
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  REQ_WSSTORE = True
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      # FIX: grammar in the user-visible message ("This commands")
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      # FIX: message previously read "could disable", missing the "not"
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    # record the new master in the simple store and push the file out
    # to every node so they all agree on who is master
    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """This LU has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    # static software/version information first, then the values read
    # from the simple store
    result = {}
    result["software_version"] = constants.RELEASE_VERSION
    result["protocol_version"] = constants.PROTOCOL_VERSION
    result["config_version"] = constants.CONFIG_VERSION
    result["os_api_version"] = constants.OS_API_VERSION
    result["export_version"] = constants.EXPORT_VERSION
    result["architecture"] = (platform.architecture()[0], platform.machine())
    result["name"] = self.sstore.GetClusterName()
    result["master"] = self.sstore.GetMasterNode()
    result["hypervisor_type"] = self.sstore.GetHypervisorType()
    return result
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """This LU needs no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Return a dump of the cluster configuration.

    """
    # the config writer knows how to serialize itself
    return self.cfg.DumpConfig()
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    inst = self.cfg.GetInstanceInfo(full_name)
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    # _AssembleInstanceDisks returns (success, device mapping); abort
    # the whole operation on a failed assembly
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")
    return disks_info
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         suceeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        # secondary failures only count as errors unless explicitly ignored
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    # NOTE: 'result' here carries the value of the last primary-node
    # assemble call for this disk; the append is intentionally outside
    # the inner loop
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
def _StartInstanceDisks(cfg, instance, force):
  """Assemble an instance's disks, cleaning up after a failure.

  Raises OpExecError if the disks could not be brought up cleanly.

  """
  ok, _ = _AssembleInstanceDisks(instance, cfg, ignore_secondaries=force)
  if ok:
    return
  # roll back whatever was partially assembled before reporting failure
  _ShutdownInstanceDisks(instance, cfg)
  if force is not None and not force:
    logger.Error("If the message above refers to a secondary node,"
                 " you can retry the operation using '--force'.")
  raise errors.OpExecError("Disk consistency error")
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    # a non-list answer means the node could not be contacted; use the
    # idiomatic isinstance check instead of 'not type(x) is list'
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    # refuse to tear down disks under a running instance
    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is True, errors on the primary node are ignored
  (they do not affect the return value); errors on secondary nodes
  always make the function return False.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        # only swallow the failure when ignore_primary is set and the
        # failing node is the primary one
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function check if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raise an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  # FIX: also validate the per-node entry; the original check only
  # looked at the outer dict and crashed with a KeyError or
  # AttributeError on 'nodeinfo[node].get' when the node's answer was
  # missing or False, instead of raising the intended OpPrereqError
  if (not nodeinfo or not isinstance(nodeinfo, dict) or
      node not in nodeinfo or not isinstance(nodeinfo[node], dict)):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {"FORCE": self.op.force}
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nodes = [self.sstore.GetMasterNode(), self.instance.primary_node]
    nodes.extend(self.instance.secondary_nodes)
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(expanded_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    # ensure the primary node can actually host the instance
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    _StartInstanceDisks(self.cfg, instance, self.op.force)

    if not rpc.call_instance_start(instance.primary_node, instance,
                                   extra_args):
      # roll back the disk assembly if the start itself failed
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {"IGNORE_SECONDARIES": self.op.ignore_secondaries}
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nodes = [self.sstore.GetMasterNode(), self.instance.primary_node]
    nodes.extend(self.instance.secondary_nodes)
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(expanded_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")
    node_current = instance.primary_node

    valid_types = (constants.INSTANCE_REBOOT_SOFT,
                   constants.INSTANCE_REBOOT_HARD,
                   constants.INSTANCE_REBOOT_FULL)
    if reboot_type not in valid_types:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  valid_types)

    if reboot_type == constants.INSTANCE_REBOOT_FULL:
      # full reboot: stop the instance entirely, cycle its disks, then
      # start it again from scratch
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, self.op.ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")
    else:
      # soft/hard reboots are delegated entirely to the node daemon
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")

    self.cfg.MarkInstanceUp(instance.name)
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nodes = [self.sstore.GetMasterNode(), self.instance.primary_node]
    nodes.extend(self.instance.secondary_nodes)
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(expanded_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    # mark the instance down in the config first so the intent is
    # recorded even if the RPC below fails
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # FIX: format with instance.primary_node; the original used
        # self.op.pnode, an attribute this opcode does not have, so it
        # raised AttributeError instead of the intended error
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      # always release the disks, even when the OS scripts failed
      _ShutdownInstanceDisks(inst, self.cfg)
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    # FIX: docstring used to say "Reinstall the instance"
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        # FIX: message previously read "Could run", missing the "not"
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    inst = self.cfg.GetInstanceInfo(expanded_name)
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))
      feedback_fn("Warning: can't shutdown instance")

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Can't remove instance's disks")
      feedback_fn("Warning: can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    # drop the instance from the Ganeti Lock Manager as well
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # dynamic fields require contacting the nodes; static ones come
    # straight from the configuration
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    # live_data maps instance name -> runtime info dict; bad_nodes
    # collects the primary nodes whose RPC failed
    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          # an explicit False answer means the node RPC failed; an
          # empty dict just means no instance is alive there
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          # None means "unknown": the primary node was unreachable
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          # combined administrative + operational state
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          # FindDisk takes the iv_name prefix ("sda"/"sdb")
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  Shuts the instance down on its primary node and (if it was marked
  up) restarts it on its first secondary node, updating the cluster
  configuration to make that secondary the new primary.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  # opcode attributes that must be present
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    Returns the (env, pre-nodes, post-nodes) triple expected by the
    hooks machinery; hooks run on the master and all secondaries.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that it uses a
    network-mirrored disk template, and that the target (secondary)
    node has enough memory and the required bridges.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # failover only makes sense when the disks exist on both nodes
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        # degraded disks abort the failover unless explicitly overridden
        # (a down instance with degraded disks is allowed through)
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      # with ignore_consistency we proceed even if the shutdown RPC
      # failed (e.g. source node dead); otherwise this is fatal
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    # flip primary/secondary roles in the configuration
    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        # roll back disk activation before aborting
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Recursively create a tree of block devices on the primary node.

  The tree is created bottom-up: every child device must be created
  before its parent.  All devices in the tree are always created on
  the primary.

  Returns True on success, False as soon as any creation fails.

  """
  # depth-first: children first, so the parent can be assembled on top
  for child in (device.children or []):
    created = _CreateBlockDevOnPrimary(cfg, node, instance, child, info)
    if not created:
      return False

  cfg.SetDiskID(device, node)
  result = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not result:
    return False
  # record the node-assigned physical id the first time we learn it
  if device.physical_id is None:
    device.physical_id = result
  return True
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Recursively create a tree of block devices on a secondary node.

  A device is actually created only if its type requires creation on
  secondaries, or if an ancestor already did (propagated through
  'force'); otherwise the recursion just visits the children with the
  same 'force' value.

  Returns True on success, False as soon as any creation fails.

  """
  # once this level requires secondary creation, the whole subtree does
  if device.CreateOnSecondary():
    force = True

  for child in (device.children or []):
    ok = _CreateBlockDevOnSecondary(cfg, node, instance, child, force, info)
    if not ok:
      return False

  if not force:
    # nothing to create at this level
    return True

  cfg.SetDiskID(device, node)
  result = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not result:
    return False
  # keep the first physical id the node reports for this device
  if device.physical_id is None:
    device.physical_id = result
  return True
def _GenerateUniqueNames(cfg, exts):
2695
  """Generate a suitable LV name.
2696

2697
  This will generate a logical volume name for the given instance.
2698

2699
  """
2700
  results = []
2701
  for val in exts:
2702
    new_id = cfg.GenerateUniqueID()
2703
    results.append("%s%s" % (new_id, val))
2704
  return results
2705

    
2706

    
2707
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Build one drbd8 disk object complete with its two backing LVs.

  Allocates a DRBD port, then creates a data LV of the requested size
  and a fixed 128 MB metadata LV (named names[0] and names[1]), and
  returns the DRBD8 disk that has them as children.

  """
  drbd_port = cfg.AllocatePort()
  vg = cfg.GetVGName()
  data_lv = objects.Disk(dev_type=constants.LD_LV, size=size,
                         logical_id=(vg, names[0]))
  # the metadata volume has a fixed size of 128 MB
  meta_lv = objects.Disk(dev_type=constants.LD_LV, size=128,
                         logical_id=(vg, names[1]))
  return objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                      logical_id=(primary, secondary, drbd_port),
                      children=[data_lv, meta_lv],
                      iv_name=iv_name)
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  Builds the list of Disk objects (currently the fixed sda/sdb pair)
  for the requested template:
    - diskless: no disks
    - plain: two plain LVs on the primary node
    - drbd8: two DRBD8 devices mirrored to the single secondary node
    - file: two file-backed disks under file_storage_dir

  Raises ProgrammerError if the secondary-node count does not match
  the template, or if the template name is unknown.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    # plain LVs live only on the primary node
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    # drbd8 mirrors to exactly one secondary node
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    # four LV names: data+meta for each of the two drbd devices
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    # file-backed storage has no secondaries either
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
def _GetInstanceInfoText(instance):
2776
  """Compute that text that should be added to the disk's metadata.
2777

2778
  """
2779
  return "originstname+%s" % instance.name
2780

    
2781

    
2782
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  For file-based instances the storage directory is created on the
  primary node first.  Then, for every disk, the device tree is
  created on all secondary nodes before the primary (DRBD needs the
  remote side to exist first).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    # the storage dir is the parent of the first disk's file path
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    # a false-ish result means the RPC itself failed
    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    # result[0] is the per-call success flag from the node
    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal proces

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    # remove every component of the tree on whichever node holds it;
    # failures are logged but do not stop the loop (best effort)
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    # also clean up the per-instance file storage directory
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute the volume-group space a disk template requires.

  Returns the size in MB, or None for templates that consume no
  volume-group space.  This is currently hard-coded for the two-drive
  layout.

  """
  # Required free disk space as a function of disk and swap space
  try:
    return {
      constants.DT_DISKLESS: None,
      constants.DT_PLAIN: disk_size + swap_size,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_DRBD8: disk_size + swap_size + 256,
      constants.DT_FILE: None,
      }[disk_template]
  except KeyError:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  Validates the opcode parameters (optionally placing the instance via
  an iallocator), generates the disk layout, registers the instance in
  the configuration and lock manager, creates or imports its OS, and
  optionally starts it.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  # opcode attributes that must be present
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    On success sets self.op.pnode (and self.op.snode for two-node
    templates) from the allocator's answer; raises OpPrereqError if
    the allocator fails or returns a wrong number of nodes.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # FIX: the format string has three placeholders; the original code
      # passed only two arguments, turning this error path into a
      # TypeError ("not enough arguments for format string")
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    Validates all opcode parameters (mode, import source, name, IP,
    MAC, bridge, boot order, file storage), runs the iallocator if
    requested, and verifies the chosen nodes (disk space, OS, bridges,
    free memory) before Exec is allowed to run.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # lvm-backed templates need a configured volume group
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks; "none" means no IP, "auto" resolves the name
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      # a reachable IP means the address is already taken
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification: only the characters a, c, d, n are valid
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    # exactly one of iallocator / explicit primary node must be given
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    Creates the disks, registers the instance, waits for disk sync
    (rolling everything back on degradation), installs or imports the
    OS, and optionally starts the instance.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    # some hypervisors need a network port allocated (e.g. for VNC)
    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      # revert any partially-created devices before aborting
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Add the new instance to the Ganeti Lock Manager
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      # full rollback: disks, config entry and lock manager entry
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Remove the new instance from the Ganeti Lock Manager
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    # resolve the (possibly abbreviated) name, then look up the config entry
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(full_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    # ask the primary node which instances it is currently running
    running = rpc.call_instance_list([node])[node]
    if running is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in running:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hv = hypervisor.GetHypervisor()
    console_command = hv.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_command,
                             batch=True, tty=True)
3362

    
3363

    
3364
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    Returns the name of the selected node, which is also stored in
    self.op.remote_node.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # FIX: the format string has three conversions; the iallocator name
      # was missing from the argument tuple, which made this raise a
      # TypeError instead of the intended OpPrereqError
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)
    # FIX: return the chosen node; the caller assigns our return value to
    # self.op.remote_node, so returning None (the implicit default) used to
    # clobber the allocator's choice
    return self.op.remote_node

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.op.remote_node = self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for dbrd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    return ret
3862

    
3863

    
3864
class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    inst = self.cfg.GetInstanceInfo(expanded_name)
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst
    self.op.instance_name = inst.name

    if inst.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if inst.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, inst.name))

    # every node holding a copy of the disk needs enough free space in the VG
    nodenames = [inst.primary_node] + list(inst.secondary_nodes)
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    inst = self.instance
    disk = inst.FindDisk(self.op.disk)
    # grow on the secondaries first, then on the primary
    for node in inst.secondary_nodes + (inst.primary_node,):
      self.cfg.SetDiskID(disk, node)
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
      well_formed = (result and isinstance(result, tuple) and
                     len(result) == 2)
      if not well_formed:
        raise errors.OpExecError("grow request failed to node %s" % node)
      if not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(inst)
3944

    
3945

    
3946
class LUQueryInstanceData(NoHooksLU):
3947
  """Query runtime instance data.
3948

3949
  """
3950
  _OP_REQP = ["instances"]
3951

    
3952
  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if not self.op.instances:
      # no names given: report on every instance in the configuration
      self.wanted_instances = [self.cfg.GetInstanceInfo(name)
                               for name in self.cfg.GetInstanceList()]
      return
    wanted = []
    for name in self.op.instances:
      inst = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
      if inst is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(inst)
    self.wanted_instances = wanted
3972

    
3973

    
3974
  def _ComputeDiskStatus(self, instance, snode, dev):
3975
    """Compute block device status.
3976

3977
    """