Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 1a5c7281

History | View | Annotate | Download (171.2 kB)

<
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46
from ganeti import serializer
47

    
48

    
49
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  # Hooks path and type; LUs that run hooks must redefine these.
  HPATH = None
  HTYPE = None
  # Names of opcode attributes that must be present (non-None) on self.op.
  _OP_REQP = []
  # Default run requirements; subclasses override as needed.
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overriden in derived classes in order to check op
    validity.

    Args:
      processor: the processor driving this LU's execution
      op: the opcode instance this LU will execute
      context: object providing access to the cluster config (context.cfg)
      sstore: SimpleStore instance with cluster-wide values

    Raises:
      errors.OpPrereqError: if a required opcode parameter (per _OP_REQP)
        is missing, if the cluster is not initialized, or if REQ_MASTER is
        set and we are not running on the master node.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    self.needed_locks = None
    self.__ssh = None  # lazily built by the 'ssh' property below

    # Validate that every declared-required opcode parameter is set.
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object, creating it on first use.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  # Read-only property so each LU shares a single lazily-created SshRunner.
  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use None as a value
        (this reflects what LockSet does, and will be replaced before
        CheckPrereq with the full list of nodes that have been locked)

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: None,
      locking.LEVEL_INSTANCES: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    Args:
      feedback_fn: callable used to send progress messages to the caller

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    Raises:
      errors.OpPrereqError: if the instance name cannot be expanded.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      # Calling this twice (or after manually setting instance locks) would
      # silently overwrite the earlier lock request, hence the assert.
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name
241

    
242

    
243
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  # HPATH = None tells the processor to skip BuildHooksEnv entirely.
  HPATH = None
  HTYPE = None
252

    
253

    
254
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    lu: the LogicalUnit on whose behalf we operate (provides lu.cfg)
    nodes: List of nodes (strings) or None for all

  Raises:
    errors.OpPrereqError: if 'nodes' is not a list or a name is unknown.

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  # An empty (or otherwise false) list means "every node in the cluster".
  if not nodes:
    return utils.NiceSort(lu.cfg.GetNodeList())

  wanted = []
  for short_name in nodes:
    full_name = lu.cfg.ExpandNodeName(short_name)
    if full_name is None:
      raise errors.OpPrereqError("No such node name '%s'" % short_name)
    wanted.append(full_name)

  return utils.NiceSort(wanted)
276

    
277

    
278
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    lu: the LogicalUnit on whose behalf we operate (provides lu.cfg)
    instances: List of instances (strings) or None for all

  Raises:
    errors.OpPrereqError: if 'instances' is not a list or a name is unknown.

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  # An empty (or otherwise false) list means "every instance in the cluster".
  if not instances:
    return utils.NiceSort(lu.cfg.GetInstanceList())

  wanted = []
  for short_name in instances:
    full_name = lu.cfg.ExpandInstanceName(short_name)
    if full_name is None:
      raise errors.OpPrereqError("No such instance name '%s'" % short_name)
    wanted.append(full_name)

  return utils.NiceSort(wanted)
300

    
301

    
302
def _CheckOutputFields(static, dynamic, selected):
303
  """Checks whether all selected fields are valid.
304

305
  Args:
306
    static: Static fields
307
    dynamic: Dynamic fields
308

309
  """
310
  static_fields = frozenset(static)
311
  dynamic_fields = frozenset(dynamic)
312

    
313
  all_fields = static_fields | dynamic_fields
314

    
315
  if not all_fields.issuperset(selected):
316
    raise errors.OpPrereqError("Unknown output fields selected: %s"
317
                               % ",".join(frozenset(selected).
318
                                          difference(all_fields)))
319

    
320

    
321
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
322
                          memory, vcpus, nics):
323
  """Builds instance related env variables for hooks from single variables.
324

325
  Args:
326
    secondary_nodes: List of secondary nodes as strings
327
  """
328
  env = {
329
    "OP_TARGET": name,
330
    "INSTANCE_NAME": name,
331
    "INSTANCE_PRIMARY": primary_node,
332
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
333
    "INSTANCE_OS_TYPE": os_type,
334
    "INSTANCE_STATUS": status,
335
    "INSTANCE_MEMORY": memory,
336
    "INSTANCE_VCPUS": vcpus,
337
  }
338

    
339
  if nics:
340
    nic_count = len(nics)
341
    for idx, (ip, bridge, mac) in enumerate(nics):
