1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46
from ganeti import serializer
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement ExpandNames
54
    - implement CheckPrereq
55
    - implement Exec
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_MASTER: the LU needs to run on the master node
60
        REQ_WSSTORE: the LU needs a writable SimpleStore
61
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
62

63
  Note that all commands require root permissions.
64

65
  """
66
  HPATH = None
67
  HTYPE = None
68
  _OP_REQP = []
69
  REQ_MASTER = True
70
  REQ_WSSTORE = False
71
  REQ_BGL = True
72

    
73
  def __init__(self, processor, op, context, sstore):
74
    """Constructor for LogicalUnit.
75

76
    This needs to be overridden in derived classes in order to check op
77
    validity.
78

79
    """
80
    self.proc = processor
81
    self.op = op
82
    self.cfg = context.cfg
83
    self.sstore = sstore
84
    self.context = context
85
    self.needed_locks = None
86
    self.__ssh = None
87

    
88
    for attr_name in self._OP_REQP:
89
      attr_val = getattr(op, attr_name, None)
90
      if attr_val is None:
91
        raise errors.OpPrereqError("Required parameter '%s' missing" %
92
                                   attr_name)
93

    
94
    if not self.cfg.IsCluster():
95
      raise errors.OpPrereqError("Cluster not initialized yet,"
96
                                 " use 'gnt-cluster init' first.")
97
    if self.REQ_MASTER:
98
      master = sstore.GetMasterNode()
99
      if master != utils.HostInfo().name:
100
        raise errors.OpPrereqError("Commands must be run on the master"
101
                                   " node %s" % master)
102

    
103
  def __GetSSH(self):
104
    """Returns the SshRunner object
105

106
    """
107
    if not self.__ssh:
108
      self.__ssh = ssh.SshRunner(self.sstore)
109
    return self.__ssh
110

    
111
  ssh = property(fget=__GetSSH)
112

    
113
  def ExpandNames(self):
114
    """Expand names for this LU.
115

116
    This method is called before starting to execute the opcode, and it should
117
    update all the parameters of the opcode to their canonical form (e.g. a
118
    short node name must be fully expanded after this method has successfully
119
    completed). This way locking, hooks, logging, etc. can work correctly.
120

121
    LUs which implement this method must also populate the self.needed_locks
122
    member, as a dict with lock levels as keys, and a list of needed lock names
123
    as values. Rules:
124
      - Use an empty dict if you don't need any lock
125
      - If you don't need any lock at a particular level omit that level
126
      - Don't put anything for the BGL level
127
      - If you want all locks at a level use None as a value
128
        (this reflects what LockSet does, and will be replaced before
129
        CheckPrereq with the full list of nodes that have been locked)
130

131
    Examples:
132
    # Acquire all nodes and one instance
133
    self.needed_locks = {
134
      locking.LEVEL_NODE: None,
135
      locking.LEVEL_INSTANCES: ['instance1.example.tld'],
136
    }
137
    # Acquire just two nodes
138
    self.needed_locks = {
139
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
140
    }
141
    # Acquire no locks
142
    self.needed_locks = {} # No, you can't leave it to the default value None
143

144
    """
145
    # The implementation of this method is mandatory only if the new LU is
146
    # concurrent, so that old LUs don't need to be changed all at the same
147
    # time.
148
    if self.REQ_BGL:
149
      self.needed_locks = {} # Exclusive LUs don't need locks.
150
    else:
151
      raise NotImplementedError
152

    
153
  def CheckPrereq(self):
154
    """Check prerequisites for this LU.
155

156
    This method should check that the prerequisites for the execution
157
    of this LU are fulfilled. It can do internode communication, but
158
    it should be idempotent - no cluster or system changes are
159
    allowed.
160

161
    The method should raise errors.OpPrereqError in case something is
162
    not fulfilled. Its return value is ignored.
163

164
    This method should also update all the parameters of the opcode to
165
    their canonical form if it hasn't been done by ExpandNames before.
166

167
    """
168
    raise NotImplementedError
169

    
170
  def Exec(self, feedback_fn):
171
    """Execute the LU.
172

173
    This method should implement the actual work. It should raise
174
    errors.OpExecError for failures that are somewhat dealt with in
175
    code, or expected.
176

177
    """
178
    raise NotImplementedError
179

    
180
  def BuildHooksEnv(self):
181
    """Build hooks environment for this LU.
182

183
    This method should return a three-element tuple consisting of: a dict
184
    containing the environment that will be used for running the
185
    specific hook for this LU, a list of node names on which the hook
186
    should run before the execution, and a list of node names on which
187
    the hook should run after the execution.
188

189
    The keys of the dict must not have 'GANETI_' prefixed as this will
190
    be handled in the hooks runner. Also note additional keys will be
191
    added by the hooks runner. If the LU doesn't define any
192
    environment, an empty dict (and not None) should be returned.
193

194
    An empty list (and not None) should be returned if there are no nodes.
195

196
    Note that if the HPATH for a LU class is None, this function will
197
    not be called.
198

199
    """
200
    raise NotImplementedError
201

    
202
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
203
    """Notify the LU about the results of its hooks.
204

205
    This method is called every time a hooks phase is executed, and notifies
206
    the Logical Unit about the hooks' result. The LU can then use it to alter
207
    its result based on the hooks.  By default the method does nothing and the
208
    previous result is passed back unchanged, but any LU can override it if
209
    it wants to use the local cluster hook-scripts somehow.
210

211
    Args:
212
      phase: the hooks phase that has just been run
213
      hook_results: the results of the multi-node hooks rpc call
214
      feedback_fn: function to send feedback back to the caller
215
      lu_result: the previous result this LU had, or None in the PRE phase.
216

217
    """
218
    return lu_result
219

    
220

    
221
class NoHooksLU(LogicalUnit):
222
  """Simple LU which runs no hooks.
223

224
  This LU is intended as a parent for other LogicalUnits which will
225
  run no hooks, in order to reduce duplicate code.
226

227
  """
228
  HPATH = None
229
  HTYPE = None
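# The class below is an illustrative sketch only: a hypothetical, minimal LU
# that follows the contract documented in LogicalUnit (ExpandNames, CheckPrereq,
# Exec). It is not registered with any opcode; the name is made up.
class _LUExampleNoop(NoHooksLU):
  """Example LU which performs no action (documentation aid only)."""
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # A non-BGL (concurrent) LU must declare its locks; this one needs none.
    self.needed_locks = {}

  def CheckPrereq(self):
    # Nothing to verify and no opcode parameters to canonicalize.
    pass

  def Exec(self, feedback_fn):
    feedback_fn("example LU executed, nothing to do")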
230

    
231

    
232
def _GetWantedNodes(lu, nodes):
233
  """Returns list of checked and expanded node names.
234

235
  Args:
236
    nodes: list of node names (strings); an empty list means all nodes
237

238
  """
239
  if not isinstance(nodes, list):
240
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
241

    
242
  if nodes:
243
    wanted = []
244

    
245
    for name in nodes:
246
      node = lu.cfg.ExpandNodeName(name)
247
      if node is None:
248
        raise errors.OpPrereqError("No such node name '%s'" % name)
249
      wanted.append(node)
250

    
251
  else:
252
    wanted = lu.cfg.GetNodeList()
253
  return utils.NiceSort(wanted)
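# Usage sketch (names are illustrative): called from within an LU,
# _GetWantedNodes(self, []) returns every configured node, nice-sorted, while
# _GetWantedNodes(self, ["node1"]) expands the possibly-short name through
# cfg.ExpandNodeName and raises OpPrereqError if it is unknown.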
254

    
255

    
256
def _GetWantedInstances(lu, instances):
257
  """Returns list of checked and expanded instance names.
258

259
  Args:
260
    instances: list of instance names (strings); an empty list means all of them
261

262
  """
263
  if not isinstance(instances, list):
264
    raise errors.OpPrereqError("Invalid argument type 'instances'")
265

    
266
  if instances:
267
    wanted = []
268

    
269
    for name in instances:
270
      instance = lu.cfg.ExpandInstanceName(name)
271
      if instance is None:
272
        raise errors.OpPrereqError("No such instance name '%s'" % name)
273
      wanted.append(instance)
274

    
275
  else:
276
    wanted = lu.cfg.GetInstanceList()
277
  return utils.NiceSort(wanted)
278

    
279

    
280
def _CheckOutputFields(static, dynamic, selected):
281
  """Checks whether all selected fields are valid.
282

283
  Args:
284
    static: Static fields
285
    dynamic: Dynamic fields
286

287
  """
288
  static_fields = frozenset(static)
289
  dynamic_fields = frozenset(dynamic)
290

    
291
  all_fields = static_fields | dynamic_fields
292

    
293
  if not all_fields.issuperset(selected):
294
    raise errors.OpPrereqError("Unknown output fields selected: %s"
295
                               % ",".join(frozenset(selected).
296
                                          difference(all_fields)))
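# For example (values made up): _CheckOutputFields(static=["name"],
# dynamic=["mfree"], selected=["name", "bogus"]) raises OpPrereqError naming
# the unknown field "bogus", while selected=["name", "mfree"] passes silently.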
297

    
298

    
299
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
300
                          memory, vcpus, nics):
301
  """Builds instance related env variables for hooks from single variables.
302

303
  Args:
304
    secondary_nodes: List of secondary nodes as strings
305
  """
306
  env = {
307
    "OP_TARGET": name,
308
    "INSTANCE_NAME": name,
309
    "INSTANCE_PRIMARY": primary_node,
310
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
311
    "INSTANCE_OS_TYPE": os_type,
312
    "INSTANCE_STATUS": status,
313
    "INSTANCE_MEMORY": memory,
314
    "INSTANCE_VCPUS": vcpus,
315
  }
316

    
317
  if nics:
318
    nic_count = len(nics)
319
    for idx, (ip, bridge, mac) in enumerate(nics):
320
      if ip is None:
321
        ip = ""
322
      env["INSTANCE_NIC%d_IP" % idx] = ip
323
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
324
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
325
  else:
326
    nic_count = 0
327

    
328
  env["INSTANCE_NIC_COUNT"] = nic_count
329

    
330
  return env
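# As an illustration (all values made up), a call such as
#   _BuildInstanceHookEnv("inst1.example.com", "node1", ["node2"],
#                         "debian-etch", "up", 512, 1,
#                         [("192.0.2.10", "xen-br0", "aa:00:00:00:00:01")])
# yields INSTANCE_PRIMARY=node1, INSTANCE_SECONDARIES=node2,
# INSTANCE_MEMORY=512, INSTANCE_NIC_COUNT=1 and INSTANCE_NIC0_BRIDGE=xen-br0,
# among other keys.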
331

    
332

    
333
def _BuildInstanceHookEnvByObject(instance, override=None):
334
  """Builds instance related env variables for hooks from an object.
335

336
  Args:
337
    instance: objects.Instance object of instance
338
    override: dict of values to override
339
  """
340
  args = {
341
    'name': instance.name,
342
    'primary_node': instance.primary_node,
343
    'secondary_nodes': instance.secondary_nodes,
344
    'os_type': instance.os,
345
    'status': instance.status,
346
    'memory': instance.memory,
347
    'vcpus': instance.vcpus,
348
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
349
  }
350
  if override:
351
    args.update(override)
352
  return _BuildInstanceHookEnv(**args)
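# The override dict replaces keyword arguments, not environment keys; for
# example (hypothetical call), passing override={"memory": 1024} makes the
# hooks see INSTANCE_MEMORY=1024 regardless of what the instance object holds.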
353

    
354

    
355
def _CheckInstanceBridgesExist(instance):
356
  """Check that the brigdes needed by an instance exist.
357

358
  """
359
  # check bridges existence
360
  brlist = [nic.bridge for nic in instance.nics]
361
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
362
    raise errors.OpPrereqError("one or more target bridges %s does not"
363
                               " exist on destination node '%s'" %
364
                               (brlist, instance.primary_node))
365

    
366

    
367
class LUDestroyCluster(NoHooksLU):
368
  """Logical unit for destroying the cluster.
369

370
  """
371
  _OP_REQP = []
372

    
373
  def CheckPrereq(self):
374
    """Check prerequisites.
375

376
    This checks whether the cluster is empty.
377

378
    Any errors are signalled by raising errors.OpPrereqError.
379

380
    """
381
    master = self.sstore.GetMasterNode()
382

    
383
    nodelist = self.cfg.GetNodeList()
384
    if len(nodelist) != 1 or nodelist[0] != master:
385
      raise errors.OpPrereqError("There are still %d node(s) in"
386
                                 " this cluster." % (len(nodelist) - 1))
387
    instancelist = self.cfg.GetInstanceList()
388
    if instancelist:
389
      raise errors.OpPrereqError("There are still %d instance(s) in"
390
                                 " this cluster." % len(instancelist))
391

    
392
  def Exec(self, feedback_fn):
393
    """Destroys the cluster.
394

395
    """
396
    master = self.sstore.GetMasterNode()
397
    if not rpc.call_node_stop_master(master):
398
      raise errors.OpExecError("Could not disable the master role")
399
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
400
    utils.CreateBackup(priv_key)
401
    utils.CreateBackup(pub_key)
402
    rpc.call_node_leave_cluster(master)
403

    
404

    
405
class LUVerifyCluster(LogicalUnit):
406
  """Verifies the cluster status.
407

408
  """
409
  HPATH = "cluster-verify"
410
  HTYPE = constants.HTYPE_CLUSTER
411
  _OP_REQP = ["skip_checks"]
412

    
413
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
414
                  remote_version, feedback_fn):
415
    """Run multiple tests against a node.
416

417
    Test list:
418
      - compares ganeti version
419
      - checks vg existence and size > 20G
420
      - checks config file checksum
421
      - checks ssh to other nodes
422

423
    Args:
424
      node: name of the node to check
425
      file_list: required list of files
426
      local_cksum: dictionary of local files and their checksums
427

428
    """
429
    # compares ganeti version
430
    local_version = constants.PROTOCOL_VERSION
431
    if not remote_version:
432
      feedback_fn("  - ERROR: connection to %s failed" % (node))
433
      return True
434

    
435
    if local_version != remote_version:
436
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
437
                      (local_version, node, remote_version))
438
      return True
439

    
440
    # checks vg existence and size > 20G
441

    
442
    bad = False
443
    if not vglist:
444
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
445
                      (node,))
446
      bad = True
447
    else:
448
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
449
                                            constants.MIN_VG_SIZE)
450
      if vgstatus:
451
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
452
        bad = True
453

    
454
    # checks config file checksum
455
    # checks ssh to any
456

    
457
    if 'filelist' not in node_result:
458
      bad = True
459
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
460
    else:
461
      remote_cksum = node_result['filelist']
462
      for file_name in file_list:
463
        if file_name not in remote_cksum:
464
          bad = True
465
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
466
        elif remote_cksum[file_name] != local_cksum[file_name]:
467
          bad = True
468
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
469

    
470
    if 'nodelist' not in node_result:
471
      bad = True
472
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
473
    else:
474
      if node_result['nodelist']:
475
        bad = True
476
        for node in node_result['nodelist']:
477
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
478
                          (node, node_result['nodelist'][node]))
479
    if 'node-net-test' not in node_result:
480
      bad = True
481
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
482
    else:
483
      if node_result['node-net-test']:
484
        bad = True
485
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
486
        for node in nlist:
487
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
488
                          (node, node_result['node-net-test'][node]))
489

    
490
    hyp_result = node_result.get('hypervisor', None)
491
    if hyp_result is not None:
492
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
493
    return bad
494

    
495
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
496
                      node_instance, feedback_fn):
497
    """Verify an instance.
498

499
    This function checks to see if the required block devices are
500
    available on the instance's node.
501

502
    """
503
    bad = False
504

    
505
    node_current = instanceconfig.primary_node
506

    
507
    node_vol_should = {}
508
    instanceconfig.MapLVsByNode(node_vol_should)
509

    
510
    for node in node_vol_should:
511
      for volume in node_vol_should[node]:
512
        if node not in node_vol_is or volume not in node_vol_is[node]:
513
          feedback_fn("  - ERROR: volume %s missing on node %s" %
514
                          (volume, node))
515
          bad = True
516

    
517
    if instanceconfig.status != 'down':
518
      if (node_current not in node_instance or
519
          not instance in node_instance[node_current]):
520
        feedback_fn("  - ERROR: instance %s not running on node %s" %
521
                        (instance, node_current))
522
        bad = True
523

    
524
    for node in node_instance:
525
      if node != node_current:
526
        if instance in node_instance[node]:
527
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
528
                          (instance, node))
529
          bad = True
530

    
531
    return bad
532

    
533
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
534
    """Verify if there are any unknown volumes in the cluster.
535

536
    The .os, .swap and backup volumes are ignored. All other volumes are
537
    reported as unknown.
538

539
    """
540
    bad = False
541

    
542
    for node in node_vol_is:
543
      for volume in node_vol_is[node]:
544
        if node not in node_vol_should or volume not in node_vol_should[node]:
545
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
546
                      (volume, node))
547
          bad = True
548
    return bad
549

    
550
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
551
    """Verify the list of running instances.
552

553
    This checks what instances are running but unknown to the cluster.
554

555
    """
556
    bad = False
557
    for node in node_instance:
558
      for runninginstance in node_instance[node]:
559
        if runninginstance not in instancelist:
560
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
561
                          (runninginstance, node))
562
          bad = True
563
    return bad
564

    
565
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
566
    """Verify N+1 Memory Resilience.
567

568
    Check that if one single node dies we can still start all the instances it
569
    was primary for.
570

571
    """
572
    bad = False
573

    
574
    for node, nodeinfo in node_info.iteritems():
575
      # This code checks that every node which is now listed as secondary has
576
      # enough memory to host all instances it is supposed to should a single
577
      # other node in the cluster fail.
578
      # FIXME: not ready for failover to an arbitrary node
579
      # FIXME: does not support file-backed instances
580
      # WARNING: we currently take into account down instances as well as up
581
      # ones, considering that even if they're down someone might want to start
582
      # them even in the event of a node failure.
583
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
584
        needed_mem = 0
585
        for instance in instances:
586
          needed_mem += instance_cfg[instance].memory
587
        if nodeinfo['mfree'] < needed_mem:
588
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
589
                      " failovers should node %s fail" % (node, prinode))
590
          bad = True
591
    return bad
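  # Worked example for the check above (numbers are made up): if this node is
  # secondary for two instances of 512MB and 1024MB whose primary is node
  # "alpha", then sinst-by-pnode["alpha"] needs 1536MB; with only 1024MB of
  # mfree the node is reported as unable to accommodate failovers from "alpha".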
592

    
593
  def CheckPrereq(self):
594
    """Check prerequisites.
595

596
    Transform the list of checks we're going to skip into a set and check that
597
    all its members are valid.
598

599
    """
600
    self.skip_set = frozenset(self.op.skip_checks)
601
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
602
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
603

    
604
  def BuildHooksEnv(self):
605
    """Build hooks env.
606

607
    Cluster-Verify hooks run only in the post phase; their failure causes their
608
    output to be logged in the verify output and the verification to fail.
609

610
    """
611
    all_nodes = self.cfg.GetNodeList()
612
    # TODO: populate the environment with useful information for verify hooks
613
    env = {}
614
    return env, [], all_nodes
615

    
616
  def Exec(self, feedback_fn):
617
    """Verify integrity of cluster, performing various test on nodes.
618

619
    """
620
    bad = False
621
    feedback_fn("* Verifying global settings")
622
    for msg in self.cfg.VerifyConfig():
623
      feedback_fn("  - ERROR: %s" % msg)
624

    
625
    vg_name = self.cfg.GetVGName()
626
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
627
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
628
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
629
    i_non_redundant = [] # Non redundant instances
630
    node_volume = {}
631
    node_instance = {}
632
    node_info = {}
633
    instance_cfg = {}
634

    
635
    # FIXME: verify OS list
636
    # do local checksums
637
    file_names = list(self.sstore.GetFileList())
638
    file_names.append(constants.SSL_CERT_FILE)
639
    file_names.append(constants.CLUSTER_CONF_FILE)
640
    local_checksums = utils.FingerprintFiles(file_names)
641

    
642
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
643
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
644
    all_instanceinfo = rpc.call_instance_list(nodelist)
645
    all_vglist = rpc.call_vg_list(nodelist)
646
    node_verify_param = {
647
      'filelist': file_names,
648
      'nodelist': nodelist,
649
      'hypervisor': None,
650
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
651
                        for node in nodeinfo]
652
      }
653
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
654
    all_rversion = rpc.call_version(nodelist)
655
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
656

    
657
    for node in nodelist:
658
      feedback_fn("* Verifying node %s" % node)
659
      result = self._VerifyNode(node, file_names, local_checksums,
660
                                all_vglist[node], all_nvinfo[node],
661
                                all_rversion[node], feedback_fn)
662
      bad = bad or result
663

    
664
      # node_volume
665
      volumeinfo = all_volumeinfo[node]
666

    
667
      if isinstance(volumeinfo, basestring):
668
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
669
                    (node, volumeinfo[-400:].encode('string_escape')))
670
        bad = True
671
        node_volume[node] = {}
672
      elif not isinstance(volumeinfo, dict):
673
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
674
        bad = True
675
        continue
676
      else:
677
        node_volume[node] = volumeinfo
678

    
679
      # node_instance
680
      nodeinstance = all_instanceinfo[node]
681
      if type(nodeinstance) != list:
682
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
683
        bad = True
684
        continue
685

    
686
      node_instance[node] = nodeinstance
687

    
688
      # node_info
689
      nodeinfo = all_ninfo[node]
690
      if not isinstance(nodeinfo, dict):
691
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
692
        bad = True
693
        continue
694

    
695
      try:
696
        node_info[node] = {
697
          "mfree": int(nodeinfo['memory_free']),
698
          "dfree": int(nodeinfo['vg_free']),
699
          "pinst": [],
700
          "sinst": [],
701
          # dictionary holding all instances this node is secondary for,
702
          # grouped by their primary node. Each key is a cluster node, and each
703
          # value is a list of instances which have the key as primary and the
704
          # current node as secondary.  This is handy to calculate N+1 memory
705
          # availability if you can only failover from a primary to its
706
          # secondary.
707
          "sinst-by-pnode": {},
708
        }
709
      except ValueError:
710
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
711
        bad = True
712
        continue
713

    
714
    node_vol_should = {}
715

    
716
    for instance in instancelist:
717
      feedback_fn("* Verifying instance %s" % instance)
718
      inst_config = self.cfg.GetInstanceInfo(instance)
719
      result =  self._VerifyInstance(instance, inst_config, node_volume,
720
                                     node_instance, feedback_fn)
721
      bad = bad or result
722

    
723
      inst_config.MapLVsByNode(node_vol_should)
724

    
725
      instance_cfg[instance] = inst_config
726

    
727
      pnode = inst_config.primary_node
728
      if pnode in node_info:
729
        node_info[pnode]['pinst'].append(instance)
730
      else:
731
        feedback_fn("  - ERROR: instance %s, connection to primary node"
732
                    " %s failed" % (instance, pnode))
733
        bad = True
734

    
735
      # If the instance is non-redundant we cannot survive losing its primary
736
      # node, so we are not N+1 compliant. On the other hand we have no disk
737
      # templates with more than one secondary so that situation is not well
738
      # supported either.
739
      # FIXME: does not support file-backed instances
740
      if len(inst_config.secondary_nodes) == 0:
741
        i_non_redundant.append(instance)
742
      elif len(inst_config.secondary_nodes) > 1:
743
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
744
                    % instance)
745

    
746
      for snode in inst_config.secondary_nodes:
747
        if snode in node_info:
748
          node_info[snode]['sinst'].append(instance)
749
          if pnode not in node_info[snode]['sinst-by-pnode']:
750
            node_info[snode]['sinst-by-pnode'][pnode] = []
751
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
752
        else:
753
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
754
                      " %s failed" % (instance, snode))
755

    
756
    feedback_fn("* Verifying orphan volumes")
757
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
758
                                       feedback_fn)
759
    bad = bad or result
760

    
761
    feedback_fn("* Verifying remaining instances")
762
    result = self._VerifyOrphanInstances(instancelist, node_instance,
763
                                         feedback_fn)
764
    bad = bad or result
765

    
766
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
767
      feedback_fn("* Verifying N+1 Memory redundancy")
768
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
769
      bad = bad or result
770

    
771
    feedback_fn("* Other Notes")
772
    if i_non_redundant:
773
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
774
                  % len(i_non_redundant))
775

    
776
    return int(bad)
777

    
778
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
779
    """Analize the post-hooks' result, handle it, and send some
780
    nicely-formatted feedback back to the user.
781

782
    Args:
783
      phase: the hooks phase that has just been run
784
      hooks_results: the results of the multi-node hooks rpc call
785
      feedback_fn: function to send feedback back to the caller
786
      lu_result: previous Exec result
787

788
    """
789
    # We only really run POST phase hooks, and are only interested in their results
790
    if phase == constants.HOOKS_PHASE_POST:
791
      # Used to change hooks' output to proper indentation
792
      indent_re = re.compile('^', re.M)
793
      feedback_fn("* Hooks Results")
794
      if not hooks_results:
795
        feedback_fn("  - ERROR: general communication failure")
796
        lu_result = 1
797
      else:
798
        for node_name in hooks_results:
799
          show_node_header = True
800
          res = hooks_results[node_name]
801
          if res is False or not isinstance(res, list):
802
            feedback_fn("    Communication failure")
803
            lu_result = 1
804
            continue
805
          for script, hkr, output in res:
806
            if hkr == constants.HKR_FAIL:
807
              # The node header is only shown once, if there are
808
              # failing hooks on that node
809
              if show_node_header:
810
                feedback_fn("  Node %s:" % node_name)
811
                show_node_header = False
812
              feedback_fn("    ERROR: Script %s failed, output:" % script)
813
              output = indent_re.sub('      ', output)
814
              feedback_fn("%s" % output)
815
              lu_result = 1
816

    
817
      return lu_result
818

    
819

    
820
class LUVerifyDisks(NoHooksLU):
821
  """Verifies the cluster disks status.
822

823
  """
824
  _OP_REQP = []
825

    
826
  def CheckPrereq(self):
827
    """Check prerequisites.
828

829
    This has no prerequisites.
830

831
    """
832
    pass
833

    
834
  def Exec(self, feedback_fn):
835
    """Verify integrity of cluster disks.
836

837
    """
838
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
839

    
840
    vg_name = self.cfg.GetVGName()
841
    nodes = utils.NiceSort(self.cfg.GetNodeList())
842
    instances = [self.cfg.GetInstanceInfo(name)
843
                 for name in self.cfg.GetInstanceList()]
844

    
845
    nv_dict = {}
846
    for inst in instances:
847
      inst_lvs = {}
848
      if (inst.status != "up" or
849
          inst.disk_template not in constants.DTS_NET_MIRROR):
850
        continue
851
      inst.MapLVsByNode(inst_lvs)
852
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
853
      for node, vol_list in inst_lvs.iteritems():
854
        for vol in vol_list:
855
          nv_dict[(node, vol)] = inst
856

    
857
    if not nv_dict:
858
      return result
859

    
860
    node_lvs = rpc.call_volume_list(nodes, vg_name)
861

    
862
    to_act = set()
863
    for node in nodes:
864
      # node_volume
865
      lvs = node_lvs[node]
866

    
867
      if isinstance(lvs, basestring):
868
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
869
        res_nlvm[node] = lvs
870
      elif not isinstance(lvs, dict):
871
        logger.Info("connection to node %s failed or invalid data returned" %
872
                    (node,))
873
        res_nodes.append(node)
874
        continue
875

    
876
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
877
        inst = nv_dict.pop((node, lv_name), None)
878
        if (not lv_online and inst is not None
879
            and inst.name not in res_instances):
880
          res_instances.append(inst.name)
881

    
882
    # any leftover items in nv_dict are missing LVs, let's arrange the
883
    # data better
884
    for key, inst in nv_dict.iteritems():
885
      if inst.name not in res_missing:
886
        res_missing[inst.name] = []
887
      res_missing[inst.name].append(key)
888

    
889
    return result
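  # The value returned above is the 4-tuple assembled at the start of Exec;
  # with made-up names it could look like:
  #   (["node3"],                            # nodes that could not be queried
  #    {"node2": "volume group not found"},  # per-node LV listing errors
  #    ["instance1"],                        # instances with offline volumes
  #    {"instance2": [("node1", "lv_name")]})  # missing (node, LV) pairs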
890

    
891

    
892
class LURenameCluster(LogicalUnit):
893
  """Rename the cluster.
894

895
  """
896
  HPATH = "cluster-rename"
897
  HTYPE = constants.HTYPE_CLUSTER
898
  _OP_REQP = ["name"]
899
  REQ_WSSTORE = True
900

    
901
  def BuildHooksEnv(self):
902
    """Build hooks env.
903

904
    """
905
    env = {
906
      "OP_TARGET": self.sstore.GetClusterName(),
907
      "NEW_NAME": self.op.name,
908
      }
909
    mn = self.sstore.GetMasterNode()
910
    return env, [mn], [mn]
911

    
912
  def CheckPrereq(self):
913
    """Verify that the passed name is a valid one.
914

915
    """
916
    hostname = utils.HostInfo(self.op.name)
917

    
918
    new_name = hostname.name
919
    self.ip = new_ip = hostname.ip
920
    old_name = self.sstore.GetClusterName()
921
    old_ip = self.sstore.GetMasterIP()
922
    if new_name == old_name and new_ip == old_ip:
923
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
924
                                 " cluster has changed")
925
    if new_ip != old_ip:
926
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
927
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
928
                                   " reachable on the network. Aborting." %
929
                                   new_ip)
930

    
931
    self.op.name = new_name
932

    
933
  def Exec(self, feedback_fn):
934
    """Rename the cluster.
935

936
    """
937
    clustername = self.op.name
938
    ip = self.ip
939
    ss = self.sstore
940

    
941
    # shutdown the master IP
942
    master = ss.GetMasterNode()
943
    if not rpc.call_node_stop_master(master):
944
      raise errors.OpExecError("Could not disable the master role")
945

    
946
    try:
947
      # modify the sstore
948
      ss.SetKey(ss.SS_MASTER_IP, ip)
949
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
950

    
951
      # Distribute updated ss config to all nodes
952
      myself = self.cfg.GetNodeInfo(master)
953
      dist_nodes = self.cfg.GetNodeList()
954
      if myself.name in dist_nodes:
955
        dist_nodes.remove(myself.name)
956

    
957
      logger.Debug("Copying updated ssconf data to all nodes")
958
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
959
        fname = ss.KeyToFilename(keyname)
960
        result = rpc.call_upload_file(dist_nodes, fname)
961
        for to_node in dist_nodes:
962
          if not result[to_node]:
963
            logger.Error("copy of file %s to node %s failed" %
964
                         (fname, to_node))
965
    finally:
966
      if not rpc.call_node_start_master(master):
967
        logger.Error("Could not re-enable the master role on the master,"
968
                     " please restart manually.")
969

    
970

    
971
def _RecursiveCheckIfLVMBased(disk):
972
  """Check if the given disk or its children are lvm-based.
973

974
  Args:
975
    disk: ganeti.objects.Disk object
976

977
  Returns:
978
    boolean indicating whether an LD_LV dev_type was found or not
979

980
  """
981
  if disk.children:
982
    for chdisk in disk.children:
983
      if _RecursiveCheckIfLVMBased(chdisk):
984
        return True
985
  return disk.dev_type == constants.LD_LV
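# Sketch of the recursion (hypothetical disk layout): for a mirrored disk whose
# children are two LD_LV devices, the function descends into the children and
# returns True; for a childless non-LVM disk it returns False immediately.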
986

    
987

    
988
class LUSetClusterParams(LogicalUnit):
989
  """Change the parameters of the cluster.
990

991
  """
992
  HPATH = "cluster-modify"
993
  HTYPE = constants.HTYPE_CLUSTER
994
  _OP_REQP = []
995

    
996
  def BuildHooksEnv(self):
997
    """Build hooks env.
998

999
    """
1000
    env = {
1001
      "OP_TARGET": self.sstore.GetClusterName(),
1002
      "NEW_VG_NAME": self.op.vg_name,
1003
      }
1004
    mn = self.sstore.GetMasterNode()
1005
    return env, [mn], [mn]
1006

    
1007
  def CheckPrereq(self):
1008
    """Check prerequisites.
1009

1010
    This checks that the given parameters don't conflict and
1011
    that the given volume group is valid.
1012

1013
    """
1014
    if not self.op.vg_name:
1015
      instances = [self.cfg.GetInstanceInfo(name)
1016
                   for name in self.cfg.GetInstanceList()]
1017
      for inst in instances:
1018
        for disk in inst.disks:
1019
          if _RecursiveCheckIfLVMBased(disk):
1020
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1021
                                       " lvm-based instances exist")
1022

    
1023
    # if vg_name not None, checks given volume group on all nodes
1024
    if self.op.vg_name:
1025
      node_list = self.cfg.GetNodeList()
1026
      vglist = rpc.call_vg_list(node_list)
1027
      for node in node_list:
1028
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1029
                                              constants.MIN_VG_SIZE)
1030
        if vgstatus:
1031
          raise errors.OpPrereqError("Error on node '%s': %s" %
1032
                                     (node, vgstatus))
1033

    
1034
  def Exec(self, feedback_fn):
1035
    """Change the parameters of the cluster.
1036

1037
    """
1038
    if self.op.vg_name != self.cfg.GetVGName():
1039
      self.cfg.SetVGName(self.op.vg_name)
1040
    else:
1041
      feedback_fn("Cluster LVM configuration already in desired"
1042
                  " state, not changing")
1043

    
1044

    
1045
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1046
  """Sleep and poll for an instance's disk to sync.
1047

1048
  """
1049
  if not instance.disks:
1050
    return True
1051

    
1052
  if not oneshot:
1053
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1054

    
1055
  node = instance.primary_node
1056

    
1057
  for dev in instance.disks:
1058
    cfgw.SetDiskID(dev, node)
1059

    
1060
  retries = 0
1061
  while True:
1062
    max_time = 0
1063
    done = True
1064
    cumul_degraded = False
1065
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1066
    if not rstats:
1067
      proc.LogWarning("Can't get any data from node %s" % node)
1068
      retries += 1
1069
      if retries >= 10:
1070
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1071
                                 " aborting." % node)
1072
      time.sleep(6)
1073
      continue
1074
    retries = 0
1075
    for i in range(len(rstats)):
1076
      mstat = rstats[i]
1077
      if mstat is None:
1078
        proc.LogWarning("Can't compute data for node %s/%s" %
1079
                        (node, instance.disks[i].iv_name))
1080
        continue
1081
      # we ignore the ldisk parameter
1082
      perc_done, est_time, is_degraded, _ = mstat
1083
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1084
      if perc_done is not None:
1085
        done = False
1086
        if est_time is not None:
1087
          rem_time = "%d estimated seconds remaining" % est_time
1088
          max_time = est_time
1089
        else:
1090
          rem_time = "no time estimate"
1091
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1092
                     (instance.disks[i].iv_name, perc_done, rem_time))
1093
    if done or oneshot:
1094
      break
1095

    
1096
    time.sleep(min(60, max_time))
1097

    
1098
  if done:
1099
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1100
  return not cumul_degraded
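# Example of how a mirror status tuple is handled above (values are made up):
# (75.0, 120, True, False) is reported as "- device sda: 75.00% done, 120
# estimated seconds remaining" and keeps the loop polling, while a tuple whose
# sync percentage is None marks that device as already in sync.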
1101

    
1102

    
1103
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1104
  """Check that mirrors are not degraded.
1105

1106
  The ldisk parameter, if True, will change the test from the
1107
  is_degraded attribute (which represents overall non-ok status for
1108
  the device(s)) to the ldisk (representing the local storage status).
1109

1110
  """
1111
  cfgw.SetDiskID(dev, node)
1112
  if ldisk:
1113
    idx = 6
1114
  else:
1115
    idx = 5
1116

    
1117
  result = True
1118
  if on_primary or dev.AssembleOnSecondary():
1119
    rstats = rpc.call_blockdev_find(node, dev)
1120
    if not rstats:
1121
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1122
      result = False
1123
    else:
1124
      result = result and (not rstats[idx])
1125
  if dev.children:
1126
    for child in dev.children:
1127
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1128

    
1129
  return result
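# Note on the magic indices above: they select a field of the status tuple
# returned by rpc.call_blockdev_find, index 5 being the overall is_degraded
# flag and index 6 the local-disk (ldisk) status; a true value in the selected
# field marks the device as inconsistent.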
1130

    
1131

    
1132
class LUDiagnoseOS(NoHooksLU):
1133
  """Logical unit for OS diagnose/query.
1134

1135
  """
1136
  _OP_REQP = ["output_fields", "names"]
1137

    
1138
  def CheckPrereq(self):
1139
    """Check prerequisites.
1140

1141
    This always succeeds, since this is a pure query LU.
1142

1143
    """
1144
    if self.op.names:
1145
      raise errors.OpPrereqError("Selective OS query not supported")
1146

    
1147
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1148
    _CheckOutputFields(static=[],
1149
                       dynamic=self.dynamic_fields,
1150
                       selected=self.op.output_fields)
1151

    
1152
  @staticmethod
1153
  def _DiagnoseByOS(node_list, rlist):
1154
    """Remaps a per-node return list into an a per-os per-node dictionary
1155

1156
      Args:
1157
        node_list: a list with the names of all nodes
1158
        rlist: a map with node names as keys and lists of OS objects as values
1159

1160
      Returns:
1161
        map: a map with os names as keys and, as values, another map with
             node names as keys and lists of OS objects as values,
1164
             e.g. {"debian-etch": {"node1": [<object>,...],
1165
                                   "node2": [<object>,]}
1166
                  }
1167

1168
    """
1169
    all_os = {}
1170
    for node_name, nr in rlist.iteritems():
1171
      if not nr:
1172
        continue
1173
      for os_obj in nr:
1174
        if os_obj.name not in all_os:
1175
          # build a list of nodes for this os containing empty lists
1176
          # for each node in node_list
1177
          all_os[os_obj.name] = {}
1178
          for nname in node_list:
1179
            all_os[os_obj.name][nname] = []
1180
        all_os[os_obj.name][node_name].append(os_obj)
1181
    return all_os
1182

    
1183
  def Exec(self, feedback_fn):
1184
    """Compute the list of OSes.
1185

1186
    """
1187
    node_list = self.cfg.GetNodeList()
1188
    node_data = rpc.call_os_diagnose(node_list)
1189
    if node_data == False:
1190
      raise errors.OpExecError("Can't gather the list of OSes")
1191
    pol = self._DiagnoseByOS(node_list, node_data)
1192
    output = []
1193
    for os_name, os_data in pol.iteritems():
1194
      row = []
1195
      for field in self.op.output_fields:
1196
        if field == "name":
1197
          val = os_name
1198
        elif field == "valid":
1199
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1200
        elif field == "node_status":
1201
          val = {}
1202
          for node_name, nos_list in os_data.iteritems():
1203
            val[node_name] = [(v.status, v.path) for v in nos_list]
1204
        else:
1205
          raise errors.ParameterError(field)
1206
        row.append(val)
1207
      output.append(row)
1208

    
1209
    return output
1210

    
1211

    
1212
class LURemoveNode(LogicalUnit):
1213
  """Logical unit for removing a node.
1214

1215
  """
1216
  HPATH = "node-remove"
1217
  HTYPE = constants.HTYPE_NODE
1218
  _OP_REQP = ["node_name"]
1219

    
1220
  def BuildHooksEnv(self):
1221
    """Build hooks env.
1222

1223
    This doesn't run on the target node in the pre phase as a failed
1224
    node would then be impossible to remove.
1225

1226
    """
1227
    env = {
1228
      "OP_TARGET": self.op.node_name,
1229
      "NODE_NAME": self.op.node_name,
1230
      }
1231
    all_nodes = self.cfg.GetNodeList()
1232
    all_nodes.remove(self.op.node_name)
1233
    return env, all_nodes, all_nodes
1234

    
1235
  def CheckPrereq(self):
1236
    """Check prerequisites.
1237

1238
    This checks:
1239
     - the node exists in the configuration
1240
     - it does not have primary or secondary instances
1241
     - it's not the master
1242

1243
    Any errors are signalled by raising errors.OpPrereqError.
1244

1245
    """
1246
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1247
    if node is None:
1248
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1249

    
1250
    instance_list = self.cfg.GetInstanceList()
1251

    
1252
    masternode = self.sstore.GetMasterNode()
1253
    if node.name == masternode:
1254
      raise errors.OpPrereqError("Node is the master node,"
1255
                                 " you need to failover first.")
1256

    
1257
    for instance_name in instance_list:
1258
      instance = self.cfg.GetInstanceInfo(instance_name)
1259
      if node.name == instance.primary_node:
1260
        raise errors.OpPrereqError("Instance %s still running on the node,"
1261
                                   " please remove first." % instance_name)
1262
      if node.name in instance.secondary_nodes:
1263
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1264
                                   " please remove first." % instance_name)
1265
    self.op.node_name = node.name
1266
    self.node = node
1267

    
1268
  def Exec(self, feedback_fn):
1269
    """Removes the node from the cluster.
1270

1271
    """
1272
    node = self.node
1273
    logger.Info("stopping the node daemon and removing configs from node %s" %
1274
                node.name)
1275

    
1276
    rpc.call_node_leave_cluster(node.name)
1277

    
1278
    logger.Info("Removing node %s from config" % node.name)
1279

    
1280
    self.cfg.RemoveNode(node.name)
1281
    # Remove the node from the Ganeti Lock Manager
1282
    self.context.glm.remove(locking.LEVEL_NODE, node.name)
1283

    
1284
    utils.RemoveHostFromEtcHosts(node.name)
1285

    
1286

    
1287
class LUQueryNodes(NoHooksLU):
1288
  """Logical unit for querying nodes.
1289

1290
  """
1291
  _OP_REQP = ["output_fields", "names"]
1292

    
1293
  def CheckPrereq(self):
1294
    """Check prerequisites.
1295

1296
    This checks that the fields required are valid output fields.
1297

1298
    """
1299
    self.dynamic_fields = frozenset([
1300
      "dtotal", "dfree",
1301
      "mtotal", "mnode", "mfree",
1302
      "bootid",
1303
      "ctotal",
1304
      ])
1305

    
1306
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1307
                               "pinst_list", "sinst_list",
1308
                               "pip", "sip", "tags"],
1309
                       dynamic=self.dynamic_fields,
1310
                       selected=self.op.output_fields)
1311

    
1312
    self.wanted = _GetWantedNodes(self, self.op.names)
1313

    
1314
  def Exec(self, feedback_fn):
1315
    """Computes the list of nodes and their attributes.
1316

1317
    """
1318
    nodenames = self.wanted
1319
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1320

    
1321
    # begin data gathering
1322

    
1323
    if self.dynamic_fields.intersection(self.op.output_fields):
1324
      live_data = {}
1325
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1326
      for name in nodenames:
1327
        nodeinfo = node_data.get(name, None)
1328
        if nodeinfo:
1329
          live_data[name] = {
1330
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1331
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1332
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1333
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1334
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1335
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1336
            "bootid": nodeinfo['bootid'],
1337
            }
1338
        else:
1339
          live_data[name] = {}
1340
    else:
1341
      live_data = dict.fromkeys(nodenames, {})
1342

    
1343
    node_to_primary = dict([(name, set()) for name in nodenames])
1344
    node_to_secondary = dict([(name, set()) for name in nodenames])
1345

    
1346
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1347
                             "sinst_cnt", "sinst_list"))
1348
    if inst_fields & frozenset(self.op.output_fields):
1349
      instancelist = self.cfg.GetInstanceList()
1350

    
1351
      for instance_name in instancelist:
1352
        inst = self.cfg.GetInstanceInfo(instance_name)
1353
        if inst.primary_node in node_to_primary:
1354
          node_to_primary[inst.primary_node].add(inst.name)
1355
        for secnode in inst.secondary_nodes:
1356
          if secnode in node_to_secondary:
1357
            node_to_secondary[secnode].add(inst.name)
1358

    
1359
    # end data gathering
1360

    
1361
    output = []
1362
    for node in nodelist:
1363
      node_output = []
1364
      for field in self.op.output_fields:
1365
        if field == "name":
1366
          val = node.name
1367
        elif field == "pinst_list":
1368
          val = list(node_to_primary[node.name])
1369
        elif field == "sinst_list":
1370
          val = list(node_to_secondary[node.name])
1371
        elif field == "pinst_cnt":
1372
          val = len(node_to_primary[node.name])
1373
        elif field == "sinst_cnt":
1374
          val = len(node_to_secondary[node.name])
1375
        elif field == "pip":
1376
          val = node.primary_ip
1377
        elif field == "sip":
1378
          val = node.secondary_ip
1379
        elif field == "tags":
1380
          val = list(node.GetTags())
1381
        elif field in self.dynamic_fields:
1382
          val = live_data[node.name].get(field, None)
1383
        else:
1384
          raise errors.ParameterError(field)
1385
        node_output.append(val)
1386
      output.append(node_output)
1387

    
1388
    return output
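  # Illustration of the result shape (made-up data): for output_fields
  # ["name", "pinst_cnt", "mfree"], each row is a list such as
  # ["node1.example.com", 2, 1024], with dynamic fields taken from the live
  # rpc.call_node_info data and set to None when a node did not answer.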
1389

    
1390

    
1391
class LUQueryNodeVolumes(NoHooksLU):
1392
  """Logical unit for getting volumes on node(s).
1393

1394
  """
1395
  _OP_REQP = ["nodes", "output_fields"]
1396

    
1397
  def CheckPrereq(self):
1398
    """Check prerequisites.
1399

1400
    This checks that the fields required are valid output fields.
1401

1402
    """
1403
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1404

    
1405
    _CheckOutputFields(static=["node"],
1406
                       dynamic=["phys", "vg", "name", "size", "instance"],
1407
                       selected=self.op.output_fields)
1408

    
1409

    
1410
  def Exec(self, feedback_fn):
1411
    """Computes the list of nodes and their attributes.
1412

1413
    """
1414
    nodenames = self.nodes
1415
    volumes = rpc.call_node_volumes(nodenames)
1416

    
1417
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1418
             in self.cfg.GetInstanceList()]
1419

    
1420
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1421

    
1422
    output = []
1423
    for node in nodenames:
1424
      if node not in volumes or not volumes[node]:
1425
        continue
1426

    
1427
      node_vols = volumes[node][:]
1428
      node_vols.sort(key=lambda vol: vol['dev'])
1429

    
1430
      for vol in node_vols:
1431
        node_output = []
1432
        for field in self.op.output_fields:
1433
          if field == "node":
1434
            val = node
1435
          elif field == "phys":
1436
            val = vol['dev']
1437
          elif field == "vg":
1438
            val = vol['vg']
1439
          elif field == "name":
1440
            val = vol['name']
1441
          elif field == "size":
1442
            val = int(float(vol['size']))
1443
          elif field == "instance":
1444
            for inst in ilist:
1445
              if node not in lv_by_node[inst]:
1446
                continue
1447
              if vol['name'] in lv_by_node[inst][node]:
1448
                val = inst.name
1449
                break
1450
            else:
1451
              val = '-'
1452
          else:
1453
            raise errors.ParameterError(field)
1454
          node_output.append(str(val))
1455

    
1456
        output.append(node_output)
1457

    
1458
    return output
1459

    
1460

    
1461
class LUAddNode(LogicalUnit):
1462
  """Logical unit for adding node to the cluster.
1463

1464
  """
1465
  HPATH = "node-add"
1466
  HTYPE = constants.HTYPE_NODE
1467
  _OP_REQP = ["node_name"]
1468

    
1469
  def BuildHooksEnv(self):
1470
    """Build hooks env.
1471

1472
    This will run on all nodes before, and on all nodes + the new node after.
1473

1474
    """
1475
    env = {
1476
      "OP_TARGET": self.op.node_name,
1477
      "NODE_NAME": self.op.node_name,
1478
      "NODE_PIP": self.op.primary_ip,
1479
      "NODE_SIP": self.op.secondary_ip,
1480
      }
1481
    nodes_0 = self.cfg.GetNodeList()
1482
    nodes_1 = nodes_0 + [self.op.node_name, ]
1483
    return env, nodes_0, nodes_1
1484

    
1485
  def CheckPrereq(self):
1486
    """Check prerequisites.
1487

1488
    This checks:
1489
     - the new node is not already in the config
1490
     - it is resolvable
1491
     - its parameters (single/dual homed) matches the cluster
1492

1493
    Any errors are signalled by raising errors.OpPrereqError.
1494

1495
    """
1496
    node_name = self.op.node_name
1497
    cfg = self.cfg
1498

    
1499
    dns_data = utils.HostInfo(node_name)
1500

    
1501
    node = dns_data.name
1502
    primary_ip = self.op.primary_ip = dns_data.ip
1503
    secondary_ip = getattr(self.op, "secondary_ip", None)
1504
    if secondary_ip is None:
1505
      secondary_ip = primary_ip
1506
    if not utils.IsValidIP(secondary_ip):
1507
      raise errors.OpPrereqError("Invalid secondary IP given")
1508
    self.op.secondary_ip = secondary_ip
1509

    
1510
    node_list = cfg.GetNodeList()
1511
    if not self.op.readd and node in node_list:
1512
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1513
                                 node)
1514
    elif self.op.readd and node not in node_list:
1515
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1516

    
1517
    for existing_node_name in node_list:
1518
      existing_node = cfg.GetNodeInfo(existing_node_name)
1519

    
1520
      if self.op.readd and node == existing_node_name:
1521
        if (existing_node.primary_ip != primary_ip or
1522
            existing_node.secondary_ip != secondary_ip):
1523
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1524
                                     " address configuration as before")
1525
        continue
1526

    
1527
      if (existing_node.primary_ip == primary_ip or
1528
          existing_node.secondary_ip == primary_ip or
1529
          existing_node.primary_ip == secondary_ip or
1530
          existing_node.secondary_ip == secondary_ip):
1531
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1532
                                   " existing node %s" % existing_node.name)
1533

    
1534
    # check that the type of the node (single versus dual homed) is the
1535
    # same as for the master
1536
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1537
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1538
    newbie_singlehomed = secondary_ip == primary_ip
1539
    if master_singlehomed != newbie_singlehomed:
1540
      if master_singlehomed:
1541
        raise errors.OpPrereqError("The master has no private ip but the"
1542
                                   " new node has one")
1543
      else:
1544
        raise errors.OpPrereqError("The master has a private ip but the"
1545
                                   " new node doesn't have one")
1546

    
1547
    # checks reachability
1548
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1549
      raise errors.OpPrereqError("Node not reachable by ping")
1550

    
1551
    if not newbie_singlehomed:
1552
      # check reachability from my secondary ip to newbie's secondary ip
1553
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1554
                           source=myself.secondary_ip):
1555
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1556
                                   " based ping to noded port")
1557

    
1558
    self.new_node = objects.Node(name=node,
1559
                                 primary_ip=primary_ip,
1560
                                 secondary_ip=secondary_ip)
1561

    
1562
  def Exec(self, feedback_fn):
1563
    """Adds the new node to the cluster.
1564

1565
    """
1566
    new_node = self.new_node
1567
    node = new_node.name
1568

    
1569
    # check connectivity
1570
    result = rpc.call_version([node])[node]
1571
    if result:
1572
      if constants.PROTOCOL_VERSION == result:
1573
        logger.Info("communication to node %s fine, sw version %s match" %
1574
                    (node, result))
1575
      else:
1576
        raise errors.OpExecError("Version mismatch master version %s,"
1577
                                 " node version %s" %
1578
                                 (constants.PROTOCOL_VERSION, result))
1579
    else:
1580
      raise errors.OpExecError("Cannot get version from the new node")
1581

    
1582
    # setup ssh on node
1583
    logger.Info("copy ssh key to node %s" % node)
1584
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1585
    keyarray = []
1586
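    # both the cluster host keys (DSA and RSA) and the key pair of the
    # Ganeti run-as user are copied, so the new node is reachable over
    # SSH right away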
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1587
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1588
                priv_key, pub_key]
1589

    
1590
    for i in keyfiles:
1591
      f = open(i, 'r')
1592
      try:
1593
        keyarray.append(f.read())
1594
      finally:
1595
        f.close()
1596

    
1597
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1598
                               keyarray[3], keyarray[4], keyarray[5])
1599

    
1600
    if not result:
1601
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1602

    
1603
    # Add node to our /etc/hosts, and add key to known_hosts
1604
    utils.AddHostToEtcHosts(new_node.name)
1605

    
1606
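    # for dual-homed nodes, ask the new node to confirm that it can
    # actually reach the secondary IP it was given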
    if new_node.secondary_ip != new_node.primary_ip:
1607
      if not rpc.call_node_tcp_ping(new_node.name,
1608
                                    constants.LOCALHOST_IP_ADDRESS,
1609
                                    new_node.secondary_ip,
1610
                                    constants.DEFAULT_NODED_PORT,
1611
                                    10, False):
1612
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1613
                                 " you gave (%s). Please fix and re-run this"
1614
                                 " command." % new_node.secondary_ip)
1615

    
1616
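    # have the master's node daemon re-verify ssh/hostname resolution
    # towards the node that was just set up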
    node_verify_list = [self.sstore.GetMasterNode()]
1617
    node_verify_param = {
1618
      'nodelist': [node],
1619
      # TODO: do a node-net-test as well?
1620
    }
1621

    
1622
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1623
    for verifier in node_verify_list:
1624
      if not result[verifier]:
1625
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1626
                                 " for remote verification" % verifier)
1627
      if result[verifier]['nodelist']:
1628
        for failed in result[verifier]['nodelist']:
1629
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1630
                      (verifier, result[verifier]['nodelist'][failed]))
1631
        raise errors.OpExecError("ssh/hostname verification failed.")
1632

    
1633
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1634
    # including the node just added
1635
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1636
    dist_nodes = self.cfg.GetNodeList()
1637
    if not self.op.readd:
1638
      dist_nodes.append(node)
1639
    if myself.name in dist_nodes:
1640
      dist_nodes.remove(myself.name)
1641

    
1642
    logger.Debug("Copying hosts and known_hosts to all nodes")
1643
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1644
      result = rpc.call_upload_file(dist_nodes, fname)
1645
      for to_node in dist_nodes:
1646
        if not result[to_node]:
1647
          logger.Error("copy of file %s to node %s failed" %
1648
                       (fname, to_node))
1649

    
1650
    to_copy = self.sstore.GetFileList()
1651
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1652
      to_copy.append(constants.VNC_PASSWORD_FILE)
1653
    for fname in to_copy:
1654
      result = rpc.call_upload_file([node], fname)
1655
      if not result[node]:
1656
        logger.Error("could not copy file %s to node %s" % (fname, node))
1657

    
1658
    if not self.op.readd:
1659
      logger.Info("adding node %s to cluster.conf" % node)
1660
      self.cfg.AddNode(new_node)
1661
      # Add the new node to the Ganeti Lock Manager
1662
      self.context.glm.add(locking.LEVEL_NODE, node)
1663

    
1664

    
1665
class LUMasterFailover(LogicalUnit):
1666
  """Failover the master node to the current node.
1667

1668
  This is a special LU in that it must run on a non-master node.
1669

1670
  """
1671
  HPATH = "master-failover"
1672
  HTYPE = constants.HTYPE_CLUSTER
1673
  REQ_MASTER = False
1674
  REQ_WSSTORE = True
1675
  _OP_REQP = []
1676

    
1677
  def BuildHooksEnv(self):
1678
    """Build hooks env.
1679

1680
    This will run on the new master only in the pre phase, and on all
1681
    the nodes in the post phase.
1682

1683
    """
1684
    env = {
1685
      "OP_TARGET": self.new_master,
1686
      "NEW_MASTER": self.new_master,
1687
      "OLD_MASTER": self.old_master,
1688
      }
1689
    return env, [self.new_master], self.cfg.GetNodeList()
1690

    
1691
  def CheckPrereq(self):
1692
    """Check prerequisites.
1693

1694
    This checks that we are not already the master.
1695

1696
    """
1697
    self.new_master = utils.HostInfo().name
1698
    self.old_master = self.sstore.GetMasterNode()
1699

    
1700
    if self.old_master == self.new_master:
1701
      raise errors.OpPrereqError("This commands must be run on the node"
1702
                                 " where you want the new master to be."
1703
                                 " %s is already the master" %
1704
                                 self.old_master)
1705

    
1706
  def Exec(self, feedback_fn):
1707
    """Failover the master node.
1708

1709
    This command, when run on a non-master node, will cause the current
1710
    master to cease being master, and the non-master to become new
1711
    master.
1712

1713
    """
1714
    #TODO: do not rely on gethostname returning the FQDN
1715
    logger.Info("setting master to %s, old master: %s" %
1716
                (self.new_master, self.old_master))
1717

    
1718
    if not rpc.call_node_stop_master(self.old_master):
1719
      logger.Error("could disable the master role on the old master"
1720
                   " %s, please disable manually" % self.old_master)
1721

    
1722
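    # record the new master in the simple store and push the updated
    # file to all nodes before activating the master role here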
    ss = self.sstore
1723
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1724
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1725
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1726
      logger.Error("could not distribute the new simple store master file"
1727
                   " to the other nodes, please check.")
1728

    
1729
    if not rpc.call_node_start_master(self.new_master):
1730
      logger.Error("could not start the master role on the new master"
1731
                   " %s, please check" % self.new_master)
1732
      feedback_fn("Error in activating the master IP on the new master,"
1733
                  " please fix manually.")
1734

    
1735

    
1736

    
1737
class LUQueryClusterInfo(NoHooksLU):
1738
  """Query cluster configuration.
1739

1740
  """
1741
  _OP_REQP = []
1742
  REQ_MASTER = False
1743
  REQ_BGL = False
1744

    
1745
  def ExpandNames(self):
1746
    self.needed_locks = {}
1747

    
1748
  def CheckPrereq(self):
1749
    """No prerequsites needed for this LU.
1750

1751
    """
1752
    pass
1753

    
1754
  def Exec(self, feedback_fn):
1755
    """Return cluster config.
1756

1757
    """
1758
    result = {
1759
      "name": self.sstore.GetClusterName(),
1760
      "software_version": constants.RELEASE_VERSION,
1761
      "protocol_version": constants.PROTOCOL_VERSION,
1762
      "config_version": constants.CONFIG_VERSION,
1763
      "os_api_version": constants.OS_API_VERSION,
1764
      "export_version": constants.EXPORT_VERSION,
1765
      "master": self.sstore.GetMasterNode(),
1766
      "architecture": (platform.architecture()[0], platform.machine()),
1767
      "hypervisor_type": self.sstore.GetHypervisorType(),
1768
      }
1769

    
1770
    return result
1771

    
1772

    
1773
class LUDumpClusterConfig(NoHooksLU):
1774
  """Return a text-representation of the cluster-config.
1775

1776
  """
1777
  _OP_REQP = []
1778
  REQ_BGL = False
1779

    
1780
  def ExpandNames(self):
1781
    self.needed_locks = {}
1782

    
1783
  def CheckPrereq(self):
1784
    """No prerequisites.
1785

1786
    """
1787
    pass
1788

    
1789
  def Exec(self, feedback_fn):
1790
    """Dump a representation of the cluster config to the standard output.
1791

1792
    """
1793
    return self.cfg.DumpConfig()
1794

    
1795

    
1796
class LUActivateInstanceDisks(NoHooksLU):
1797
  """Bring up an instance's disks.
1798

1799
  """
1800
  _OP_REQP = ["instance_name"]
1801

    
1802
  def CheckPrereq(self):
1803
    """Check prerequisites.
1804

1805
    This checks that the instance is in the cluster.
1806

1807
    """
1808
    instance = self.cfg.GetInstanceInfo(
1809
      self.cfg.ExpandInstanceName(self.op.instance_name))
1810
    if instance is None:
1811
      raise errors.OpPrereqError("Instance '%s' not known" %
1812
                                 self.op.instance_name)
1813
    self.instance = instance
1814

    
1815

    
1816
  def Exec(self, feedback_fn):
1817
    """Activate the disks.
1818

1819
    """
1820
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1821
    if not disks_ok:
1822
      raise errors.OpExecError("Cannot activate block devices")
1823

    
1824
    return disks_info
1825

    
1826

    
1827
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1828
  """Prepare the block devices for an instance.
1829

1830
  This sets up the block devices on all nodes.
1831

1832
  Args:
1833
    instance: a ganeti.objects.Instance object
1834
    ignore_secondaries: if true, errors on secondary nodes won't result
1835
                        in an error return from the function
1836

1837
  Returns:
1838
    false if the operation failed
1839
    list of (host, instance_visible_name, node_visible_name) if the operation
1840
         succeeded with the mapping from node devices to instance devices
1841
  """
1842
  device_info = []
1843
  disks_ok = True
1844
  iname = instance.name
1845
  # With the two-pass mechanism we try to reduce the window of
1846
  # opportunity for the race condition of switching DRBD to primary
1847
  # before handshaking has occurred, but we do not eliminate it
1848

    
1849
  # The proper fix would be to wait (with some limits) until the
1850
  # connection has been made and drbd transitions from WFConnection
1851
  # into any other network-connected state (Connected, SyncTarget,
1852
  # SyncSource, etc.)
1853

    
1854
  # 1st pass, assemble on all nodes in secondary mode
1855
  for inst_disk in instance.disks:
1856
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1857
      cfg.SetDiskID(node_disk, node)
1858
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1859
      if not result:
1860
        logger.Error("could not prepare block device %s on node %s"
1861
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1862
        if not ignore_secondaries:
1863
          disks_ok = False
1864

    
1865
  # FIXME: race condition on drbd migration to primary
1866

    
1867
  # 2nd pass, do only the primary node
1868
  for inst_disk in instance.disks:
1869
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1870
      if node != instance.primary_node:
1871
        continue
1872
      cfg.SetDiskID(node_disk, node)
1873
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1874
      if not result:
1875
        logger.Error("could not prepare block device %s on node %s"
1876
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1877
        disks_ok = False
1878
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1879

    
1880
  # leave the disks configured for the primary node
1881
  # this is a workaround that would be fixed better by
1882
  # improving the logical/physical id handling
1883
  for disk in instance.disks:
1884
    cfg.SetDiskID(disk, instance.primary_node)
1885

    
1886
  return disks_ok, device_info
1887

    
1888

    
1889
def _StartInstanceDisks(cfg, instance, force):
1890
  """Start the disks of an instance.
1891

1892
  """
1893
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1894
                                           ignore_secondaries=force)
1895
  if not disks_ok:
1896
    _ShutdownInstanceDisks(instance, cfg)
1897
    if force is not None and not force:
1898
      logger.Error("If the message above refers to a secondary node,"
1899
                   " you can retry the operation using '--force'.")
1900
    raise errors.OpExecError("Disk consistency error")
1901

    
1902

    
1903
class LUDeactivateInstanceDisks(NoHooksLU):
1904
  """Shutdown an instance's disks.
1905

1906
  """
1907
  _OP_REQP = ["instance_name"]
1908

    
1909
  def CheckPrereq(self):
1910
    """Check prerequisites.
1911

1912
    This checks that the instance is in the cluster.
1913

1914
    """
1915
    instance = self.cfg.GetInstanceInfo(
1916
      self.cfg.ExpandInstanceName(self.op.instance_name))
1917
    if instance is None:
1918
      raise errors.OpPrereqError("Instance '%s' not known" %
1919
                                 self.op.instance_name)
1920
    self.instance = instance
1921

    
1922
  def Exec(self, feedback_fn):
1923
    """Deactivate the disks
1924

1925
    """
1926
    instance = self.instance
1927
    ins_l = rpc.call_instance_list([instance.primary_node])
1928
    ins_l = ins_l[instance.primary_node]
1929
    if not isinstance(ins_l, list):
1930
      raise errors.OpExecError("Can't contact node '%s'" %
1931
                               instance.primary_node)
1932

    
1933
    if self.instance.name in ins_l:
1934
      raise errors.OpExecError("Instance is running, can't shutdown"
1935
                               " block devices.")
1936

    
1937
    _ShutdownInstanceDisks(instance, self.cfg)
1938

    
1939

    
1940
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1941
  """Shutdown block devices of an instance.
1942

1943
  This does the shutdown on all nodes of the instance.
1944

1945
  If ignore_primary is true, errors on the primary node are
1946
  ignored.
1947

1948
  """
1949
  result = True
1950
  for disk in instance.disks:
1951
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1952
      cfg.SetDiskID(top_disk, node)
1953
      if not rpc.call_blockdev_shutdown(node, top_disk):
1954
        logger.Error("could not shutdown block device %s on node %s" %
1955
                     (disk.iv_name, node))
1956
        if not ignore_primary or node != instance.primary_node:
1957
          result = False
1958
  return result
1959

    
1960

    
1961
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1962
  """Checks if a node has enough free memory.
1963

1964
  This function checks if a given node has the needed amount of free
1965
  memory. In case the node has less memory or we cannot get the
1966
  information from the node, this function raises an OpPrereqError
1967
  exception.
1968

1969
  Args:
1970
    - cfg: a ConfigWriter instance
1971
    - node: the node name
1972
    - reason: string to use in the error message
1973
    - requested: the amount of memory in MiB
1974

1975
  """
1976
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1977
  if not nodeinfo or not isinstance(nodeinfo, dict):
1978
    raise errors.OpPrereqError("Could not contact node %s for resource"
1979
                             " information" % (node,))
1980

    
1981
  free_mem = nodeinfo[node].get('memory_free')
1982
  if not isinstance(free_mem, int):
1983
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1984
                             " was '%s'" % (node, free_mem))
1985
  if requested > free_mem:
1986
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1987
                             " needed %s MiB, available %s MiB" %
1988
                             (node, reason, requested, free_mem))
1989

    
1990

    
1991
class LUStartupInstance(LogicalUnit):
1992
  """Starts an instance.
1993

1994
  """
1995
  HPATH = "instance-start"
1996
  HTYPE = constants.HTYPE_INSTANCE
1997
  _OP_REQP = ["instance_name", "force"]
1998

    
1999
  def BuildHooksEnv(self):
2000
    """Build hooks env.
2001

2002
    This runs on master, primary and secondary nodes of the instance.
2003

2004
    """
2005
    env = {
2006
      "FORCE": self.op.force,
2007
      }
2008
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2009
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2010
          list(self.instance.secondary_nodes))
2011
    return env, nl, nl
2012

    
2013
  def CheckPrereq(self):
2014
    """Check prerequisites.
2015

2016
    This checks that the instance is in the cluster.
2017

2018
    """
2019
    instance = self.cfg.GetInstanceInfo(
2020
      self.cfg.ExpandInstanceName(self.op.instance_name))
2021
    if instance is None:
2022
      raise errors.OpPrereqError("Instance '%s' not known" %
2023
                                 self.op.instance_name)
2024

    
2025
    # check bridges existence
2026
    _CheckInstanceBridgesExist(instance)
2027

    
2028
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2029
                         "starting instance %s" % instance.name,
2030
                         instance.memory)
2031

    
2032
    self.instance = instance
2033
    self.op.instance_name = instance.name
2034

    
2035
  def Exec(self, feedback_fn):
2036
    """Start the instance.
2037

2038
    """
2039
    instance = self.instance
2040
    force = self.op.force
2041
    extra_args = getattr(self.op, "extra_args", "")
2042

    
2043
    self.cfg.MarkInstanceUp(instance.name)
2044

    
2045
    node_current = instance.primary_node
2046

    
2047
    _StartInstanceDisks(self.cfg, instance, force)
2048

    
2049
    if not rpc.call_instance_start(node_current, instance, extra_args):
2050
      _ShutdownInstanceDisks(instance, self.cfg)
2051
      raise errors.OpExecError("Could not start instance")
2052

    
2053

    
2054
class LURebootInstance(LogicalUnit):
2055
  """Reboot an instance.
2056

2057
  """
2058
  HPATH = "instance-reboot"
2059
  HTYPE = constants.HTYPE_INSTANCE
2060
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2061

    
2062
  def BuildHooksEnv(self):
2063
    """Build hooks env.
2064

2065
    This runs on master, primary and secondary nodes of the instance.
2066

2067
    """
2068
    env = {
2069
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2070
      }
2071
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2072
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2073
          list(self.instance.secondary_nodes))
2074
    return env, nl, nl
2075

    
2076
  def CheckPrereq(self):
2077
    """Check prerequisites.
2078

2079
    This checks that the instance is in the cluster.
2080

2081
    """
2082
    instance = self.cfg.GetInstanceInfo(
2083
      self.cfg.ExpandInstanceName(self.op.instance_name))
2084
    if instance is None:
2085
      raise errors.OpPrereqError("Instance '%s' not known" %
2086
                                 self.op.instance_name)
2087

    
2088
    # check bridges existence
2089
    _CheckInstanceBridgesExist(instance)
2090

    
2091
    self.instance = instance
2092
    self.op.instance_name = instance.name
2093

    
2094
  def Exec(self, feedback_fn):
2095
    """Reboot the instance.
2096

2097
    """
2098
    instance = self.instance
2099
    ignore_secondaries = self.op.ignore_secondaries
2100
    reboot_type = self.op.reboot_type
2101
    extra_args = getattr(self.op, "extra_args", "")
2102

    
2103
    node_current = instance.primary_node
2104

    
2105
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2106
                           constants.INSTANCE_REBOOT_HARD,
2107
                           constants.INSTANCE_REBOOT_FULL]:
2108
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2109
                                  (constants.INSTANCE_REBOOT_SOFT,
2110
                                   constants.INSTANCE_REBOOT_HARD,
2111
                                   constants.INSTANCE_REBOOT_FULL))
2112

    
2113
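    # soft and hard reboots are handled by a single RPC to the primary
    # node; a full reboot is emulated as a complete shutdown/startup
    # cycle, including the instance's disks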
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2114
                       constants.INSTANCE_REBOOT_HARD]:
2115
      if not rpc.call_instance_reboot(node_current, instance,
2116
                                      reboot_type, extra_args):
2117
        raise errors.OpExecError("Could not reboot instance")
2118
    else:
2119
      if not rpc.call_instance_shutdown(node_current, instance):
2120
        raise errors.OpExecError("could not shutdown instance for full reboot")
2121
      _ShutdownInstanceDisks(instance, self.cfg)
2122
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2123
      if not rpc.call_instance_start(node_current, instance, extra_args):
2124
        _ShutdownInstanceDisks(instance, self.cfg)
2125
        raise errors.OpExecError("Could not start instance for full reboot")
2126

    
2127
    self.cfg.MarkInstanceUp(instance.name)
2128

    
2129

    
2130
class LUShutdownInstance(LogicalUnit):
2131
  """Shutdown an instance.
2132

2133
  """
2134
  HPATH = "instance-stop"
2135
  HTYPE = constants.HTYPE_INSTANCE
2136
  _OP_REQP = ["instance_name"]
2137

    
2138
  def BuildHooksEnv(self):
2139
    """Build hooks env.
2140

2141
    This runs on master, primary and secondary nodes of the instance.
2142

2143
    """
2144
    env = _BuildInstanceHookEnvByObject(self.instance)
2145
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2146
          list(self.instance.secondary_nodes))
2147
    return env, nl, nl
2148

    
2149
  def CheckPrereq(self):
2150
    """Check prerequisites.
2151

2152
    This checks that the instance is in the cluster.
2153

2154
    """
2155
    instance = self.cfg.GetInstanceInfo(
2156
      self.cfg.ExpandInstanceName(self.op.instance_name))
2157
    if instance is None:
2158
      raise errors.OpPrereqError("Instance '%s' not known" %
2159
                                 self.op.instance_name)
2160
    self.instance = instance
2161

    
2162
  def Exec(self, feedback_fn):
2163
    """Shutdown the instance.
2164

2165
    """
2166
    instance = self.instance
2167
    node_current = instance.primary_node
2168
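    # the instance is marked down in the configuration first; a failed
    # shutdown RPC is only logged and the disks are deactivated anyway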
    self.cfg.MarkInstanceDown(instance.name)
2169
    if not rpc.call_instance_shutdown(node_current, instance):
2170
      logger.Error("could not shutdown instance")
2171

    
2172
    _ShutdownInstanceDisks(instance, self.cfg)
2173

    
2174

    
2175
class LUReinstallInstance(LogicalUnit):
2176
  """Reinstall an instance.
2177

2178
  """
2179
  HPATH = "instance-reinstall"
2180
  HTYPE = constants.HTYPE_INSTANCE
2181
  _OP_REQP = ["instance_name"]
2182

    
2183
  def BuildHooksEnv(self):
2184
    """Build hooks env.
2185

2186
    This runs on master, primary and secondary nodes of the instance.
2187

2188
    """
2189
    env = _BuildInstanceHookEnvByObject(self.instance)
2190
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2191
          list(self.instance.secondary_nodes))
2192
    return env, nl, nl
2193

    
2194
  def CheckPrereq(self):
2195
    """Check prerequisites.
2196

2197
    This checks that the instance is in the cluster and is not running.
2198

2199
    """
2200
    instance = self.cfg.GetInstanceInfo(
2201
      self.cfg.ExpandInstanceName(self.op.instance_name))
2202
    if instance is None:
2203
      raise errors.OpPrereqError("Instance '%s' not known" %
2204
                                 self.op.instance_name)
2205
    if instance.disk_template == constants.DT_DISKLESS:
2206
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2207
                                 self.op.instance_name)
2208
    if instance.status != "down":
2209
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2210
                                 self.op.instance_name)
2211
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2212
    if remote_info:
2213
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2214
                                 (self.op.instance_name,
2215
                                  instance.primary_node))
2216

    
2217
    self.op.os_type = getattr(self.op, "os_type", None)
2218
    if self.op.os_type is not None:
2219
      # OS verification
2220
      pnode = self.cfg.GetNodeInfo(
2221
        self.cfg.ExpandNodeName(instance.primary_node))
2222
      if pnode is None:
2223
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2224
                                   instance.primary_node)
2225
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2226
      if not os_obj:
2227
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2228
                                   " primary node"  % self.op.os_type)
2229

    
2230
    self.instance = instance
2231

    
2232
  def Exec(self, feedback_fn):
2233
    """Reinstall the instance.
2234

2235
    """
2236
    inst = self.instance
2237

    
2238
    if self.op.os_type is not None:
2239
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2240
      inst.os = self.op.os_type
2241
      self.cfg.AddInstance(inst)
2242

    
2243
    _StartInstanceDisks(self.cfg, inst, None)
2244
    try:
2245
      feedback_fn("Running the instance OS create scripts...")
2246
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2247
        raise errors.OpExecError("Could not install OS for instance %s"
2248
                                 " on node %s" %
2249
                                 (inst.name, inst.primary_node))
2250
    finally:
2251
      _ShutdownInstanceDisks(inst, self.cfg)
2252

    
2253

    
2254
class LURenameInstance(LogicalUnit):
2255
  """Rename an instance.
2256

2257
  """
2258
  HPATH = "instance-rename"
2259
  HTYPE = constants.HTYPE_INSTANCE
2260
  _OP_REQP = ["instance_name", "new_name"]
2261

    
2262
  def BuildHooksEnv(self):
2263
    """Build hooks env.
2264

2265
    This runs on master, primary and secondary nodes of the instance.
2266

2267
    """
2268
    env = _BuildInstanceHookEnvByObject(self.instance)
2269
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2270
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2271
          list(self.instance.secondary_nodes))
2272
    return env, nl, nl
2273

    
2274
  def CheckPrereq(self):
2275
    """Check prerequisites.
2276

2277
    This checks that the instance is in the cluster and is not running.
2278

2279
    """
2280
    instance = self.cfg.GetInstanceInfo(
2281
      self.cfg.ExpandInstanceName(self.op.instance_name))
2282
    if instance is None:
2283
      raise errors.OpPrereqError("Instance '%s' not known" %
2284
                                 self.op.instance_name)
2285
    if instance.status != "down":
2286
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2287
                                 self.op.instance_name)
2288
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2289
    if remote_info:
2290
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2291
                                 (self.op.instance_name,
2292
                                  instance.primary_node))
2293
    self.instance = instance
2294

    
2295
    # new name verification
2296
    name_info = utils.HostInfo(self.op.new_name)
2297

    
2298
    self.op.new_name = new_name = name_info.name
2299
    instance_list = self.cfg.GetInstanceList()
2300
    if new_name in instance_list:
2301
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2302
                                 new_name)
2303

    
2304
    if not getattr(self.op, "ignore_ip", False):
2305
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2306
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2307
                                   (name_info.ip, new_name))
2308

    
2309

    
2310
  def Exec(self, feedback_fn):
2311
    """Reinstall the instance.
2312

2313
    """
2314
    inst = self.instance
2315
    old_name = inst.name
2316

    
2317
    if inst.disk_template == constants.DT_FILE:
2318
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2319

    
2320
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2321

    
2322
    # re-read the instance from the configuration after rename
2323
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2324

    
2325
    if inst.disk_template == constants.DT_FILE:
2326
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2327
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2328
                                                old_file_storage_dir,
2329
                                                new_file_storage_dir)
2330

    
2331
      if not result:
2332
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2333
                                 " directory '%s' to '%s' (but the instance"
2334
                                 " has been renamed in Ganeti)" % (
2335
                                 inst.primary_node, old_file_storage_dir,
2336
                                 new_file_storage_dir))
2337

    
2338
      if not result[0]:
2339
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2340
                                 " (but the instance has been renamed in"
2341
                                 " Ganeti)" % (old_file_storage_dir,
2342
                                               new_file_storage_dir))
2343

    
2344
    _StartInstanceDisks(self.cfg, inst, None)
2345
    try:
2346
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2347
                                          "sda", "sdb"):
2348
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2349
               " instance has been renamed in Ganeti)" %
2350
               (inst.name, inst.primary_node))
2351
        logger.Error(msg)
2352
    finally:
2353
      _ShutdownInstanceDisks(inst, self.cfg)
2354

    
2355

    
2356
class LURemoveInstance(LogicalUnit):
2357
  """Remove an instance.
2358

2359
  """
2360
  HPATH = "instance-remove"
2361
  HTYPE = constants.HTYPE_INSTANCE
2362
  _OP_REQP = ["instance_name", "ignore_failures"]
2363

    
2364
  def BuildHooksEnv(self):
2365
    """Build hooks env.
2366

2367
    This runs on master, primary and secondary nodes of the instance.
2368

2369
    """
2370
    env = _BuildInstanceHookEnvByObject(self.instance)
2371
    nl = [self.sstore.GetMasterNode()]
2372
    return env, nl, nl
2373

    
2374
  def CheckPrereq(self):
2375
    """Check prerequisites.
2376

2377
    This checks that the instance is in the cluster.
2378

2379
    """
2380
    instance = self.cfg.GetInstanceInfo(
2381
      self.cfg.ExpandInstanceName(self.op.instance_name))
2382
    if instance is None:
2383
      raise errors.OpPrereqError("Instance '%s' not known" %
2384
                                 self.op.instance_name)
2385
    self.instance = instance
2386

    
2387
  def Exec(self, feedback_fn):
2388
    """Remove the instance.
2389

2390
    """
2391
    instance = self.instance
2392
    logger.Info("shutting down instance %s on node %s" %
2393
                (instance.name, instance.primary_node))
2394

    
2395
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2396
      if self.op.ignore_failures:
2397
        feedback_fn("Warning: can't shutdown instance")
2398
      else:
2399
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2400
                                 (instance.name, instance.primary_node))
2401

    
2402
    logger.Info("removing block devices for instance %s" % instance.name)
2403

    
2404
    if not _RemoveDisks(instance, self.cfg):
2405
      if self.op.ignore_failures:
2406
        feedback_fn("Warning: can't remove instance's disks")
2407
      else:
2408
        raise errors.OpExecError("Can't remove instance's disks")
2409

    
2410
    logger.Info("removing instance %s out of cluster config" % instance.name)
2411

    
2412
    self.cfg.RemoveInstance(instance.name)
2413
    # Remove the new instance from the Ganeti Lock Manager
2414
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2415

    
2416

    
2417
class LUQueryInstances(NoHooksLU):
2418
  """Logical unit for querying instances.
2419

2420
  """
2421
  _OP_REQP = ["output_fields", "names"]
2422

    
2423
  def CheckPrereq(self):
2424
    """Check prerequisites.
2425

2426
    This checks that the fields required are valid output fields.
2427

2428
    """
2429
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2430
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2431
                               "admin_state", "admin_ram",
2432
                               "disk_template", "ip", "mac", "bridge",
2433
                               "sda_size", "sdb_size", "vcpus", "tags"],
2434
                       dynamic=self.dynamic_fields,
2435
                       selected=self.op.output_fields)
2436

    
2437
    self.wanted = _GetWantedInstances(self, self.op.names)
2438

    
2439
  def Exec(self, feedback_fn):
2440
    """Computes the list of nodes and their attributes.
2441

2442
    """
2443
    instance_names = self.wanted
2444
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2445
                     in instance_names]
2446

    
2447
    # begin data gathering
2448

    
2449
    nodes = frozenset([inst.primary_node for inst in instance_list])
2450

    
2451
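    # nodes that fail to answer the RPC are collected in bad_nodes so
    # the dynamic fields of their instances can be reported as unknown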
    bad_nodes = []
2452
    if self.dynamic_fields.intersection(self.op.output_fields):
2453
      live_data = {}
2454
      node_data = rpc.call_all_instances_info(nodes)
2455
      for name in nodes:
2456
        result = node_data[name]
2457
        if result:
2458
          live_data.update(result)
2459
        elif result == False:
2460
          bad_nodes.append(name)
2461
        # else no instance is alive
2462
    else:
2463
      live_data = dict([(name, {}) for name in instance_names])
2464

    
2465
    # end data gathering
2466

    
2467
    output = []
2468
    for instance in instance_list:
2469
      iout = []
2470
      for field in self.op.output_fields:
2471
        if field == "name":
2472
          val = instance.name
2473
        elif field == "os":
2474
          val = instance.os
2475
        elif field == "pnode":
2476
          val = instance.primary_node
2477
        elif field == "snodes":
2478
          val = list(instance.secondary_nodes)
2479
        elif field == "admin_state":
2480
          val = (instance.status != "down")
2481
        elif field == "oper_state":
2482
          if instance.primary_node in bad_nodes:
2483
            val = None
2484
          else:
2485
            val = bool(live_data.get(instance.name))
2486
        elif field == "status":
2487
          if instance.primary_node in bad_nodes:
2488
            val = "ERROR_nodedown"
2489
          else:
2490
            running = bool(live_data.get(instance.name))
2491
            if running:
2492
              if instance.status != "down":
2493
                val = "running"
2494
              else:
2495
                val = "ERROR_up"
2496
            else:
2497
              if instance.status != "down":
2498
                val = "ERROR_down"
2499
              else:
2500
                val = "ADMIN_down"
2501
        elif field == "admin_ram":
2502
          val = instance.memory
2503
        elif field == "oper_ram":
2504
          if instance.primary_node in bad_nodes:
2505
            val = None
2506
          elif instance.name in live_data:
2507
            val = live_data[instance.name].get("memory", "?")
2508
          else:
2509
            val = "-"
2510
        elif field == "disk_template":
2511
          val = instance.disk_template
2512
        elif field == "ip":
2513
          val = instance.nics[0].ip
2514
        elif field == "bridge":
2515
          val = instance.nics[0].bridge
2516
        elif field == "mac":
2517
          val = instance.nics[0].mac
2518
        elif field == "sda_size" or field == "sdb_size":
2519
          disk = instance.FindDisk(field[:3])
2520
          if disk is None:
2521
            val = None
2522
          else:
2523
            val = disk.size
2524
        elif field == "vcpus":
2525
          val = instance.vcpus
2526
        elif field == "tags":
2527
          val = list(instance.GetTags())
2528
        else:
2529
          raise errors.ParameterError(field)
2530
        iout.append(val)
2531
      output.append(iout)
2532

    
2533
    return output
2534

    
2535

    
2536
class LUFailoverInstance(LogicalUnit):
2537
  """Failover an instance.
2538

2539
  """
2540
  HPATH = "instance-failover"
2541
  HTYPE = constants.HTYPE_INSTANCE
2542
  _OP_REQP = ["instance_name", "ignore_consistency"]
2543

    
2544
  def BuildHooksEnv(self):
2545
    """Build hooks env.
2546

2547
    This runs on master, primary and secondary nodes of the instance.
2548

2549
    """
2550
    env = {
2551
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2552
      }
2553
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2554
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2555
    return env, nl, nl
2556

    
2557
  def CheckPrereq(self):
2558
    """Check prerequisites.
2559

2560
    This checks that the instance is in the cluster.
2561

2562
    """
2563
    instance = self.cfg.GetInstanceInfo(
2564
      self.cfg.ExpandInstanceName(self.op.instance_name))
2565
    if instance is None:
2566
      raise errors.OpPrereqError("Instance '%s' not known" %
2567
                                 self.op.instance_name)
2568

    
2569
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2570
      raise errors.OpPrereqError("Instance's disk layout is not"
2571
                                 " network mirrored, cannot failover.")
2572

    
2573
    secondary_nodes = instance.secondary_nodes
2574
    if not secondary_nodes:
2575
      raise errors.ProgrammerError("no secondary node but using "
2576
                                   "a mirrored disk template")
2577

    
2578
    target_node = secondary_nodes[0]
2579
    # check memory requirements on the secondary node
2580
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2581
                         instance.name, instance.memory)
2582

    
2583
    # check bridge existence
2584
    brlist = [nic.bridge for nic in instance.nics]
2585
    if not rpc.call_bridges_exist(target_node, brlist):
2586
      raise errors.OpPrereqError("One or more target bridges %s does not"
2587
                                 " exist on destination node '%s'" %
2588
                                 (brlist, target_node))
2589

    
2590
    self.instance = instance
2591

    
2592
  def Exec(self, feedback_fn):
2593
    """Failover an instance.
2594

2595
    The failover is done by shutting it down on its present node and
2596
    starting it on the secondary.
2597

2598
    """
2599
    instance = self.instance
2600

    
2601
    source_node = instance.primary_node
2602
    target_node = instance.secondary_nodes[0]
2603

    
2604
    feedback_fn("* checking disk consistency between source and target")
2605
    for dev in instance.disks:
2606
      # for drbd, these are drbd over lvm
2607
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2608
        if instance.status == "up" and not self.op.ignore_consistency:
2609
          raise errors.OpExecError("Disk %s is degraded on target node,"
2610
                                   " aborting failover." % dev.iv_name)
2611

    
2612
    feedback_fn("* shutting down instance on source node")
2613
    logger.Info("Shutting down instance %s on node %s" %
2614
                (instance.name, source_node))
2615

    
2616
    if not rpc.call_instance_shutdown(source_node, instance):
2617
      if self.op.ignore_consistency:
2618
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2619
                     " anyway. Please make sure node %s is down"  %
2620
                     (instance.name, source_node, source_node))
2621
      else:
2622
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2623
                                 (instance.name, source_node))
2624

    
2625
    feedback_fn("* deactivating the instance's disks on source node")
2626
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2627
      raise errors.OpExecError("Can't shut down the instance's disks.")
2628

    
2629
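    # from this point on, the target node is the instance's primary node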
    instance.primary_node = target_node
2630
    # distribute new instance config to the other nodes
2631
    self.cfg.Update(instance)
2632

    
2633
    # Only start the instance if it's marked as up
2634
    if instance.status == "up":
2635
      feedback_fn("* activating the instance's disks on target node")
2636
      logger.Info("Starting instance %s on node %s" %
2637
                  (instance.name, target_node))
2638

    
2639
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2640
                                               ignore_secondaries=True)
2641
      if not disks_ok:
2642
        _ShutdownInstanceDisks(instance, self.cfg)
2643
        raise errors.OpExecError("Can't activate the instance's disks")
2644

    
2645
      feedback_fn("* starting the instance on the target node")
2646
      if not rpc.call_instance_start(target_node, instance, None):
2647
        _ShutdownInstanceDisks(instance, self.cfg)
2648
        raise errors.OpExecError("Could not start instance %s on node %s." %
2649
                                 (instance.name, target_node))
2650

    
2651

    
2652
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2653
  """Create a tree of block devices on the primary node.
2654

2655
  This always creates all devices.
2656

2657
  """
2658
  if device.children:
2659
    for child in device.children:
2660
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2661
        return False
2662

    
2663
  cfg.SetDiskID(device, node)
2664
  new_id = rpc.call_blockdev_create(node, device, device.size,
2665
                                    instance.name, True, info)
2666
  if not new_id:
2667
    return False
2668
  if device.physical_id is None:
2669
    device.physical_id = new_id
2670
  return True
2671

    
2672

    
2673
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2674
  """Create a tree of block devices on a secondary node.
2675

2676
  If this device type has to be created on secondaries, create it and
2677
  all its children.
2678

2679
  If not, just recurse to children keeping the same 'force' value.
2680

2681
  """
2682
  if device.CreateOnSecondary():
2683
    force = True
2684
  if device.children:
2685
    for child in device.children:
2686
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2687
                                        child, force, info):
2688
        return False
2689

    
2690
  if not force:
2691
    return True
2692
  cfg.SetDiskID(device, node)
2693
  new_id = rpc.call_blockdev_create(node, device, device.size,
2694
                                    instance.name, False, info)
2695
  if not new_id:
2696
    return False
2697
  if device.physical_id is None:
2698
    device.physical_id = new_id
2699
  return True
2700

    
2701

    
2702
def _GenerateUniqueNames(cfg, exts):
2703
  """Generate a suitable LV name.
2704

2705
  This will generate a logical volume name for the given instance.
2706

2707
  """
2708
  results = []
2709
  for val in exts:
2710
    new_id = cfg.GenerateUniqueID()
2711
    results.append("%s%s" % (new_id, val))
2712
  return results
2713

    
2714

    
2715
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2716
  """Generate a drbd8 device complete with its children.
2717

2718
  """
2719
  port = cfg.AllocatePort()
2720
  vgname = cfg.GetVGName()
2721
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2722
                          logical_id=(vgname, names[0]))
2723
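  # a fixed 128 MiB logical volume holds the DRBD metadata for this device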
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2724
                          logical_id=(vgname, names[1]))
2725
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2726
                          logical_id = (primary, secondary, port),
2727
                          children = [dev_data, dev_meta],
2728
                          iv_name=iv_name)
2729
  return drbd_dev
2730

    
2731

    
2732
def _GenerateDiskTemplate(cfg, template_name,
2733
                          instance_name, primary_node,
2734
                          secondary_nodes, disk_sz, swap_sz,
2735
                          file_storage_dir, file_driver):
2736
  """Generate the entire disk layout for a given template type.
2737

2738
  """
2739
  #TODO: compute space requirements
2740

    
2741
  vgname = cfg.GetVGName()
2742
  if template_name == constants.DT_DISKLESS:
2743
    disks = []
2744
  elif template_name == constants.DT_PLAIN:
2745
    if len(secondary_nodes) != 0:
2746
      raise errors.ProgrammerError("Wrong template configuration")
2747

    
2748
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2749
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2750
                           logical_id=(vgname, names[0]),
2751
                           iv_name = "sda")
2752
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2753
                           logical_id=(vgname, names[1]),
2754
                           iv_name = "sdb")
2755
    disks = [sda_dev, sdb_dev]
2756
  elif template_name == constants.DT_DRBD8:
2757
    if len(secondary_nodes) != 1:
2758
      raise errors.ProgrammerError("Wrong template configuration")
2759
    remote_node = secondary_nodes[0]
2760
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2761
                                       ".sdb_data", ".sdb_meta"])
2762
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2763
                                         disk_sz, names[0:2], "sda")
2764
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2765
                                         swap_sz, names[2:4], "sdb")
2766
    disks = [drbd_sda_dev, drbd_sdb_dev]
2767
  elif template_name == constants.DT_FILE:
2768
    if len(secondary_nodes) != 0:
2769
      raise errors.ProgrammerError("Wrong template configuration")
2770

    
2771
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2772
                                iv_name="sda", logical_id=(file_driver,
2773
                                "%s/sda" % file_storage_dir))
2774
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2775
                                iv_name="sdb", logical_id=(file_driver,
2776
                                "%s/sdb" % file_storage_dir))
2777
    disks = [file_sda_dev, file_sdb_dev]
2778
  else:
2779
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2780
  return disks
2781

    
2782

    
2783
def _GetInstanceInfoText(instance):
2784
  """Compute that text that should be added to the disk's metadata.
2785

2786
  """
2787
  return "originstname+%s" % instance.name
2788

    
2789

    
2790
def _CreateDisks(cfg, instance):
2791
  """Create all disks for an instance.
2792

2793
  This abstracts away some work from AddInstance.
2794

2795
  Args:
2796
    instance: the instance object
2797

2798
  Returns:
2799
    True or False showing the success of the creation process
2800

2801
  """
2802
  info = _GetInstanceInfoText(instance)
2803

    
2804
  if instance.disk_template == constants.DT_FILE:
2805
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2806
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2807
                                              file_storage_dir)
2808

    
2809
    if not result:
2810
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2811
      return False
2812

    
2813
    if not result[0]:
2814
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2815
      return False
2816

    
2817
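  # each disk is created on the secondary node(s) first and then on the
  # primary; the first failure aborts the whole creation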
  for device in instance.disks:
2818
    logger.Info("creating volume %s for instance %s" %
2819
                (device.iv_name, instance.name))
2820
    #HARDCODE
2821
    for secondary_node in instance.secondary_nodes:
2822
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2823
                                        device, False, info):
2824
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2825
                     (device.iv_name, device, secondary_node))
2826
        return False
2827
    #HARDCODE
2828
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2829
                                    instance, device, info):
2830
      logger.Error("failed to create volume %s on primary!" %
2831
                   device.iv_name)
2832
      return False
2833

    
2834
  return True
2835

    
2836

    
2837
def _RemoveDisks(instance, cfg):
2838
  """Remove all disks for an instance.
2839

2840
  This abstracts away some work from `AddInstance()` and
2841
  `RemoveInstance()`. Note that in case some of the devices couldn't
2842
  be removed, the removal will continue with the other ones (compare
2843
  with `_CreateDisks()`).
2844

2845
  Args:
2846
    instance: the instance object
2847

2848
  Returns:
2849
    True or False showing the success of the removal process
2850

2851
  """
2852
  logger.Info("removing block devices for instance %s" % instance.name)
2853

    
2854
  result = True
2855
  for device in instance.disks:
2856
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2857
      cfg.SetDiskID(disk, node)
2858
      if not rpc.call_blockdev_remove(node, disk):
2859
        logger.Error("could not remove block device %s on node %s,"
2860
                     " continuing anyway" %
2861
                     (device.iv_name, node))
2862
        result = False
2863

    
2864
  if instance.disk_template == constants.DT_FILE:
2865
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2866
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
2867
                                            file_storage_dir):
2868
      logger.Error("could not remove directory '%s'" % file_storage_dir)
2869
      result = False
2870

    
2871
  return result
2872

    
2873

    
2874
def _ComputeDiskSize(disk_template, disk_size, swap_size):
2875
  """Compute disk size requirements in the volume group
2876

2877
  This is currently hard-coded for the two-drive layout.
2878

2879
  """
2880
  # Required free disk space as a function of disk and swap space
2881
  req_size_dict = {
2882
    constants.DT_DISKLESS: None,
2883
    constants.DT_PLAIN: disk_size + swap_size,
2884
    # 256 MB are added for drbd metadata, 128MB for each drbd device
2885
    constants.DT_DRBD8: disk_size + swap_size + 256,
2886
    constants.DT_FILE: None,
2887
  }
2888

    
2889
  if disk_template not in req_size_dict:
2890
    raise errors.ProgrammerError("Disk template '%s' size requirement"
2891
                                 " is unknown" %  disk_template)
2892

    
2893
  return req_size_dict[disk_template]
2894

    
2895

    
2896
class LUCreateInstance(LogicalUnit):
2897
  """Create an instance.
2898

2899
  """
2900
  HPATH = "instance-add"
2901
  HTYPE = constants.HTYPE_INSTANCE
2902
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
2903
              "disk_template", "swap_size", "mode", "start", "vcpus",
2904
              "wait_for_sync", "ip_check", "mac"]
2905

    
2906
  def _RunAllocator(self):
2907
    """Run the allocator based on input opcode.
2908

2909
    """
2910
    disks = [{"size": self.op.disk_size, "mode": "w"},
2911
             {"size": self.op.swap_size, "mode": "w"}]
2912
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2913
             "bridge": self.op.bridge}]
2914
    ial = IAllocator(self.cfg, self.sstore,
2915
                     mode=constants.IALLOCATOR_MODE_ALLOC,
2916
                     name=self.op.instance_name,
2917
                     disk_template=self.op.disk_template,
2918
                     tags=[],
2919
                     os=self.op.os_type,
2920
                     vcpus=self.op.vcpus,
2921
                     mem_size=self.op.mem_size,
2922
                     disks=disks,
2923
                     nics=nics,
2924
                     )
2925

    
2926
    ial.Run(self.op.iallocator)
2927

    
2928
    if not ial.success:
2929
      raise errors.OpPrereqError("Can't compute nodes using"
2930
                                 " iallocator '%s': %s" % (self.op.iallocator,
2931
                                                           ial.info))
2932
    if len(ial.nodes) != ial.required_nodes:
2933
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2934
                                 " of nodes (%s), required %s" %
2935
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
2936
    self.op.pnode = ial.nodes[0]
2937
    logger.ToStdout("Selected nodes for the instance: %s" %
2938
                    (", ".join(ial.nodes),))
2939
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2940
                (self.op.instance_name, self.op.iallocator, ial.nodes))
2941
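    # a second node is returned only when the disk template needs one
    # (mirrored templates such as drbd)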
    if ial.required_nodes == 2:
2942
      self.op.snode = ial.nodes[1]
2943

    
2944

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl
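
  # Illustrative example of the hook environment built above (values are
  # hypothetical): INSTANCE_DISK_TEMPLATE=drbd, INSTANCE_DISK_SIZE=10240,
  # INSTANCE_SWAP_SIZE=4096, INSTANCE_ADD_MODE=create, plus the variables
  # derived by _BuildInstanceHookEnv from the name, primary/secondary
  # nodes, status, OS type, memory, vcpus and NIC list; import mode also
  # sets INSTANCE_SRC_NODE, INSTANCE_SRC_PATH and INSTANCE_SRC_IMAGE.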

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")


    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Add the new instance to the Ganeti Lock Manager
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Remove the new instance from the Ganeti Lock Manager
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      # _RunAllocator stores its choice directly in self.op.remote_node
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node
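
  # The replacement itself is dispatched from Exec() below: when no new
  # secondary was requested (remote_node is None) the per-disk path
  # _ExecD8DiskOnly is used, otherwise _ExecD8Secondary moves the whole
  # mirror to the chosen new secondary node.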

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
3646
                    " logical volumes")
3647
        raise errors.OpExecError("Can't add local storage to drbd")
3648

    
3649
      dev.children = new_lvs
3650
      cfg.Update(instance)
3651

    
3652
    # Step: wait for sync
3653

    
3654
    # this can fail as the old devices are degraded and _WaitForSync
3655
    # does a combined result over all disks, so we don't check its
3656
    # return value
3657
    self.proc.LogStep(5, steps_total, "sync devices")
3658
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3659

    
3660
    # so check manually all the devices
3661
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3662
      cfg.SetDiskID(dev, instance.primary_node)
3663
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3664
      if is_degr:
3665
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3666

    
3667
    # Step: remove old storage
3668
    self.proc.LogStep(6, steps_total, "removing old storage")
3669
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3670
      info("remove logical volumes for %s" % name)
3671
      for lv in old_lvs:
3672
        cfg.SetDiskID(lv, tgt_node)
3673
        if not rpc.call_blockdev_remove(tgt_node, lv):
3674
          warning("Can't remove old LV", hint="manually remove unused LVs")
3675
          continue
3676

    
3677
  def _ExecD8Secondary(self, feedback_fn):
3678
    """Replace the secondary node for drbd8.
3679

3680
    The algorithm for replace is quite complicated:
3681
      - for all disks of the instance:
3682
        - create new LVs on the new node with same names
3683
        - shutdown the drbd device on the old secondary
3684
        - disconnect the drbd network on the primary
3685
        - create the drbd device on the new secondary
3686
        - network attach the drbd on the primary, using an artifice:
3687
          the drbd code for Attach() will connect to the network if it
3688
          finds a device which is connected to the good local disks but
3689
          not network enabled
3690
      - wait for sync across all devices
3691
      - remove all disks from the old secondary
3692

3693
    Failures are not very well handled.
3694

3695
    """
3696
    steps_total = 6
3697
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3698
    instance = self.instance
3699
    iv_names = {}
3700
    vgname = self.cfg.GetVGName()
3701
    # start of work
3702
    cfg = self.cfg
3703
    old_node = self.tgt_node
3704
    new_node = self.new_node
3705
    pri_node = instance.primary_node
3706

    
3707
    # Step: check device activation
3708
    self.proc.LogStep(1, steps_total, "check device existence")
3709
    info("checking volume groups")
3710
    my_vg = cfg.GetVGName()
3711
    results = rpc.call_vg_list([pri_node, new_node])
3712
    if not results:
3713
      raise errors.OpExecError("Can't list volume groups on the nodes")
3714
    for node in pri_node, new_node:
3715
      res = results.get(node, False)
3716
      if not res or my_vg not in res:
3717
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3718
                                 (my_vg, node))
3719
    for dev in instance.disks:
3720
      if not dev.iv_name in self.op.disks:
3721
        continue
3722
      info("checking %s on %s" % (dev.iv_name, pri_node))
3723
      cfg.SetDiskID(dev, pri_node)
3724
      if not rpc.call_blockdev_find(pri_node, dev):
3725
        raise errors.OpExecError("Can't find device %s on node %s" %
3726
                                 (dev.iv_name, pri_node))
3727

    
3728
    # Step: check other node consistency
3729
    self.proc.LogStep(2, steps_total, "check peer consistency")
3730
    for dev in instance.disks:
3731
      if not dev.iv_name in self.op.disks:
3732
        continue
3733
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3734
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3735
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3736
                                 " unsafe to replace the secondary" %
3737
                                 pri_node)
3738

    
3739
    # Step: create new storage
3740
    self.proc.LogStep(3, steps_total, "allocate new storage")
3741
    for dev in instance.disks:
3742
      size = dev.size
3743
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3744
      # since we *always* want to create this LV, we use the
3745
      # _Create...OnPrimary (which forces the creation), even if we
3746
      # are talking about the secondary node
3747
      for new_lv in dev.children:
3748
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3749
                                        _GetInstanceInfoText(instance)):
3750
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3751
                                   " node '%s'" %
3752
                                   (new_lv.logical_id[1], new_node))
3753

    
3754
      iv_names[dev.iv_name] = (dev, dev.children)
3755

    
3756
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3757
    for dev in instance.disks:
3758
      size = dev.size
3759
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3760
      # create new devices on new_node
3761
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3762
                              logical_id=(pri_node, new_node,
3763
                                          dev.logical_id[2]),
3764
                              children=dev.children)
3765
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3766
                                        new_drbd, False,
3767
                                      _GetInstanceInfoText(instance)):
3768
        raise errors.OpExecError("Failed to create new DRBD on"
3769
                                 " node '%s'" % new_node)
3770

    
3771
    for dev in instance.disks:
3772
      # we have new devices, shutdown the drbd on the old secondary
3773
      info("shutting down drbd for %s on old node" % dev.iv_name)
3774
      cfg.SetDiskID(dev, old_node)
3775
      if not rpc.call_blockdev_shutdown(old_node, dev):
3776
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3777
                hint="Please cleanup this device manually as soon as possible")
3778

    
3779
    info("detaching primary drbds from the network (=> standalone)")
3780
    done = 0
3781
    for dev in instance.disks:
3782
      cfg.SetDiskID(dev, pri_node)
3783
      # set the physical (unique in bdev terms) id to None, meaning
3784
      # detach from network
3785
      dev.physical_id = (None,) * len(dev.physical_id)
3786
      # and 'find' the device, which will 'fix' it to match the
3787
      # standalone state
3788
      if rpc.call_blockdev_find(pri_node, dev):
3789
        done += 1
3790
      else:
3791
        warning("Failed to detach drbd %s from network, unusual case" %
3792
                dev.iv_name)
3793

    
3794
    if not done:
3795
      # no detaches succeeded (very unlikely)
3796
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3797

    
3798
    # if we managed to detach at least one, we update all the disks of
3799
    # the instance to point to the new secondary
3800
    info("updating instance configuration")
3801
    for dev in instance.disks:
3802
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3803
      cfg.SetDiskID(dev, pri_node)
3804
    cfg.Update(instance)
3805

    
3806
    # and now perform the drbd attach
3807
    info("attaching primary drbds to new secondary (standalone => connected)")
3808
    failures = []
3809
    for dev in instance.disks:
3810
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3811
      # since the attach is smart, it's enough to 'find' the device,
3812
      # it will automatically activate the network, if the physical_id
3813
      # is correct
3814
      cfg.SetDiskID(dev, pri_node)
3815
      if not rpc.call_blockdev_find(pri_node, dev):
3816
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3817
                "please do a gnt-instance info to see the status of disks")
3818

    
3819
    # this can fail as the old devices are degraded and _WaitForSync
3820
    # does a combined result over all disks, so we don't check its
3821
    # return value
3822
    self.proc.LogStep(5, steps_total, "sync devices")
3823
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3824

    
3825
    # so check manually all the devices
3826
    for name, (dev, old_lvs) in iv_names.iteritems():
3827
      cfg.SetDiskID(dev, pri_node)
3828
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3829
      if is_degr:
3830
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3831

    
3832
    self.proc.LogStep(6, steps_total, "removing old storage")
3833
    for name, (dev, old_lvs) in iv_names.iteritems():
3834
      info("remove logical volumes for %s" % name)
3835
      for lv in old_lvs:
3836
        cfg.SetDiskID(lv, old_node)
3837
        if not rpc.call_blockdev_remove(old_node, lv):
3838
          warning("Can't remove LV on old secondary",
3839
                  hint="Cleanup stale volumes by hand")
3840

    
3841
  def Exec(self, feedback_fn):
3842
    """Execute disk replacement.
3843

3844
    This dispatches the disk replacement to the appropriate handler.
3845

3846
    """
3847
    instance = self.instance
3848

    
3849
    # Activate the instance disks if we're replacing them on a down instance
3850
    if instance.status == "down":
3851
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3852
      self.proc.ChainOpCode(op)
3853

    
3854
    if instance.disk_template == constants.DT_DRBD8:
3855
      if self.op.remote_node is None:
3856
        fn = self._ExecD8DiskOnly
3857
      else:
3858
        fn = self._ExecD8Secondary
3859
    else:
3860
      raise errors.ProgrammerError("Unhandled disk replacement case")
3861

    
3862
    ret = fn(feedback_fn)
3863

    
3864
    # Deactivate the instance disks if we're replacing them on a down instance
3865
    if instance.status == "down":
3866
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3867
      self.proc.ChainOpCode(op)
3868

    
3869
    return ret
3870

    
3871

    
3872
class LUGrowDisk(LogicalUnit):
3873
  """Grow a disk of an instance.
3874

3875
  """
3876
  HPATH = "disk-grow"
3877
  HTYPE = constants.HTYPE_INSTANCE
3878
  _OP_REQP = ["instance_name", "disk", "amount"]
3879

    
3880
  def BuildHooksEnv(self):
3881
    """Build hooks env.
3882

3883
    This runs on the master, the primary and all the secondaries.
3884

3885
    """
3886
    env = {
3887
      "DISK": self.op.disk,
3888
      "AMOUNT": self.op.amount,
3889
      }
3890
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3891
    nl = [
3892
      self.sstore.GetMasterNode(),
3893
      self.instance.primary_node,
3894
      ]
3895
    return env, nl, nl
3896

    
3897
  def CheckPrereq(self):
3898
    """Check prerequisites.
3899

3900
    This checks that the instance is in the cluster.
3901

3902
    """
3903
    instance = self.cfg.GetInstanceInfo(
3904
      self.cfg.ExpandInstanceName(self.op.instance_name))
3905
    if instance is None:
3906
      raise errors.OpPrereqError("Instance '%s' not known" %
3907
                                 self.op.instance_name)
3908
    self.instance = instance
3909
    self.op.instance_name = instance.name
3910

    
3911
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3912
      raise errors.OpPrereqError("Instance's disk layout does not support"
3913
                                 " growing.")
3914

    
3915
    if instance.FindDisk(self.op.disk) is None:
3916
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3917
                                 (self.op.disk, instance.name))
3918

    
3919
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3920
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3921
    for node in nodenames:
3922
      info = nodeinfo.get(node, None)
3923
      if not info:
3924
        raise errors.OpPrereqError("Cannot get current information"
3925
                                   " from node '%s'" % node)
3926
      vg_free = info.get('vg_free', None)
3927
      if not isinstance(vg_free, int):
3928
        raise errors.OpPrereqError("Can't compute free disk space on"
3929
                                   " node %s" % node)
3930
      if self.op.amount > info['vg_free']:
3931
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
3932
                                   " %d MiB available, %d MiB required" %
3933
                                   (node, info['vg_free'], self.op.amount))
3934

    
3935
  def Exec(self, feedback_fn):
3936
    """Execute disk grow.
3937

3938
    """
3939
    instance = self.instance
3940
    disk = instance.FindDisk(self.op.disk)
3941
    for node in (instance.secondary_nodes + (instance.primary_node,)):
3942
      self.cfg.SetDiskID(disk, node)
3943
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3944
      if not result or not isinstance(result, tuple) or len(result) != 2:
3945
        raise errors.OpExecError("grow request failed to node %s" % node)
3946
      elif not result[0]:
3947
        raise errors.OpExecError("grow request failed to node %s: %s" %
3948
                                 (node, result[1]))
3949
    disk.RecordGrow(self.op.amount)
3950
    self.cfg.Update(instance)
3951
    return
3952
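
  # Note: rpc.call_blockdev_grow above is expected to return a two-element
  # (status, message) tuple per node; any other shape, or a false status,
  # aborts the grow operation.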


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]