342
      if ip is None:
343
        ip = ""
344
      env["INSTANCE_NIC%d_IP" % idx] = ip
345
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
346
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
347
  else:
348
    nic_count = 0
349

    
350
  env["INSTANCE_NIC_COUNT"] = nic_count
351

    
352
  return env
353

    
354

    
355
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  Returns:
    dict of environment variables, as built by _BuildInstanceHookEnv.

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # Bug fix: this previously read instance.os, so the INSTANCE_STATUS
    # hook variable contained the OS name instead of the run status
    # (the 'up'/'down' value checked elsewhere in this module).
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
375

    
376

    
377
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  Queries the instance's primary node (one RPC for all bridges) and
  raises errors.OpPrereqError if any bridge used by the instance's NICs
  is missing there.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))
387

    
388

    
389
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    # The master itself is the only node allowed to remain.
    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    Stops the master role, backs up the cluster SSH keys and finally
    tells the master node to leave the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    # Keep a copy of the keys so the admin can still reach the machine
    # after the cluster data is gone.
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)
425

    
426

    
427
class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums
      vglist: volume group data for the node, as returned by the node RPC
      node_result: the 'node_verify' RPC result for this node
      remote_version: the node's reported protocol version
      feedback_fn: function used to report each problem found

    Returns:
      True if at least one problem was found, False otherwise.

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        # NOTE: this loop reuses (shadows) the 'node' parameter as the name
        # of each unreachable peer; the parameter is not needed afterwards.
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    Args:
      instance: the instance name
      instanceconfig: the instance's configuration object
      node_vol_is: dict of node -> actually-present volumes
      node_instance: dict of node -> list of instances running there
      feedback_fn: function used to report each problem found

    Returns:
      True if at least one problem was found, False otherwise.

    """
    bad = False

    node_current = instanceconfig.primary_node

    # Volumes the instance's disks require, grouped by node.
    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    # A non-down instance must actually be running on its primary node.
    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    # ... and it must not be running anywhere else.
    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    Returns:
      True if at least one orphan volume was found, False otherwise.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    Returns:
      True if at least one unknown instance was found, False otherwise.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    Returns:
      True if at least one node fails the N+1 check, False otherwise.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    Returns:
      0 if the verification succeeded, 1 if any problem was found.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    # Gather all per-node data up front, one broadcast RPC per data kind.
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      # A string result means the node-side LVM command failed and the
      # string is its (truncated) error output.
      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info (NOTE: 'nodeinfo' is rebound here from the earlier list of
      # config objects to this node's node_info RPC result)
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results.  (For any other phase the implicit None return value
    # matches the PRE-phase lu_result of None.)
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result
840

    
841

    
842
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Returns:
      a tuple (res_nodes, res_nlvm, res_instances, res_missing), where:
        res_nodes: list of nodes that could not be contacted
        res_nlvm: dict of node -> LVM enumeration error string
        res_instances: list of instance names with offline LVs
        res_missing: dict of instance name -> list of missing (node, lv)

    """
    # note: 'result' aliases the four containers, which are filled in-place
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # build a map of (node, lv_name) -> instance for all LVs that should
    # exist somewhere in the cluster
    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        # BUGFIX: without this continue we would fall through and call
        # .iteritems() on the error string below, raising AttributeError
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
912

    
913

    
914
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  # this LU rewrites the cluster name/master IP keys, so it needs a
  # writable SimpleStore (see the LogicalUnit base-class docstring)
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    Both the pre and the post hooks run on the master node only;
    OP_TARGET is the current (old) cluster name.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    Raises errors.OpPrereqError if the rename is a no-op (neither name
    nor IP changes) or if the new IP is already live on the network.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    # keep the resolved IP for Exec
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      # the new master IP must not already be in use, otherwise enabling
      # the master role on it later would conflict with an existing host
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    # store the fully-resolved name back on the opcode for Exec
    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    Stops the master role, rewrites the cluster name and master IP in
    the SimpleStore, distributes the changed ssconf files to all other
    nodes, and finally (always) restarts the master role.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            # best-effort distribution: log the failure and keep going
            # with the remaining nodes
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      # always try to bring the master role back, even if the rename
      # (partially) failed above
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
991

    
992

    
993
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  # descend first: any LVM-based child makes the whole tree LVM-based
  for child in (disk.children or []):
    if _RecursiveCheckIfLVMBased(child):
      return True
  return disk.dev_type == constants.LD_LV
1008

    
1009

    
1010
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    Both pre and post hooks run on the master node only.

    """
    master = self.sstore.GetMasterNode()
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    return env, [master], [master]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      # disabling LVM is only allowed when no LVM-based instance exists
      for iname in self.cfg.GetInstanceList():
        inst = self.cfg.GetInstanceInfo(iname)
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")
    else:
      # if vg_name not None, checks given volume group on all nodes
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    new_vg = self.op.vg_name
    if new_vg == self.cfg.GetVGName():
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
    else:
      self.cfg.SetVGName(new_vg)
1065

    
1066

    
1067
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Args:
    cfgw: the configuration object, used to set the disks' physical IDs
    instance: the instance whose disks are polled (on its primary node)
    proc: processor-like object providing LogInfo/LogWarning feedback
    oneshot: if True, poll and report once instead of waiting for sync
    unlock: unused in this function -- TODO confirm whether callers need it

  Returns:
    True if no disk ended up degraded, False otherwise

  Raises:
    errors.RemoteError: if the node fails to answer 10 times in a row

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    # a successful answer resets the retry counter
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      # a non-None perc_done means this device is still syncing
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    # NOTE(review): if no device reported an estimate, max_time stays 0
    # and this degenerates into a busy poll -- confirm whether a minimum
    # sleep is intended here
    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
1123

    
1124

    
1125
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1126
  """Check that mirrors are not degraded.
1127

1128
  The ldisk parameter, if True, will change the test from the
1129
  is_degraded attribute (which represents overall non-ok status for
1130
  the device(s)) to the ldisk (representing the local storage status).
1131

1132
  """
1133
  cfgw.SetDiskID(dev, node)
1134
  if ldisk:
1135
    idx = 6
1136
  else:
1137
    idx = 5
1138

    
1139
  result = True
1140
  if on_primary or dev.AssembleOnSecondary():
1141
    rstats = rpc.call_blockdev_find(node, dev)
1142
    if not rstats:
1143
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1144
      result = False
1145
    else:
1146
      result = result and (not rstats[idx])
1147
  if dev.children:
1148
    for child in dev.children:
1149
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1150

    
1151
  return result
1152

    
1153

    
1154
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    # the dynamic fields are remembered for the dispatch in Exec
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into an a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # seed an empty list for every known node, so that nodes
          # lacking this OS still show up (as empty lists)
          all_os[os_obj.name] = dict([(nname, []) for nname in node_list])
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          row.append(os_name)
        elif field == "valid":
          # valid only if the OS loaded correctly on every node
          row.append(utils.all([osl and osl[0] for osl in os_data.values()]))
        elif field == "node_status":
          status = {}
          for node_name, nos_list in os_data.iteritems():
            status[node_name] = [(v.status, v.path) for v in nos_list]
          row.append(status)
        else:
          raise errors.ParameterError(field)
      output.append(row)

    return output
1232

    
1233

    
1234
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # modern call-style raise, consistent with the rest of this module
      # (the old "raise Exc, arg" form is deprecated)
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    # remember the canonical (expanded) name and node object for Exec
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)
    # Remove the node from the Ganeti Lock Manager
    self.context.glm.remove(locking.LEVEL_NODE, node.name)

    utils.RemoveHostFromEtcHosts(node.name)
1307

    
1308

    
1309
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # fields that require a live RPC query to the nodes
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    # only issue the node_info RPC if at least one live field was asked for
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      # no live data needed; note all names share the same empty dict,
      # which is safe because it is only ever read (via .get) below
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    # the instance maps are only filled if an instance-related field was
    # requested, since they require walking the whole instance list
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          # a live field may be missing if the node did not answer
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
1411

    
1412

    
1413
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname)
             for iname in self.cfg.GetInstanceList()]
    # precompute the LV-per-node map of every instance, used for the
    # "instance" output field
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      # skip nodes without an answer or without any volumes
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        row = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # find the instance owning this LV, '-' if unowned
            val = '-'
            for inst in ilist:
              inst_lvs = lv_by_node[inst]
              if node in inst_lvs and vol['name'] in inst_lvs[node]:
                val = inst.name
                break
          else:
            raise errors.ParameterError(field)
          row.append(str(val))

        output.append(row)

    return output
1481

    
1482

    
1483
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    # use the resolved (canonical) name/IP from here on
    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      # single-homed node: the secondary IP defaults to the primary one
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    # NOTE(review): self.op.readd is not in _OP_REQP -- presumably
    # defaulted on the opcode; confirm against the opcode definition
    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        # a readded node must keep its previous IP configuration
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    # the node object is stored for Exec; it is not added to the
    # configuration until the very end of Exec
    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    # the order here must match the positional arguments of
    # rpc.call_node_add below
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      # make the new node verify that it can reach its own secondary IP
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    # have the master verify ssh/hostname connectivity to the new node
    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          # best-effort: log and continue with the other nodes
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      # HVM clusters also need the VNC password file on every node
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      # only now, after everything succeeded, is the node made official
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)
      # Add the new node to the Ganeti Lock Manager
      self.context.glm.add(locking.LEVEL_NODE, node)
1685

    
1686

    
1687
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  REQ_WSSTORE = True
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      # fixed message: used to read "This commands"
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    # best-effort: if the old master cannot be reached the failover
    # still proceeds, but the admin has to clean up manually
    # (fixed message: used to read "could disable the master role")
    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    # record the new master in the simple store and push that file out,
    # so every node agrees on who the master is
    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    # reading the cluster configuration requires no locks
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    sstore = self.sstore
    # (bit-width string, machine type), e.g. ("64bit", "x86_64")
    arch = (platform.architecture()[0], platform.machine())
    return {
      "name": sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": sstore.GetMasterNode(),
      "architecture": arch,
      "hypervisor_type": sstore.GetHypervisorType(),
      }
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # dumping the configuration requires no locks
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(full_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")
    # the (node, instance device, node device) mapping is the LU result
    return disks_info
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         suceeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name

  # Assembly is done in two passes so that the window for the race
  # condition of switching DRBD to primary before the handshake
  # occurred is reduced, though not eliminated.  The proper fix would
  # be to wait (with some limits) until the connection has been made
  # and drbd transitions from WFConnection into any other
  # network-connected state (Connected, SyncTarget, SyncSource, etc.)

  # pass 1: assemble on all nodes in secondary mode
  for disk in instance.disks:
    for node, dev in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(dev, node)
      result = rpc.call_blockdev_assemble(node, dev, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # pass 2: re-assemble in primary mode, on the primary node only
  for disk in instance.disks:
    for node, dev in disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(dev, node)
      result = rpc.call_blockdev_assemble(node, dev, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, disk.iv_name, result))

  # leave the disks configured for the primary node; this is a
  # workaround that would be fixed better by improving the
  # logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if disks_ok:
    return
  # assembly failed: tear down whatever came up, hint at --force when
  # applicable, and abort
  _ShutdownInstanceDisks(instance, cfg)
  if force is not None and not force:
    logger.Error("If the message above refers to a secondary node,"
                 " you can retry the operation using '--force'.")
  raise errors.OpExecError("Disk consistency error")
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    # a non-list answer means the node could not be contacted
    # (isinstance instead of the non-idiomatic "not type(x) is list")
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    # refuse to pull the disks from under a running instance
    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, shutdown errors on the primary node are
  ignored; errors on secondary nodes (and, when ignore_primary is
  false, on the primary too) make the function return False.
  (The previous docstring stated the inverse of the actual behaviour.)

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        # only a primary-node failure may be ignored, and only on request
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function check if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raise an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  # a failed RPC can yield a non-dict answer, a missing node entry, or
  # a falsy per-node value; any of these means we could not get the
  # data (previously a falsy entry crashed on .get() below instead of
  # raising the intended error)
  if not nodeinfo or not isinstance(nodeinfo, dict) or not nodeinfo.get(node):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nodes = [self.sstore.GetMasterNode(), self.instance.primary_node]
    nodes.extend(self.instance.secondary_nodes)
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(full_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    # the primary node must have room for the instance's memory
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    # record the intended state first, then act on it
    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      # roll back the disk activation before reporting failure
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nodes = [self.sstore.GetMasterNode(), self.instance.primary_node]
    nodes.extend(self.instance.secondary_nodes)
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(full_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    valid_types = (constants.INSTANCE_REBOOT_SOFT,
                   constants.INSTANCE_REBOOT_HARD,
                   constants.INSTANCE_REBOOT_FULL)
    if reboot_type not in valid_types:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  valid_types)

    if reboot_type != constants.INSTANCE_REBOOT_FULL:
      # soft/hard reboots are carried out by the node daemon in place
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      # a full reboot is a complete stop/start cycle, disks included
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nodes = [self.sstore.GetMasterNode(), self.instance.primary_node]
    nodes.extend(self.instance.secondary_nodes)
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(full_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    # mark the intended state first, so the config is consistent even
    # if the RPC below fails
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # fixed: the reinstall opcode has no 'pnode' attribute, so the
        # previous "self.op.pnode" raised AttributeError instead of
        # producing this error message
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      # always deactivate the disks again, even on failure
      _ShutdownInstanceDisks(inst, self.cfg)
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    # (docstring fixed: previously said "Reinstall the instance.")
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      # file-based disks live in a directory named after the instance,
      # so that directory must be renamed on the primary node too
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        # fixed message: used to read "Could run OS rename script"
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(full_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))
      feedback_fn("Warning: can't shutdown instance")

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Can't remove instance's disks")
      feedback_fn("Warning: can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    # drop the instance from the Ganeti Lock Manager as well
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def _ComputeField(self, instance, field, live_data, bad_nodes):
    """Compute the value of a single output field for one instance."""
    if field == "name":
      return instance.name
    if field == "os":
      return instance.os
    if field == "pnode":
      return instance.primary_node
    if field == "snodes":
      return list(instance.secondary_nodes)
    if field == "admin_state":
      return (instance.status != "down")
    if field == "oper_state":
      # None means "unknown": the primary node did not answer
      if instance.primary_node in bad_nodes:
        return None
      return bool(live_data.get(instance.name))
    if field == "status":
      if instance.primary_node in bad_nodes:
        return "ERROR_nodedown"
      # cross-check the admin state against the observed state
      running = bool(live_data.get(instance.name))
      if running:
        if instance.status != "down":
          return "running"
        return "ERROR_up"
      if instance.status != "down":
        return "ERROR_down"
      return "ADMIN_down"
    if field == "admin_ram":
      return instance.memory
    if field == "oper_ram":
      if instance.primary_node in bad_nodes:
        return None
      if instance.name in live_data:
        return live_data[instance.name].get("memory", "?")
      return "-"
    if field == "disk_template":
      return instance.disk_template
    if field == "ip":
      return instance.nics[0].ip
    if field == "bridge":
      return instance.nics[0].bridge
    if field == "mac":
      return instance.nics[0].mac
    if field in ("sda_size", "sdb_size"):
      disk = instance.FindDisk(field[:3])
      if disk is None:
        return None
      return disk.size
    if field == "vcpus":
      return instance.vcpus
    if field == "tags":
      return list(instance.GetTags())
    raise errors.ParameterError(field)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(name)
                     for name in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      # at least one runtime field was requested: ask the nodes
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = [[self._ComputeField(instance, field, live_data, bad_nodes)
               for field in self.op.output_fields]
              for instance in instance_list]

    return output
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  Moves an instance from its primary node to its (single) secondary
  node.  CheckPrereq enforces that the disk template is network
  mirrored, so the data is already present on the target node.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  # Required opcode parameters; ignore_consistency also tolerates a
  # failed shutdown on the source node (see Exec).
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    # hooks node list: master plus all secondary nodes of the instance
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, uses a mirrored
    disk template, and that the target node has enough free memory and
    the required bridges.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # failover only makes sense when the data already lives on the
    # target node, i.e. for network-mirrored templates
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    # stash the instance object for BuildHooksEnv/Exec
    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    # abort early if the target's copy of a disk is degraded, unless the
    # caller explicitly asked to ignore consistency (or the instance is
    # already down)
    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    # a failed shutdown is fatal unless ignore_consistency was given, in
    # which case we proceed assuming the source node is dead
    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    # switch the primary node in the configuration before starting the
    # instance on the new node
    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        # roll back disk activation before failing
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  Returns True on full success, False as soon as any device in the
  tree fails to be created.

  """
  # depth-first: all children must exist before the parent device
  for child in (device.children or []):
    if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
      return False

  cfg.SetDiskID(device, node)
  created_id = rpc.call_blockdev_create(node, device, device.size,
                                        instance.name, True, info)
  if not created_id:
    return False
  # remember the physical id the node assigned, but never overwrite an
  # already-known one
  if device.physical_id is None:
    device.physical_id = created_id
  return True
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True

  # children first, propagating the (possibly upgraded) force flag
  for child in (device.children or []):
    if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                      child, force, info):
      return False

  # nothing to do for this device itself unless forced
  if not force:
    return True

  cfg.SetDiskID(device, node)
  created_id = rpc.call_blockdev_create(node, device, device.size,
                                        instance.name, False, info)
  if not created_id:
    return False
  if device.physical_id is None:
    device.physical_id = created_id
  return True
def _GenerateUniqueNames(cfg, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance:
  one cluster-unique id per extension, with the extension appended.

  """
  return ["%s%s" % (cfg.GenerateUniqueID(), ext) for ext in exts]
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  Builds a DRBD8 disk object backed by two LVs (data and a fixed
  128 MB metadata volume), named after names[0] and names[1].

  """
  drbd_port = cfg.AllocatePort()
  vg = cfg.GetVGName()
  data_child = objects.Disk(dev_type=constants.LD_LV, size=size,
                            logical_id=(vg, names[0]))
  meta_child = objects.Disk(dev_type=constants.LD_LV, size=128,
                            logical_id=(vg, names[1]))
  return objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                      logical_id=(primary, secondary, drbd_port),
                      children=[data_child, meta_child],
                      iv_name=iv_name)
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  Returns the list of Disk objects (sda/sdb pair, except for the
  diskless template which yields an empty list).

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()

  if template_name == constants.DT_DISKLESS:
    return []

  if template_name == constants.DT_PLAIN:
    # plain LVs live only on the primary node
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")
    lv_names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    return [
      objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                   logical_id=(vgname, lv_names[0]),
                   iv_name="sda"),
      objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                   logical_id=(vgname, lv_names[1]),
                   iv_name="sdb"),
      ]

  if template_name == constants.DT_DRBD8:
    # drbd8 needs exactly one secondary (mirror) node
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    lv_names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                          ".sdb_data", ".sdb_meta"])
    sda_branch = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                      disk_sz, lv_names[0:2], "sda")
    sdb_branch = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                      swap_sz, lv_names[2:4], "sdb")
    return [sda_branch, sdb_branch]

  if template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")
    return [
      objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                   iv_name="sda",
                   logical_id=(file_driver,
                               "%s/sda" % file_storage_dir)),
      objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                   iv_name="sdb",
                   logical_id=(file_driver,
                               "%s/sdb" % file_storage_dir)),
      ]

  raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  The text tags the disk with the name of the owning instance.

  """
  return "originstname+" + instance.name
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    # for file-based disks, the storage directory must exist on the
    # primary node before any disk can be created
    storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    answer = rpc.call_file_storage_dir_create(instance.primary_node,
                                              storage_dir)
    if not answer:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False
    if not answer[0]:
      logger.Error("failed to create directory '%s'" % storage_dir)
      return False

  for disk in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (disk.iv_name, instance.name))
    #HARDCODE
    # create on all secondary nodes first, then on the primary
    for snode in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, snode, instance,
                                        disk, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (disk.iv_name, disk, snode))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, disk, info):
      logger.Error("failed to create volume %s on primary!" %
                   disk.iv_name)
      return False

  return True
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal proces

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  all_removed = True
  for disk in instance.disks:
    # walk the whole device tree on every node it spans
    for node, subdev in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(subdev, node)
      if not rpc.call_blockdev_remove(node, subdev):
        # best-effort: log and keep removing the remaining devices
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (disk.iv_name, node))
        all_removed = False

  if instance.disk_template == constants.DT_FILE:
    storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            storage_dir):
      logger.Error("could not remove directory '%s'" % storage_dir)
      all_removed = False

  return all_removed
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  Returns the required free space in the volume group in MB, or None
  for templates that do not consume VG space.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  try:
    return req_size_dict[disk_template]
  except KeyError:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  Handles both fresh creation (INSTANCE_CREATE) and import from a
  previous export (INSTANCE_IMPORT), with either an explicit primary
  node or one chosen by an instance allocator.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  # required opcode parameters
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    On success this sets self.op.pnode (and self.op.snode when two
    nodes are required); on failure it raises OpPrereqError.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # FIX: the format string has three placeholders but only two
      # arguments were previously supplied, which turned this branch
      # into a TypeError instead of the intended OpPrereqError
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    Validates all instance parameters, resolves/allocates the target
    nodes, and verifies node resources (disk space, memory, bridges,
    OS availability).

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # a cluster without a volume group can only host non-LVM templates
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks; "none" disables networking, "auto" resolves
    # the instance name to an address
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    # make sure no other host already answers on the instance's IP
    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    # exactly one of iallocator/pnode must be given
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    # some hypervisors need a network port (e.g. for VNC)
    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      # roll back partially created disks
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Add the new instance to the Ganeti Lock Manager
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      # undo everything done so far: disks, config entry and lock
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Remove the new instance from the Ganeti Lock Manager
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    inst = self.instance
    pnode = inst.primary_node

    # ask the primary node which instances it is currently running
    running = rpc.call_instance_list([pnode])[pnode]
    if running is False:
      raise errors.OpExecError("Can't connect to node %s." % pnode)
    if inst.name not in running:
      raise errors.OpExecError("Instance %s is not running." % inst.name)

    logger.Debug("connecting to console of %s on %s" % (inst.name, pnode))

    # the hypervisor knows how to open a console for its own instances
    console_cmd = hypervisor.GetHypervisor().GetShellCommandForConsole(inst)

    # wrap the hypervisor console command in an ssh invocation to the
    # primary node; the caller runs this command line on the master
    return self.ssh.BuildCmd(pnode, "root", console_cmd, batch=True, tty=True)
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  Depending on self.op.mode this replaces the storage on the primary
  node, on the secondary node, or moves the instance to a new
  secondary node (DRBD8 only).

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    On success the chosen node name is stored in self.op.remote_node;
    this method does not return a value.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # FIX: the format string has three placeholders but the argument
      # tuple used to supply only two values, so this branch raised
      # TypeError instead of the intended OpPrereqError
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator,
                                  len(ial.nodes), ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      # FIX: _RunAllocator() stores its result in self.op.remote_node and
      # returns None; the old code assigned its return value back to
      # self.op.remote_node, discarding the node the allocator had just
      # selected (and thereby silently changing the Exec dispatch below)
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    # all requested disks must actually exist on the instance
    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for dbrd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            # NOTE(review): the "%s" below has no matching argument and is
            # emitted literally; left as-is because the intended argument
            # (dev or new_lv) is ambiguous -- confirm and fix upstream
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          # best-effort cleanup: a leftover LV is not fatal
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    return ret
class LUGrowDisk(LogicalUnit):
3896
  """Grow a disk of an instance.
3897

3898
  """
3899
  HPATH = "disk-grow"
3900
  HTYPE = constants.HTYPE_INSTANCE
3901
  _OP_REQP = ["instance_name", "disk", "amount"]
3902

    
3903
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    # the op-specific keys go in first; the generic per-instance
    # environment is merged on top (and so takes precedence)
    hook_env = {"DISK": self.op.disk, "AMOUNT": self.op.amount}
    hook_env.update(_BuildInstanceHookEnvByObject(self.instance))
    node_list = [self.sstore.GetMasterNode(), self.instance.primary_node]
    return hook_env, node_list, node_list
3920
  def CheckPrereq(self):
3921
    """Check prerequisites.
3922

3923
    This checks that the instance is in the cluster.
3924

3925
    """
3926
    instance = self.cfg.GetInstanceInfo(
3927
      self.cfg.ExpandInstanceName(self.op.instance_name))
3928
    if instance is None:
3929
      raise errors.OpPrereqError("Instance '%s' not known" %
3930
                                 self.op.instance_name)
3931
    self.instance = instance
3932
    self.op.instance_name = instance.name
3933

    
3934
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3935
      raise errors.OpPrereqError("Instance's disk layout does not support"
3936
                                 " growing.")
3937

    
3938
    if instance.FindDisk(self.op.disk) is None:
3939
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3940
                                 (self.op.disk, instance.name))
3941

    
3942
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3943
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3944
    for node in nodenames:
3945
      info = nodeinfo.get(node, None)
3946
      if not info:
3947
        raise errors.OpPrereqError("Cannot get current information"
3948
                                   " from node '%s'" % node)
3949
      vg_free = info.get('vg_free', None)
3950
      if not isinstance(vg_free, int):
3951
        raise errors.OpPrereqError("Can't compute free disk space on"
3952
                                   " node %s" % node)
3953
      if self.op.amount > info['vg_free']:
3954
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
3955
                                   " %d MiB available, %d MiB required" %
3956
                                   (node, info['vg_free'], self.op.amount))