lib/cmdlib.py @ 77b657a3
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement CheckPrereq which also fills in the opcode instance
53
      with all the fields (even if as None)
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_MASTER: the LU needs to run on the master node
59
        REQ_WSSTORE: the LU needs a writable SimpleStore
60
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61

62
  Note that all commands require root permissions.
63

64
  """
65
  HPATH = None
66
  HTYPE = None
67
  _OP_REQP = []
68
  REQ_MASTER = True
69
  REQ_WSSTORE = False
70
  REQ_BGL = True
71

    
72
  def __init__(self, processor, op, context, sstore):
73
    """Constructor for LogicalUnit.
74

75
    This needs to be overridden in derived classes in order to check op
76
    validity.
77

78
    """
79
    self.proc = processor
80
    self.op = op
81
    self.cfg = context.cfg
82
    self.sstore = sstore
83
    self.context = context
84
    self.__ssh = None
85

    
86
    for attr_name in self._OP_REQP:
87
      attr_val = getattr(op, attr_name, None)
88
      if attr_val is None:
89
        raise errors.OpPrereqError("Required parameter '%s' missing" %
90
                                   attr_name)
91

    
92
    if not self.cfg.IsCluster():
93
      raise errors.OpPrereqError("Cluster not initialized yet,"
94
                                 " use 'gnt-cluster init' first.")
95
    if self.REQ_MASTER:
96
      master = sstore.GetMasterNode()
97
      if master != utils.HostInfo().name:
98
        raise errors.OpPrereqError("Commands must be run on the master"
99
                                   " node %s" % master)
100

    
101
  def __GetSSH(self):
102
    """Returns the SshRunner object
103

104
    """
105
    if not self.__ssh:
106
      self.__ssh = ssh.SshRunner(self.sstore)
107
    return self.__ssh
108

    
109
  ssh = property(fget=__GetSSH)
110

    
111
  def CheckPrereq(self):
112
    """Check prerequisites for this LU.
113

114
    This method should check that the prerequisites for the execution
115
    of this LU are fulfilled. It can do internode communication, but
116
    it should be idempotent - no cluster or system changes are
117
    allowed.
118

119
    The method should raise errors.OpPrereqError in case something is
120
    not fulfilled. Its return value is ignored.
121

122
    This method should also update all the parameters of the opcode to
123
    their canonical form; e.g. a short node name must be fully
124
    expanded after this method has successfully completed (so that
125
    hooks, logging, etc. work correctly).
126

127
    """
128
    raise NotImplementedError
129

    
130
  def Exec(self, feedback_fn):
131
    """Execute the LU.
132

133
    This method should implement the actual work. It should raise
134
    errors.OpExecError for failures that are somewhat dealt with in
135
    code, or expected.
136

137
    """
138
    raise NotImplementedError
139

    
140
  def BuildHooksEnv(self):
141
    """Build hooks environment for this LU.
142

143
    This method should return a three-element tuple consisting of: a dict
144
    containing the environment that will be used for running the
145
    specific hook for this LU, a list of node names on which the hook
146
    should run before the execution, and a list of node names on which
147
    the hook should run after the execution.
148

149
    The keys of the dict must not be prefixed with 'GANETI_', as this
150
    prefix is added by the hooks runner. Also note additional keys will be
151
    added by the hooks runner. If the LU doesn't define any
152
    environment, an empty dict (and not None) should be returned.
153

154
    If the hook should run on no nodes, return an empty list (and not None).
155

156
    Note that if the HPATH for a LU class is None, this function will
157
    not be called.
158

159
    """
160
    raise NotImplementedError
161

    
162
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
163
    """Notify the LU about the results of its hooks.
164

165
    This method is called every time a hooks phase is executed, and notifies
166
    the Logical Unit about the hooks' result. The LU can then use it to alter
167
    its result based on the hooks.  By default the method does nothing and the
168
    previous result is passed back unchanged, but any LU can override it if it
169
    wants to use the local cluster hook-scripts somehow.
170

171
    Args:
172
      phase: the hooks phase that has just been run
173
      hooks_results: the results of the multi-node hooks rpc call
174
      feedback_fn: function to send feedback back to the caller
175
      lu_result: the previous result this LU had, or None in the PRE phase.
176

177
    """
178
    return lu_result
179

    
180

    
181
class NoHooksLU(LogicalUnit):
182
  """Simple LU which runs no hooks.
183

184
  This LU is intended as a parent for other LogicalUnits which will
185
  run no hooks, in order to reduce duplicate code.
186

187
  """
188
  HPATH = None
189
  HTYPE = None
190

    
191
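# Illustrative sketch (not part of the original file): a minimal LogicalUnit
# following the contract described in the LogicalUnit docstring above.
# CheckPrereq canonicalises the opcode fields, Exec does the actual work, and
# hooks are skipped by inheriting from NoHooksLU. The class name and the
# "node_name" opcode field are hypothetical, and a real LU would also have to
# be registered in the opcode dispatch table.
#
#   class LUExampleNoop(NoHooksLU):
#     _OP_REQP = ["node_name"]
#
#     def CheckPrereq(self):
#       node = self.cfg.ExpandNodeName(self.op.node_name)
#       if node is None:
#         raise errors.OpPrereqError("No such node name '%s'" %
#                                    self.op.node_name)
#       self.op.node_name = node
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Doing nothing on node %s" % self.op.node_name)
#       return self.op.node_name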

    
192
def _GetWantedNodes(lu, nodes):
193
  """Returns list of checked and expanded node names.
194

195
  Args:
196
    nodes: List of nodes (strings) or None for all
197

198
  """
199
  if not isinstance(nodes, list):
200
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
201

    
202
  if nodes:
203
    wanted = []
204

    
205
    for name in nodes:
206
      node = lu.cfg.ExpandNodeName(name)
207
      if node is None:
208
        raise errors.OpPrereqError("No such node name '%s'" % name)
209
      wanted.append(node)
210

    
211
  else:
212
    wanted = lu.cfg.GetNodeList()
213
  return utils.NiceSort(wanted)
214

    
215

    
216
def _GetWantedInstances(lu, instances):
217
  """Returns list of checked and expanded instance names.
218

219
  Args:
220
    instances: List of instances (strings) or None for all
221

222
  """
223
  if not isinstance(instances, list):
224
    raise errors.OpPrereqError("Invalid argument type 'instances'")
225

    
226
  if instances:
227
    wanted = []
228

    
229
    for name in instances:
230
      instance = lu.cfg.ExpandInstanceName(name)
231
      if instance is None:
232
        raise errors.OpPrereqError("No such instance name '%s'" % name)
233
      wanted.append(instance)
234

    
235
  else:
236
    wanted = lu.cfg.GetInstanceList()
237
  return utils.NiceSort(wanted)
238

    
239

    
240
def _CheckOutputFields(static, dynamic, selected):
241
  """Checks whether all selected fields are valid.
242

243
  Args:
244
    static: Static fields
245
    dynamic: Dynamic fields
246

247
  """
248
  static_fields = frozenset(static)
249
  dynamic_fields = frozenset(dynamic)
250

    
251
  all_fields = static_fields | dynamic_fields
252

    
253
  if not all_fields.issuperset(selected):
254
    raise errors.OpPrereqError("Unknown output fields selected: %s"
255
                               % ",".join(frozenset(selected).
256
                                          difference(all_fields)))
257
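# Typical use (illustrative, mirroring the query LUs later in this module):
# a LU validates the user-requested output fields against the fields it can
# actually compute, e.g.
#
#   _CheckOutputFields(static=["name", "pinst_cnt"],
#                      dynamic=["mfree", "dtotal"],
#                      selected=self.op.output_fields)
#
# which raises OpPrereqError naming any selected field found in neither set.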

    
258

    
259
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
260
                          memory, vcpus, nics):
261
  """Builds instance related env variables for hooks from single variables.
262

263
  Args:
264
    secondary_nodes: List of secondary nodes as strings
265
  """
266
  env = {
267
    "OP_TARGET": name,
268
    "INSTANCE_NAME": name,
269
    "INSTANCE_PRIMARY": primary_node,
270
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
271
    "INSTANCE_OS_TYPE": os_type,
272
    "INSTANCE_STATUS": status,
273
    "INSTANCE_MEMORY": memory,
274
    "INSTANCE_VCPUS": vcpus,
275
  }
276

    
277
  if nics:
278
    nic_count = len(nics)
279
    for idx, (ip, bridge, mac) in enumerate(nics):
280
      if ip is None:
281
        ip = ""
282
      env["INSTANCE_NIC%d_IP" % idx] = ip
283
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
284
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
285
  else:
286
    nic_count = 0
287

    
288
  env["INSTANCE_NIC_COUNT"] = nic_count
289

    
290
  return env
291
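# Example of the resulting environment (values invented for illustration):
# an instance "inst1.example.com" on primary node "node1", with one secondary
# "node2" and a single NIC, would yield roughly
#
#   {"OP_TARGET": "inst1.example.com",
#    "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1",
#    "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch",
#    "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 512,
#    "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "",
#    "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33",
#    "INSTANCE_NIC_COUNT": 1}
#
# The GANETI_ prefix mentioned in BuildHooksEnv is added later by the hooks
# runner, not here.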

    
292

    
293
def _BuildInstanceHookEnvByObject(instance, override=None):
294
  """Builds instance related env variables for hooks from an object.
295

296
  Args:
297
    instance: objects.Instance object of instance
298
    override: dict of values to override
299
  """
300
  args = {
301
    'name': instance.name,
302
    'primary_node': instance.primary_node,
303
    'secondary_nodes': instance.secondary_nodes,
304
    'os_type': instance.os,
305
    'status': instance.status,
306
    'memory': instance.memory,
307
    'vcpus': instance.vcpus,
308
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
309
  }
310
  if override:
311
    args.update(override)
312
  return _BuildInstanceHookEnv(**args)
313

    
314

    
315
def _CheckInstanceBridgesExist(instance):
316
  """Check that the brigdes needed by an instance exist.
317

318
  """
319
  # check bridges existence
320
  brlist = [nic.bridge for nic in instance.nics]
321
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
322
    raise errors.OpPrereqError("one or more target bridges %s does not"
323
                               " exist on destination node '%s'" %
324
                               (brlist, instance.primary_node))
325

    
326

    
327
class LUDestroyCluster(NoHooksLU):
328
  """Logical unit for destroying the cluster.
329

330
  """
331
  _OP_REQP = []
332

    
333
  def CheckPrereq(self):
334
    """Check prerequisites.
335

336
    This checks whether the cluster is empty.
337

338
    Any errors are signalled by raising errors.OpPrereqError.
339

340
    """
341
    master = self.sstore.GetMasterNode()
342

    
343
    nodelist = self.cfg.GetNodeList()
344
    if len(nodelist) != 1 or nodelist[0] != master:
345
      raise errors.OpPrereqError("There are still %d node(s) in"
346
                                 " this cluster." % (len(nodelist) - 1))
347
    instancelist = self.cfg.GetInstanceList()
348
    if instancelist:
349
      raise errors.OpPrereqError("There are still %d instance(s) in"
350
                                 " this cluster." % len(instancelist))
351

    
352
  def Exec(self, feedback_fn):
353
    """Destroys the cluster.
354

355
    """
356
    master = self.sstore.GetMasterNode()
357
    if not rpc.call_node_stop_master(master):
358
      raise errors.OpExecError("Could not disable the master role")
359
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
360
    utils.CreateBackup(priv_key)
361
    utils.CreateBackup(pub_key)
362
    rpc.call_node_leave_cluster(master)
363

    
364

    
365
class LUVerifyCluster(LogicalUnit):
366
  """Verifies the cluster status.
367

368
  """
369
  HPATH = "cluster-verify"
370
  HTYPE = constants.HTYPE_CLUSTER
371
  _OP_REQP = ["skip_checks"]
372

    
373
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
374
                  remote_version, feedback_fn):
375
    """Run multiple tests against a node.
376

377
    Test list:
378
      - compares ganeti version
379
      - checks vg existence and size > 20G
380
      - checks config file checksum
381
      - checks ssh to other nodes
382

383
    Args:
384
      node: name of the node to check
385
      file_list: required list of files
386
      local_cksum: dictionary of local files and their checksums
387

388
    """
389
    # compares ganeti version
390
    local_version = constants.PROTOCOL_VERSION
391
    if not remote_version:
392
      feedback_fn("  - ERROR: connection to %s failed" % (node))
393
      return True
394

    
395
    if local_version != remote_version:
396
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
397
                      (local_version, node, remote_version))
398
      return True
399

    
400
    # checks vg existence and size > 20G
401

    
402
    bad = False
403
    if not vglist:
404
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
405
                      (node,))
406
      bad = True
407
    else:
408
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
409
                                            constants.MIN_VG_SIZE)
410
      if vgstatus:
411
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
412
        bad = True
413

    
414
    # checks config file checksum
415
    # checks ssh to any
416

    
417
    if 'filelist' not in node_result:
418
      bad = True
419
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
420
    else:
421
      remote_cksum = node_result['filelist']
422
      for file_name in file_list:
423
        if file_name not in remote_cksum:
424
          bad = True
425
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
426
        elif remote_cksum[file_name] != local_cksum[file_name]:
427
          bad = True
428
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
429

    
430
    if 'nodelist' not in node_result:
431
      bad = True
432
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
433
    else:
434
      if node_result['nodelist']:
435
        bad = True
436
        for node in node_result['nodelist']:
437
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
438
                          (node, node_result['nodelist'][node]))
439
    if 'node-net-test' not in node_result:
440
      bad = True
441
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
442
    else:
443
      if node_result['node-net-test']:
444
        bad = True
445
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
446
        for node in nlist:
447
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
448
                          (node, node_result['node-net-test'][node]))
449

    
450
    hyp_result = node_result.get('hypervisor', None)
451
    if hyp_result is not None:
452
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
453
    return bad
454

    
455
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
456
                      node_instance, feedback_fn):
457
    """Verify an instance.
458

459
    This function checks to see if the required block devices are
460
    available on the instance's node.
461

462
    """
463
    bad = False
464

    
465
    node_current = instanceconfig.primary_node
466

    
467
    node_vol_should = {}
468
    instanceconfig.MapLVsByNode(node_vol_should)
469

    
470
    for node in node_vol_should:
471
      for volume in node_vol_should[node]:
472
        if node not in node_vol_is or volume not in node_vol_is[node]:
473
          feedback_fn("  - ERROR: volume %s missing on node %s" %
474
                          (volume, node))
475
          bad = True
476

    
477
    if instanceconfig.status != 'down':
478
      if (node_current not in node_instance or
479
          not instance in node_instance[node_current]):
480
        feedback_fn("  - ERROR: instance %s not running on node %s" %
481
                        (instance, node_current))
482
        bad = True
483

    
484
    for node in node_instance:
485
      if node != node_current:
486
        if instance in node_instance[node]:
487
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
488
                          (instance, node))
489
          bad = True
490

    
491
    return bad
492

    
493
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
494
    """Verify if there are any unknown volumes in the cluster.
495

496
    The .os, .swap and backup volumes are ignored. All other volumes are
497
    reported as unknown.
498

499
    """
500
    bad = False
501

    
502
    for node in node_vol_is:
503
      for volume in node_vol_is[node]:
504
        if node not in node_vol_should or volume not in node_vol_should[node]:
505
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
506
                      (volume, node))
507
          bad = True
508
    return bad
509

    
510
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
511
    """Verify the list of running instances.
512

513
    This checks what instances are running but unknown to the cluster.
514

515
    """
516
    bad = False
517
    for node in node_instance:
518
      for runninginstance in node_instance[node]:
519
        if runninginstance not in instancelist:
520
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
521
                          (runninginstance, node))
522
          bad = True
523
    return bad
524

    
525
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
526
    """Verify N+1 Memory Resilience.
527

528
    Check that if one single node dies we can still start all the instances it
529
    was primary for.
530

531
    """
532
    bad = False
533

    
534
    for node, nodeinfo in node_info.iteritems():
535
      # This code checks that every node which is now listed as secondary has
536
      # enough memory to host all instances it is supposed to should a single
537
      # other node in the cluster fail.
538
      # FIXME: not ready for failover to an arbitrary node
539
      # FIXME: does not support file-backed instances
540
      # WARNING: we currently take into account down instances as well as up
541
      # ones, considering that even if they're down someone might want to start
542
      # them even in the event of a node failure.
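      # Worked example (numbers invented): if this node has
      # 'sinst-by-pnode' == {"node1": ["inst_a", "inst_b"]}, 'mfree' == 1024,
      # and inst_a and inst_b need 768 MiB each, then a failure of node1
      # would require 1536 MiB here, so the check below flags this
      # (node, node1) pair as not N+1 compliant.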
543
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
544
        needed_mem = 0
545
        for instance in instances:
546
          needed_mem += instance_cfg[instance].memory
547
        if nodeinfo['mfree'] < needed_mem:
548
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
549
                      " failovers should node %s fail" % (node, prinode))
550
          bad = True
551
    return bad
552

    
553
  def CheckPrereq(self):
554
    """Check prerequisites.
555

556
    Transform the list of checks we're going to skip into a set and check that
557
    all its members are valid.
558

559
    """
560
    self.skip_set = frozenset(self.op.skip_checks)
561
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
562
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
563

    
564
  def BuildHooksEnv(self):
565
    """Build hooks env.
566

567
    Cluster-Verify hooks are run only in the post phase; on failure their
568
    output is logged in the verify output and the verification fails.
569

570
    """
571
    all_nodes = self.cfg.GetNodeList()
572
    # TODO: populate the environment with useful information for verify hooks
573
    env = {}
574
    return env, [], all_nodes
575

    
576
  def Exec(self, feedback_fn):
577
    """Verify integrity of cluster, performing various test on nodes.
578

579
    """
580
    bad = False
581
    feedback_fn("* Verifying global settings")
582
    for msg in self.cfg.VerifyConfig():
583
      feedback_fn("  - ERROR: %s" % msg)
584

    
585
    vg_name = self.cfg.GetVGName()
586
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
587
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
588
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
589
    i_non_redundant = [] # Non redundant instances
590
    node_volume = {}
591
    node_instance = {}
592
    node_info = {}
593
    instance_cfg = {}
594

    
595
    # FIXME: verify OS list
596
    # do local checksums
597
    file_names = list(self.sstore.GetFileList())
598
    file_names.append(constants.SSL_CERT_FILE)
599
    file_names.append(constants.CLUSTER_CONF_FILE)
600
    local_checksums = utils.FingerprintFiles(file_names)
601

    
602
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
603
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
604
    all_instanceinfo = rpc.call_instance_list(nodelist)
605
    all_vglist = rpc.call_vg_list(nodelist)
606
    node_verify_param = {
607
      'filelist': file_names,
608
      'nodelist': nodelist,
609
      'hypervisor': None,
610
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
611
                        for node in nodeinfo]
612
      }
613
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
614
    all_rversion = rpc.call_version(nodelist)
615
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
616

    
617
    for node in nodelist:
618
      feedback_fn("* Verifying node %s" % node)
619
      result = self._VerifyNode(node, file_names, local_checksums,
620
                                all_vglist[node], all_nvinfo[node],
621
                                all_rversion[node], feedback_fn)
622
      bad = bad or result
623

    
624
      # node_volume
625
      volumeinfo = all_volumeinfo[node]
626

    
627
      if isinstance(volumeinfo, basestring):
628
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
629
                    (node, volumeinfo[-400:].encode('string_escape')))
630
        bad = True
631
        node_volume[node] = {}
632
      elif not isinstance(volumeinfo, dict):
633
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
634
        bad = True
635
        continue
636
      else:
637
        node_volume[node] = volumeinfo
638

    
639
      # node_instance
640
      nodeinstance = all_instanceinfo[node]
641
      if type(nodeinstance) != list:
642
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
643
        bad = True
644
        continue
645

    
646
      node_instance[node] = nodeinstance
647

    
648
      # node_info
649
      nodeinfo = all_ninfo[node]
650
      if not isinstance(nodeinfo, dict):
651
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
652
        bad = True
653
        continue
654

    
655
      try:
656
        node_info[node] = {
657
          "mfree": int(nodeinfo['memory_free']),
658
          "dfree": int(nodeinfo['vg_free']),
659
          "pinst": [],
660
          "sinst": [],
661
          # dictionary holding all instances this node is secondary for,
662
          # grouped by their primary node. Each key is a cluster node, and each
663
          # value is a list of instances which have the key as primary and the
664
          # current node as secondary.  this is handy to calculate N+1 memory
665
          # availability if you can only failover from a primary to its
666
          # secondary.
667
          "sinst-by-pnode": {},
668
        }
669
      except ValueError:
670
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
671
        bad = True
672
        continue
673

    
674
    node_vol_should = {}
675

    
676
    for instance in instancelist:
677
      feedback_fn("* Verifying instance %s" % instance)
678
      inst_config = self.cfg.GetInstanceInfo(instance)
679
      result =  self._VerifyInstance(instance, inst_config, node_volume,
680
                                     node_instance, feedback_fn)
681
      bad = bad or result
682

    
683
      inst_config.MapLVsByNode(node_vol_should)
684

    
685
      instance_cfg[instance] = inst_config
686

    
687
      pnode = inst_config.primary_node
688
      if pnode in node_info:
689
        node_info[pnode]['pinst'].append(instance)
690
      else:
691
        feedback_fn("  - ERROR: instance %s, connection to primary node"
692
                    " %s failed" % (instance, pnode))
693
        bad = True
694

    
695
      # If the instance is non-redundant we cannot survive losing its primary
696
      # node, so we are not N+1 compliant. On the other hand we have no disk
697
      # templates with more than one secondary so that situation is not well
698
      # supported either.
699
      # FIXME: does not support file-backed instances
700
      if len(inst_config.secondary_nodes) == 0:
701
        i_non_redundant.append(instance)
702
      elif len(inst_config.secondary_nodes) > 1:
703
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
704
                    % instance)
705

    
706
      for snode in inst_config.secondary_nodes:
707
        if snode in node_info:
708
          node_info[snode]['sinst'].append(instance)
709
          if pnode not in node_info[snode]['sinst-by-pnode']:
710
            node_info[snode]['sinst-by-pnode'][pnode] = []
711
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
712
        else:
713
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
714
                      " %s failed" % (instance, snode))
715

    
716
    feedback_fn("* Verifying orphan volumes")
717
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
718
                                       feedback_fn)
719
    bad = bad or result
720

    
721
    feedback_fn("* Verifying remaining instances")
722
    result = self._VerifyOrphanInstances(instancelist, node_instance,
723
                                         feedback_fn)
724
    bad = bad or result
725

    
726
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
727
      feedback_fn("* Verifying N+1 Memory redundancy")
728
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
729
      bad = bad or result
730

    
731
    feedback_fn("* Other Notes")
732
    if i_non_redundant:
733
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
734
                  % len(i_non_redundant))
735

    
736
    return int(bad)
737

    
738
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
739
    """Analize the post-hooks' result, handle it, and send some
740
    nicely-formatted feedback back to the user.
741

742
    Args:
743
      phase: the hooks phase that has just been run
744
      hooks_results: the results of the multi-node hooks rpc call
745
      feedback_fn: function to send feedback back to the caller
746
      lu_result: previous Exec result
747

748
    """
749
    # We only really run POST phase hooks, and are only interested in their results
750
    if phase == constants.HOOKS_PHASE_POST:
751
      # Used to change hooks' output to proper indentation
752
      indent_re = re.compile('^', re.M)
753
      feedback_fn("* Hooks Results")
754
      if not hooks_results:
755
        feedback_fn("  - ERROR: general communication failure")
756
        lu_result = 1
757
      else:
758
        for node_name in hooks_results:
759
          show_node_header = True
760
          res = hooks_results[node_name]
761
          if res is False or not isinstance(res, list):
762
            feedback_fn("    Communication failure")
763
            lu_result = 1
764
            continue
765
          for script, hkr, output in res:
766
            if hkr == constants.HKR_FAIL:
767
              # The node header is only shown once, if there are
768
              # failing hooks on that node
769
              if show_node_header:
770
                feedback_fn("  Node %s:" % node_name)
771
                show_node_header = False
772
              feedback_fn("    ERROR: Script %s failed, output:" % script)
773
              output = indent_re.sub('      ', output)
774
              feedback_fn("%s" % output)
775
              lu_result = 1
776

    
777
      return lu_result
778

    
779

    
780
class LUVerifyDisks(NoHooksLU):
781
  """Verifies the cluster disks status.
782

783
  """
784
  _OP_REQP = []
785

    
786
  def CheckPrereq(self):
787
    """Check prerequisites.
788

789
    This has no prerequisites.
790

791
    """
792
    pass
793

    
794
  def Exec(self, feedback_fn):
795
    """Verify integrity of cluster disks.
796

797
    """
798
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
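    # Layout of the result built below: res_nodes collects nodes that could
    # not be queried, res_nlvm maps a node to the LVM error string it
    # returned, res_instances lists instances with at least one offline LV,
    # and res_missing maps an instance to the (node, volume) pairs that
    # should exist but were not reported by any node.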
799

    
800
    vg_name = self.cfg.GetVGName()
801
    nodes = utils.NiceSort(self.cfg.GetNodeList())
802
    instances = [self.cfg.GetInstanceInfo(name)
803
                 for name in self.cfg.GetInstanceList()]
804

    
805
    nv_dict = {}
806
    for inst in instances:
807
      inst_lvs = {}
808
      if (inst.status != "up" or
809
          inst.disk_template not in constants.DTS_NET_MIRROR):
810
        continue
811
      inst.MapLVsByNode(inst_lvs)
812
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
813
      for node, vol_list in inst_lvs.iteritems():
814
        for vol in vol_list:
815
          nv_dict[(node, vol)] = inst
816

    
817
    if not nv_dict:
818
      return result
819

    
820
    node_lvs = rpc.call_volume_list(nodes, vg_name)
821

    
822
    to_act = set()
823
    for node in nodes:
824
      # node_volume
825
      lvs = node_lvs[node]
826

    
827
      if isinstance(lvs, basestring):
828
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
829
        res_nlvm[node] = lvs
830
      elif not isinstance(lvs, dict):
831
        logger.Info("connection to node %s failed or invalid data returned" %
832
                    (node,))
833
        res_nodes.append(node)
834
        continue
835

    
836
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
837
        inst = nv_dict.pop((node, lv_name), None)
838
        if (not lv_online and inst is not None
839
            and inst.name not in res_instances):
840
          res_instances.append(inst.name)
841

    
842
    # any leftover items in nv_dict are missing LVs, let's arrange the
843
    # data better
844
    for key, inst in nv_dict.iteritems():
845
      if inst.name not in res_missing:
846
        res_missing[inst.name] = []
847
      res_missing[inst.name].append(key)
848

    
849
    return result
850

    
851

    
852
class LURenameCluster(LogicalUnit):
853
  """Rename the cluster.
854

855
  """
856
  HPATH = "cluster-rename"
857
  HTYPE = constants.HTYPE_CLUSTER
858
  _OP_REQP = ["name"]
859
  REQ_WSSTORE = True
860

    
861
  def BuildHooksEnv(self):
862
    """Build hooks env.
863

864
    """
865
    env = {
866
      "OP_TARGET": self.sstore.GetClusterName(),
867
      "NEW_NAME": self.op.name,
868
      }
869
    mn = self.sstore.GetMasterNode()
870
    return env, [mn], [mn]
871

    
872
  def CheckPrereq(self):
873
    """Verify that the passed name is a valid one.
874

875
    """
876
    hostname = utils.HostInfo(self.op.name)
877

    
878
    new_name = hostname.name
879
    self.ip = new_ip = hostname.ip
880
    old_name = self.sstore.GetClusterName()
881
    old_ip = self.sstore.GetMasterIP()
882
    if new_name == old_name and new_ip == old_ip:
883
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
884
                                 " cluster has changed")
885
    if new_ip != old_ip:
886
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
887
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
888
                                   " reachable on the network. Aborting." %
889
                                   new_ip)
890

    
891
    self.op.name = new_name
892

    
893
  def Exec(self, feedback_fn):
894
    """Rename the cluster.
895

896
    """
897
    clustername = self.op.name
898
    ip = self.ip
899
    ss = self.sstore
900

    
901
    # shutdown the master IP
902
    master = ss.GetMasterNode()
903
    if not rpc.call_node_stop_master(master):
904
      raise errors.OpExecError("Could not disable the master role")
905

    
906
    try:
907
      # modify the sstore
908
      ss.SetKey(ss.SS_MASTER_IP, ip)
909
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
910

    
911
      # Distribute updated ss config to all nodes
912
      myself = self.cfg.GetNodeInfo(master)
913
      dist_nodes = self.cfg.GetNodeList()
914
      if myself.name in dist_nodes:
915
        dist_nodes.remove(myself.name)
916

    
917
      logger.Debug("Copying updated ssconf data to all nodes")
918
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
919
        fname = ss.KeyToFilename(keyname)
920
        result = rpc.call_upload_file(dist_nodes, fname)
921
        for to_node in dist_nodes:
922
          if not result[to_node]:
923
            logger.Error("copy of file %s to node %s failed" %
924
                         (fname, to_node))
925
    finally:
926
      if not rpc.call_node_start_master(master):
927
        logger.Error("Could not re-enable the master role on the master,"
928
                     " please restart manually.")
929

    
930

    
931
def _RecursiveCheckIfLVMBased(disk):
932
  """Check if the given disk or its children are lvm-based.
933

934
  Args:
935
    disk: ganeti.objects.Disk object
936

937
  Returns:
938
    boolean indicating whether a LD_LV dev_type was found or not
939

940
  """
941
  if disk.children:
942
    for chdisk in disk.children:
943
      if _RecursiveCheckIfLVMBased(chdisk):
944
        return True
945
  return disk.dev_type == constants.LD_LV
946
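# Illustrative reading of the recursion above: a disk whose children (at any
# depth) include an LD_LV volume -- e.g. a mirrored device built on top of two
# logical volumes -- makes this function return True, while a disk tree with
# no LD_LV device anywhere returns False. This is what LUSetClusterParams
# below relies on when refusing to disable LVM storage.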

    
947

    
948
class LUSetClusterParams(LogicalUnit):
949
  """Change the parameters of the cluster.
950

951
  """
952
  HPATH = "cluster-modify"
953
  HTYPE = constants.HTYPE_CLUSTER
954
  _OP_REQP = []
955

    
956
  def BuildHooksEnv(self):
957
    """Build hooks env.
958

959
    """
960
    env = {
961
      "OP_TARGET": self.sstore.GetClusterName(),
962
      "NEW_VG_NAME": self.op.vg_name,
963
      }
964
    mn = self.sstore.GetMasterNode()
965
    return env, [mn], [mn]
966

    
967
  def CheckPrereq(self):
968
    """Check prerequisites.
969

970
    This checks whether the given params don't conflict and
971
    if the given volume group is valid.
972

973
    """
974
    if not self.op.vg_name:
975
      instances = [self.cfg.GetInstanceInfo(name)
976
                   for name in self.cfg.GetInstanceList()]
977
      for inst in instances:
978
        for disk in inst.disks:
979
          if _RecursiveCheckIfLVMBased(disk):
980
            raise errors.OpPrereqError("Cannot disable lvm storage while"
981
                                       " lvm-based instances exist")
982

    
983
    # if vg_name not None, checks given volume group on all nodes
984
    if self.op.vg_name:
985
      node_list = self.cfg.GetNodeList()
986
      vglist = rpc.call_vg_list(node_list)
987
      for node in node_list:
988
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
989
                                              constants.MIN_VG_SIZE)
990
        if vgstatus:
991
          raise errors.OpPrereqError("Error on node '%s': %s" %
992
                                     (node, vgstatus))
993

    
994
  def Exec(self, feedback_fn):
995
    """Change the parameters of the cluster.
996

997
    """
998
    if self.op.vg_name != self.cfg.GetVGName():
999
      self.cfg.SetVGName(self.op.vg_name)
1000
    else:
1001
      feedback_fn("Cluster LVM configuration already in desired"
1002
                  " state, not changing")
1003

    
1004

    
1005
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1006
  """Sleep and poll for an instance's disk to sync.
1007

1008
  """
1009
  if not instance.disks:
1010
    return True
1011

    
1012
  if not oneshot:
1013
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1014

    
1015
  node = instance.primary_node
1016

    
1017
  for dev in instance.disks:
1018
    cfgw.SetDiskID(dev, node)
1019

    
1020
  retries = 0
1021
  while True:
1022
    max_time = 0
1023
    done = True
1024
    cumul_degraded = False
1025
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1026
    if not rstats:
1027
      proc.LogWarning("Can't get any data from node %s" % node)
1028
      retries += 1
1029
      if retries >= 10:
1030
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1031
                                 " aborting." % node)
1032
      time.sleep(6)
1033
      continue
1034
    retries = 0
1035
    for i in range(len(rstats)):
1036
      mstat = rstats[i]
1037
      if mstat is None:
1038
        proc.LogWarning("Can't compute data for node %s/%s" %
1039
                        (node, instance.disks[i].iv_name))
1040
        continue
1041
      # we ignore the ldisk parameter
1042
      perc_done, est_time, is_degraded, _ = mstat
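      # Illustrative values (invented): a device at 60% resync with about two
      # minutes to go would show up here as (60.0, 120, True, False); a fully
      # synced device reports perc_done as None, which is what keeps "done"
      # True below.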
1043
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1044
      if perc_done is not None:
1045
        done = False
1046
        if est_time is not None:
1047
          rem_time = "%d estimated seconds remaining" % est_time
1048
          max_time = est_time
1049
        else:
1050
          rem_time = "no time estimate"
1051
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1052
                     (instance.disks[i].iv_name, perc_done, rem_time))
1053
    if done or oneshot:
1054
      break
1055

    
1056
    if unlock:
1057
      #utils.Unlock('cmd')
1058
      pass
1059
    try:
1060
      time.sleep(min(60, max_time))
1061
    finally:
1062
      if unlock:
1063
        #utils.Lock('cmd')
1064
        pass
1065

    
1066
  if done:
1067
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1068
  return not cumul_degraded
1069

    
1070

    
1071
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1072
  """Check that mirrors are not degraded.
1073

1074
  The ldisk parameter, if True, will change the test from the
1075
  is_degraded attribute (which represents overall non-ok status for
1076
  the device(s)) to the ldisk (representing the local storage status).
1077

1078
  """
1079
  cfgw.SetDiskID(dev, node)
1080
  if ldisk:
1081
    idx = 6
1082
  else:
1083
    idx = 5
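  # (my reading of the code: the status tuple returned by
  # rpc.call_blockdev_find carries the overall is_degraded flag at index 5
  # and the local-disk status at index 6, hence the index chosen above)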
1084

    
1085
  result = True
1086
  if on_primary or dev.AssembleOnSecondary():
1087
    rstats = rpc.call_blockdev_find(node, dev)
1088
    if not rstats:
1089
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1090
      result = False
1091
    else:
1092
      result = result and (not rstats[idx])
1093
  if dev.children:
1094
    for child in dev.children:
1095
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1096

    
1097
  return result
1098

    
1099

    
1100
class LUDiagnoseOS(NoHooksLU):
1101
  """Logical unit for OS diagnose/query.
1102

1103
  """
1104
  _OP_REQP = ["output_fields", "names"]
1105

    
1106
  def CheckPrereq(self):
1107
    """Check prerequisites.
1108

1109
    This always succeeds, since this is a pure query LU.
1110

1111
    """
1112
    if self.op.names:
1113
      raise errors.OpPrereqError("Selective OS query not supported")
1114

    
1115
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1116
    _CheckOutputFields(static=[],
1117
                       dynamic=self.dynamic_fields,
1118
                       selected=self.op.output_fields)
1119

    
1120
  @staticmethod
1121
  def _DiagnoseByOS(node_list, rlist):
1122
    """Remaps a per-node return list into an a per-os per-node dictionary
1123

1124
      Args:
1125
        node_list: a list with the names of all nodes
1126
        rlist: a map with node names as keys and OS objects as values
1127

1128
      Returns:
1129
        map: a map with osnames as keys and as value another map, with
1130
             nodes as
1131
             keys and list of OS objects as values
1132
             e.g. {"debian-etch": {"node1": [<object>,...],
1133
                                   "node2": [<object>,]}
1134
                  }
1135

1136
    """
1137
    all_os = {}
1138
    for node_name, nr in rlist.iteritems():
1139
      if not nr:
1140
        continue
1141
      for os_obj in nr:
1142
        if os_obj.name not in all_os:
1143
          # build a list of nodes for this os containing empty lists
1144
          # for each node in node_list
1145
          all_os[os_obj.name] = {}
1146
          for nname in node_list:
1147
            all_os[os_obj.name][nname] = []
1148
        all_os[os_obj.name][node_name].append(os_obj)
1149
    return all_os
1150

    
1151
  def Exec(self, feedback_fn):
1152
    """Compute the list of OSes.
1153

1154
    """
1155
    node_list = self.cfg.GetNodeList()
1156
    node_data = rpc.call_os_diagnose(node_list)
1157
    if node_data == False:
1158
      raise errors.OpExecError("Can't gather the list of OSes")
1159
    pol = self._DiagnoseByOS(node_list, node_data)
1160
    output = []
1161
    for os_name, os_data in pol.iteritems():
1162
      row = []
1163
      for field in self.op.output_fields:
1164
        if field == "name":
1165
          val = os_name
1166
        elif field == "valid":
1167
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1168
        elif field == "node_status":
1169
          val = {}
1170
          for node_name, nos_list in os_data.iteritems():
1171
            val[node_name] = [(v.status, v.path) for v in nos_list]
1172
        else:
1173
          raise errors.ParameterError(field)
1174
        row.append(val)
1175
      output.append(row)
1176

    
1177
    return output
1178

    
1179

    
1180
class LURemoveNode(LogicalUnit):
1181
  """Logical unit for removing a node.
1182

1183
  """
1184
  HPATH = "node-remove"
1185
  HTYPE = constants.HTYPE_NODE
1186
  _OP_REQP = ["node_name"]
1187

    
1188
  def BuildHooksEnv(self):
1189
    """Build hooks env.
1190

1191
    This doesn't run on the target node in the pre phase as a failed
1192
    node would then be impossible to remove.
1193

1194
    """
1195
    env = {
1196
      "OP_TARGET": self.op.node_name,
1197
      "NODE_NAME": self.op.node_name,
1198
      }
1199
    all_nodes = self.cfg.GetNodeList()
1200
    all_nodes.remove(self.op.node_name)
1201
    return env, all_nodes, all_nodes
1202

    
1203
  def CheckPrereq(self):
1204
    """Check prerequisites.
1205

1206
    This checks:
1207
     - the node exists in the configuration
1208
     - it does not have primary or secondary instances
1209
     - it's not the master
1210

1211
    Any errors are signalled by raising errors.OpPrereqError.
1212

1213
    """
1214
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1215
    if node is None:
1216
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1217

    
1218
    instance_list = self.cfg.GetInstanceList()
1219

    
1220
    masternode = self.sstore.GetMasterNode()
1221
    if node.name == masternode:
1222
      raise errors.OpPrereqError("Node is the master node,"
1223
                                 " you need to failover first.")
1224

    
1225
    for instance_name in instance_list:
1226
      instance = self.cfg.GetInstanceInfo(instance_name)
1227
      if node.name == instance.primary_node:
1228
        raise errors.OpPrereqError("Instance %s still running on the node,"
1229
                                   " please remove first." % instance_name)
1230
      if node.name in instance.secondary_nodes:
1231
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1232
                                   " please remove first." % instance_name)
1233
    self.op.node_name = node.name
1234
    self.node = node
1235

    
1236
  def Exec(self, feedback_fn):
1237
    """Removes the node from the cluster.
1238

1239
    """
1240
    node = self.node
1241
    logger.Info("stopping the node daemon and removing configs from node %s" %
1242
                node.name)
1243

    
1244
    rpc.call_node_leave_cluster(node.name)
1245

    
1246
    logger.Info("Removing node %s from config" % node.name)
1247

    
1248
    self.cfg.RemoveNode(node.name)
1249

    
1250
    utils.RemoveHostFromEtcHosts(node.name)
1251

    
1252

    
1253
class LUQueryNodes(NoHooksLU):
1254
  """Logical unit for querying nodes.
1255

1256
  """
1257
  _OP_REQP = ["output_fields", "names"]
1258

    
1259
  def CheckPrereq(self):
1260
    """Check prerequisites.
1261

1262
    This checks that the fields required are valid output fields.
1263

1264
    """
1265
    self.dynamic_fields = frozenset([
1266
      "dtotal", "dfree",
1267
      "mtotal", "mnode", "mfree",
1268
      "bootid",
1269
      "ctotal",
1270
      ])
1271

    
1272
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1273
                               "pinst_list", "sinst_list",
1274
                               "pip", "sip", "tags"],
1275
                       dynamic=self.dynamic_fields,
1276
                       selected=self.op.output_fields)
1277

    
1278
    self.wanted = _GetWantedNodes(self, self.op.names)
1279

    
1280
  def Exec(self, feedback_fn):
1281
    """Computes the list of nodes and their attributes.
1282

1283
    """
1284
    nodenames = self.wanted
1285
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1286

    
1287
    # begin data gathering
1288

    
1289
    if self.dynamic_fields.intersection(self.op.output_fields):
1290
      live_data = {}
1291
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1292
      for name in nodenames:
1293
        nodeinfo = node_data.get(name, None)
1294
        if nodeinfo:
1295
          live_data[name] = {
1296
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1297
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1298
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1299
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1300
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1301
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1302
            "bootid": nodeinfo['bootid'],
1303
            }
1304
        else:
1305
          live_data[name] = {}
1306
    else:
1307
      live_data = dict.fromkeys(nodenames, {})
1308

    
1309
    node_to_primary = dict([(name, set()) for name in nodenames])
1310
    node_to_secondary = dict([(name, set()) for name in nodenames])
1311

    
1312
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1313
                             "sinst_cnt", "sinst_list"))
1314
    if inst_fields & frozenset(self.op.output_fields):
1315
      instancelist = self.cfg.GetInstanceList()
1316

    
1317
      for instance_name in instancelist:
1318
        inst = self.cfg.GetInstanceInfo(instance_name)
1319
        if inst.primary_node in node_to_primary:
1320
          node_to_primary[inst.primary_node].add(inst.name)
1321
        for secnode in inst.secondary_nodes:
1322
          if secnode in node_to_secondary:
1323
            node_to_secondary[secnode].add(inst.name)
1324

    
1325
    # end data gathering
1326

    
1327
    output = []
1328
    for node in nodelist:
1329
      node_output = []
1330
      for field in self.op.output_fields:
1331
        if field == "name":
1332
          val = node.name
1333
        elif field == "pinst_list":
1334
          val = list(node_to_primary[node.name])
1335
        elif field == "sinst_list":
1336
          val = list(node_to_secondary[node.name])
1337
        elif field == "pinst_cnt":
1338
          val = len(node_to_primary[node.name])
1339
        elif field == "sinst_cnt":
1340
          val = len(node_to_secondary[node.name])
1341
        elif field == "pip":
1342
          val = node.primary_ip
1343
        elif field == "sip":
1344
          val = node.secondary_ip
1345
        elif field == "tags":
1346
          val = list(node.GetTags())
1347
        elif field in self.dynamic_fields:
1348
          val = live_data[node.name].get(field, None)
1349
        else:
1350
          raise errors.ParameterError(field)
1351
        node_output.append(val)
1352
      output.append(node_output)
1353

    
1354
    return output
1355

    
1356

    
1357
class LUQueryNodeVolumes(NoHooksLU):
1358
  """Logical unit for getting volumes on node(s).
1359

1360
  """
1361
  _OP_REQP = ["nodes", "output_fields"]
1362

    
1363
  def CheckPrereq(self):
1364
    """Check prerequisites.
1365

1366
    This checks that the fields required are valid output fields.
1367

1368
    """
1369
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1370

    
1371
    _CheckOutputFields(static=["node"],
1372
                       dynamic=["phys", "vg", "name", "size", "instance"],
1373
                       selected=self.op.output_fields)
1374

    
1375

    
1376
  def Exec(self, feedback_fn):
1377
    """Computes the list of nodes and their attributes.
1378

1379
    """
1380
    nodenames = self.nodes
1381
    volumes = rpc.call_node_volumes(nodenames)
1382

    
1383
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1384
             in self.cfg.GetInstanceList()]
1385

    
1386
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1387

    
1388
    output = []
1389
    for node in nodenames:
1390
      if node not in volumes or not volumes[node]:
1391
        continue
1392

    
1393
      node_vols = volumes[node][:]
1394
      node_vols.sort(key=lambda vol: vol['dev'])
1395

    
1396
      for vol in node_vols:
1397
        node_output = []
1398
        for field in self.op.output_fields:
1399
          if field == "node":
1400
            val = node
1401
          elif field == "phys":
1402
            val = vol['dev']
1403
          elif field == "vg":
1404
            val = vol['vg']
1405
          elif field == "name":
1406
            val = vol['name']
1407
          elif field == "size":
1408
            val = int(float(vol['size']))
1409
          elif field == "instance":
1410
            for inst in ilist:
1411
              if node not in lv_by_node[inst]:
1412
                continue
1413
              if vol['name'] in lv_by_node[inst][node]:
1414
                val = inst.name
1415
                break
1416
            else:
1417
              val = '-'
1418
          else:
1419
            raise errors.ParameterError(field)
1420
          node_output.append(str(val))
1421

    
1422
        output.append(node_output)
1423

    
1424
    return output
1425

    
1426

    
1427
class LUAddNode(LogicalUnit):
1428
  """Logical unit for adding node to the cluster.
1429

1430
  """
1431
  HPATH = "node-add"
1432
  HTYPE = constants.HTYPE_NODE
1433
  _OP_REQP = ["node_name"]
1434

    
1435
  def BuildHooksEnv(self):
1436
    """Build hooks env.
1437

1438
    This will run on all nodes before, and on all nodes + the new node after.
1439

1440
    """
1441
    env = {
1442
      "OP_TARGET": self.op.node_name,
1443
      "NODE_NAME": self.op.node_name,
1444
      "NODE_PIP": self.op.primary_ip,
1445
      "NODE_SIP": self.op.secondary_ip,
1446
      }
1447
    nodes_0 = self.cfg.GetNodeList()
1448
    nodes_1 = nodes_0 + [self.op.node_name, ]
1449
    return env, nodes_0, nodes_1
1450

    
1451
  def CheckPrereq(self):
1452
    """Check prerequisites.
1453

1454
    This checks:
1455
     - the new node is not already in the config
1456
     - it is resolvable
1457
     - its parameters (single/dual homed) match the cluster
1458

1459
    Any errors are signalled by raising errors.OpPrereqError.
1460

1461
    """
1462
    node_name = self.op.node_name
1463
    cfg = self.cfg
1464

    
1465
    dns_data = utils.HostInfo(node_name)
1466

    
1467
    node = dns_data.name
1468
    primary_ip = self.op.primary_ip = dns_data.ip
1469
    secondary_ip = getattr(self.op, "secondary_ip", None)
1470
    if secondary_ip is None:
1471
      secondary_ip = primary_ip
1472
    if not utils.IsValidIP(secondary_ip):
1473
      raise errors.OpPrereqError("Invalid secondary IP given")
1474
    self.op.secondary_ip = secondary_ip
1475

    
1476
    node_list = cfg.GetNodeList()
1477
    if not self.op.readd and node in node_list:
1478
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1479
                                 node)
1480
    elif self.op.readd and node not in node_list:
1481
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1482

    
1483
    for existing_node_name in node_list:
1484
      existing_node = cfg.GetNodeInfo(existing_node_name)
1485

    
1486
      if self.op.readd and node == existing_node_name:
1487
        if (existing_node.primary_ip != primary_ip or
1488
            existing_node.secondary_ip != secondary_ip):
1489
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1490
                                     " address configuration as before")
1491
        continue
1492

    
1493
      if (existing_node.primary_ip == primary_ip or
1494
          existing_node.secondary_ip == primary_ip or
1495
          existing_node.primary_ip == secondary_ip or
1496
          existing_node.secondary_ip == secondary_ip):
1497
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1498
                                   " existing node %s" % existing_node.name)
1499

    
1500
    # check that the type of the node (single versus dual homed) is the
1501
    # same as for the master
1502
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1503
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1504
    newbie_singlehomed = secondary_ip == primary_ip
1505
    if master_singlehomed != newbie_singlehomed:
1506
      if master_singlehomed:
1507
        raise errors.OpPrereqError("The master has no private ip but the"
1508
                                   " new node has one")
1509
      else:
1510
        raise errors.OpPrereqError("The master has a private ip but the"
1511
                                   " new node doesn't have one")
1512

    
1513
    # checks reachability
1514
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1515
      raise errors.OpPrereqError("Node not reachable by ping")
1516

    
1517
    if not newbie_singlehomed:
1518
      # check reachability from my secondary ip to newbie's secondary ip
1519
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1520
                           source=myself.secondary_ip):
1521
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1522
                                   " based ping to noded port")
1523

    
1524
    self.new_node = objects.Node(name=node,
1525
                                 primary_ip=primary_ip,
1526
                                 secondary_ip=secondary_ip)
1527

    
1528
  def Exec(self, feedback_fn):
1529
    """Adds the new node to the cluster.
1530

1531
    """
1532
    new_node = self.new_node
1533
    node = new_node.name
1534

    
1535
    # check connectivity
1536
    result = rpc.call_version([node])[node]
1537
    if result:
1538
      if constants.PROTOCOL_VERSION == result:
1539
        logger.Info("communication to node %s fine, sw version %s match" %
1540
                    (node, result))
1541
      else:
1542
        raise errors.OpExecError("Version mismatch master version %s,"
1543
                                 " node version %s" %
1544
                                 (constants.PROTOCOL_VERSION, result))
1545
    else:
1546
      raise errors.OpExecError("Cannot get version from the new node")
1547

    
1548
    # setup ssh on node
1549
    logger.Info("copy ssh key to node %s" % node)
1550
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1551
    keyarray = []
1552
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1553
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1554
                priv_key, pub_key]
1555

    
1556
    for i in keyfiles:
1557
      f = open(i, 'r')
1558
      try:
1559
        keyarray.append(f.read())
1560
      finally:
1561
        f.close()
1562

    
1563
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1564
                               keyarray[3], keyarray[4], keyarray[5])
1565

    
1566
    if not result:
1567
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1568

    
1569
    # Add node to our /etc/hosts, and add key to known_hosts
1570
    utils.AddHostToEtcHosts(new_node.name)
1571

    
1572
    if new_node.secondary_ip != new_node.primary_ip:
1573
      if not rpc.call_node_tcp_ping(new_node.name,
1574
                                    constants.LOCALHOST_IP_ADDRESS,
1575
                                    new_node.secondary_ip,
1576
                                    constants.DEFAULT_NODED_PORT,
1577
                                    10, False):
1578
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1579
                                 " you gave (%s). Please fix and re-run this"
1580
                                 " command." % new_node.secondary_ip)
1581

    
1582
    node_verify_list = [self.sstore.GetMasterNode()]
1583
    node_verify_param = {
1584
      'nodelist': [node],
1585
      # TODO: do a node-net-test as well?
1586
    }
1587

    
1588
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1589
    for verifier in node_verify_list:
1590
      if not result[verifier]:
1591
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1592
                                 " for remote verification" % verifier)
1593
      if result[verifier]['nodelist']:
1594
        for failed in result[verifier]['nodelist']:
1595
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1596
                      (verifier, result[verifier]['nodelist'][failed]))
1597
        raise errors.OpExecError("ssh/hostname verification failed.")
1598

    
1599
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1600
    # including the node just added
1601
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1602
    dist_nodes = self.cfg.GetNodeList()
1603
    if not self.op.readd:
1604
      dist_nodes.append(node)
1605
    if myself.name in dist_nodes:
1606
      dist_nodes.remove(myself.name)
1607

    
1608
    logger.Debug("Copying hosts and known_hosts to all nodes")
1609
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1610
      result = rpc.call_upload_file(dist_nodes, fname)
1611
      for to_node in dist_nodes:
1612
        if not result[to_node]:
1613
          logger.Error("copy of file %s to node %s failed" %
1614
                       (fname, to_node))
1615

    
1616
    to_copy = self.sstore.GetFileList()
1617
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1618
      to_copy.append(constants.VNC_PASSWORD_FILE)
1619
    for fname in to_copy:
1620
      result = rpc.call_upload_file([node], fname)
1621
      if not result[node]:
1622
        logger.Error("could not copy file %s to node %s" % (fname, node))
1623

    
1624
    if not self.op.readd:
1625
      logger.Info("adding node %s to cluster.conf" % node)
1626
      self.cfg.AddNode(new_node)
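

# Illustrative sketch (an addition, not part of the original module): the
# connectivity check in LUAddNode.Exec above accepts a node only when it
# reports exactly the master's protocol version.  `remote_version` is a
# stand-in for the value returned by rpc.call_version; no Ganeti APIs are
# used here.
def _ExampleVersionHandshake(local_version, remote_version):
  """Return an error message, or None if the remote version is acceptable."""
  if not remote_version:
    return "Cannot get version from the new node"
  if remote_version != local_version:
    return ("Version mismatch: master version %s, node version %s" %
            (local_version, remote_version))
  return None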
1627

    
1628

    
1629
class LUMasterFailover(LogicalUnit):
1630
  """Failover the master node to the current node.
1631

1632
  This is a special LU in that it must run on a non-master node.
1633

1634
  """
1635
  HPATH = "master-failover"
1636
  HTYPE = constants.HTYPE_CLUSTER
1637
  REQ_MASTER = False
1638
  REQ_WSSTORE = True
1639
  _OP_REQP = []
1640

    
1641
  def BuildHooksEnv(self):
1642
    """Build hooks env.
1643

1644
    This will run on the new master only in the pre phase, and on all
1645
    the nodes in the post phase.
1646

1647
    """
1648
    env = {
1649
      "OP_TARGET": self.new_master,
1650
      "NEW_MASTER": self.new_master,
1651
      "OLD_MASTER": self.old_master,
1652
      }
1653
    return env, [self.new_master], self.cfg.GetNodeList()
1654

    
1655
  def CheckPrereq(self):
1656
    """Check prerequisites.
1657

1658
    This checks that we are not already the master.
1659

1660
    """
1661
    self.new_master = utils.HostInfo().name
1662
    self.old_master = self.sstore.GetMasterNode()
1663

    
1664
    if self.old_master == self.new_master:
1665
      raise errors.OpPrereqError("This commands must be run on the node"
1666
                                 " where you want the new master to be."
1667
                                 " %s is already the master" %
1668
                                 self.old_master)
1669

    
1670
  def Exec(self, feedback_fn):
1671
    """Failover the master node.
1672

1673
    This command, when run on a non-master node, will cause the current
1674
    master to cease being master, and the non-master to become new
1675
    master.
1676

1677
    """
1678
    #TODO: do not rely on gethostname returning the FQDN
1679
    logger.Info("setting master to %s, old master: %s" %
1680
                (self.new_master, self.old_master))
1681

    
1682
    if not rpc.call_node_stop_master(self.old_master):
1683
      logger.Error("could disable the master role on the old master"
1684
                   " %s, please disable manually" % self.old_master)
1685

    
1686
    ss = self.sstore
1687
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1688
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1689
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1690
      logger.Error("could not distribute the new simple store master file"
1691
                   " to the other nodes, please check.")
1692

    
1693
    if not rpc.call_node_start_master(self.new_master):
1694
      logger.Error("could not start the master role on the new master"
1695
                   " %s, please check" % self.new_master)
1696
      feedback_fn("Error in activating the master IP on the new master,"
1697
                  " please fix manually.")
1698

    
1699

    
1700

    
1701
class LUQueryClusterInfo(NoHooksLU):
1702
  """Query cluster configuration.
1703

1704
  """
1705
  _OP_REQP = []
1706
  REQ_MASTER = False
1707

    
1708
  def CheckPrereq(self):
1709
    """No prerequsites needed for this LU.
1710

1711
    """
1712
    pass
1713

    
1714
  def Exec(self, feedback_fn):
1715
    """Return cluster config.
1716

1717
    """
1718
    result = {
1719
      "name": self.sstore.GetClusterName(),
1720
      "software_version": constants.RELEASE_VERSION,
1721
      "protocol_version": constants.PROTOCOL_VERSION,
1722
      "config_version": constants.CONFIG_VERSION,
1723
      "os_api_version": constants.OS_API_VERSION,
1724
      "export_version": constants.EXPORT_VERSION,
1725
      "master": self.sstore.GetMasterNode(),
1726
      "architecture": (platform.architecture()[0], platform.machine()),
1727
      "hypervisor_type": self.sstore.GetHypervisorType(),
1728
      }
1729

    
1730
    return result
1731

    
1732

    
1733
class LUDumpClusterConfig(NoHooksLU):
1734
  """Return a text-representation of the cluster-config.
1735

1736
  """
1737
  _OP_REQP = []
1738

    
1739
  def CheckPrereq(self):
1740
    """No prerequisites.
1741

1742
    """
1743
    pass
1744

    
1745
  def Exec(self, feedback_fn):
1746
    """Dump a representation of the cluster config to the standard output.
1747

1748
    """
1749
    return self.cfg.DumpConfig()
1750

    
1751

    
1752
class LUActivateInstanceDisks(NoHooksLU):
1753
  """Bring up an instance's disks.
1754

1755
  """
1756
  _OP_REQP = ["instance_name"]
1757

    
1758
  def CheckPrereq(self):
1759
    """Check prerequisites.
1760

1761
    This checks that the instance is in the cluster.
1762

1763
    """
1764
    instance = self.cfg.GetInstanceInfo(
1765
      self.cfg.ExpandInstanceName(self.op.instance_name))
1766
    if instance is None:
1767
      raise errors.OpPrereqError("Instance '%s' not known" %
1768
                                 self.op.instance_name)
1769
    self.instance = instance
1770

    
1771

    
1772
  def Exec(self, feedback_fn):
1773
    """Activate the disks.
1774

1775
    """
1776
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1777
    if not disks_ok:
1778
      raise errors.OpExecError("Cannot activate block devices")
1779

    
1780
    return disks_info
1781

    
1782

    
1783
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1784
  """Prepare the block devices for an instance.
1785

1786
  This sets up the block devices on all nodes.
1787

1788
  Args:
1789
    instance: a ganeti.objects.Instance object
1790
    ignore_secondaries: if true, errors on secondary nodes won't result
1791
                        in an error return from the function
1792

1793
  Returns:
1794
    false if the operation failed
1795
    list of (host, instance_visible_name, node_visible_name) if the operation
1796
         succeeded, with the mapping from node devices to instance devices
1797
  """
1798
  device_info = []
1799
  disks_ok = True
1800
  iname = instance.name
1801
  # With the two-pass mechanism we try to reduce the window of
1802
  # opportunity for the race condition of switching DRBD to primary
1803
  # before handshaking occurred, but we do not eliminate it
1804

    
1805
  # The proper fix would be to wait (with some limits) until the
1806
  # connection has been made and drbd transitions from WFConnection
1807
  # into any other network-connected state (Connected, SyncTarget,
1808
  # SyncSource, etc.)
1809

    
1810
  # 1st pass, assemble on all nodes in secondary mode
1811
  for inst_disk in instance.disks:
1812
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1813
      cfg.SetDiskID(node_disk, node)
1814
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1815
      if not result:
1816
        logger.Error("could not prepare block device %s on node %s"
1817
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1818
        if not ignore_secondaries:
1819
          disks_ok = False
1820

    
1821
  # FIXME: race condition on drbd migration to primary
1822

    
1823
  # 2nd pass, do only the primary node
1824
  for inst_disk in instance.disks:
1825
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1826
      if node != instance.primary_node:
1827
        continue
1828
      cfg.SetDiskID(node_disk, node)
1829
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1830
      if not result:
1831
        logger.Error("could not prepare block device %s on node %s"
1832
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1833
        disks_ok = False
1834
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1835

    
1836
  # leave the disks configured for the primary node
1837
  # this is a workaround that would be fixed better by
1838
  # improving the logical/physical id handling
1839
  for disk in instance.disks:
1840
    cfg.SetDiskID(disk, instance.primary_node)
1841

    
1842
  return disks_ok, device_info
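

# Illustrative sketch (an addition, not part of the original module): the
# two-pass assembly above in miniature.  `assemble_fn(node, disk, as_primary)`
# stands in for rpc.call_blockdev_assemble, and secondary-node failures are
# always counted here (the real code can ignore them).
def _ExampleTwoPassAssemble(disk_nodes, primary_node, assemble_fn):
  """disk_nodes is a list of (node, disk) pairs belonging to one instance."""
  disks_ok = True
  # pass 1: every node, secondary (read-only) mode
  for node, disk in disk_nodes:
    disks_ok = assemble_fn(node, disk, False) and disks_ok
  # pass 2: promote only the copies on the primary node
  for node, disk in disk_nodes:
    if node == primary_node:
      disks_ok = assemble_fn(node, disk, True) and disks_ok
  return disks_ok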
1843

    
1844

    
1845
def _StartInstanceDisks(cfg, instance, force):
1846
  """Start the disks of an instance.
1847

1848
  """
1849
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1850
                                           ignore_secondaries=force)
1851
  if not disks_ok:
1852
    _ShutdownInstanceDisks(instance, cfg)
1853
    if force is not None and not force:
1854
      logger.Error("If the message above refers to a secondary node,"
1855
                   " you can retry the operation using '--force'.")
1856
    raise errors.OpExecError("Disk consistency error")
1857

    
1858

    
1859
class LUDeactivateInstanceDisks(NoHooksLU):
1860
  """Shutdown an instance's disks.
1861

1862
  """
1863
  _OP_REQP = ["instance_name"]
1864

    
1865
  def CheckPrereq(self):
1866
    """Check prerequisites.
1867

1868
    This checks that the instance is in the cluster.
1869

1870
    """
1871
    instance = self.cfg.GetInstanceInfo(
1872
      self.cfg.ExpandInstanceName(self.op.instance_name))
1873
    if instance is None:
1874
      raise errors.OpPrereqError("Instance '%s' not known" %
1875
                                 self.op.instance_name)
1876
    self.instance = instance
1877

    
1878
  def Exec(self, feedback_fn):
1879
    """Deactivate the disks
1880

1881
    """
1882
    instance = self.instance
1883
    ins_l = rpc.call_instance_list([instance.primary_node])
1884
    ins_l = ins_l[instance.primary_node]
1885
    if type(ins_l) is not list:
1886
      raise errors.OpExecError("Can't contact node '%s'" %
1887
                               instance.primary_node)
1888

    
1889
    if self.instance.name in ins_l:
1890
      raise errors.OpExecError("Instance is running, can't shutdown"
1891
                               " block devices.")
1892

    
1893
    _ShutdownInstanceDisks(instance, self.cfg)
1894

    
1895

    
1896
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1897
  """Shutdown block devices of an instance.
1898

1899
  This does the shutdown on all nodes of the instance.
1900

1901
  If ignore_primary is true, errors on the primary node are
1902
  ignored.
1903

1904
  """
1905
  result = True
1906
  for disk in instance.disks:
1907
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1908
      cfg.SetDiskID(top_disk, node)
1909
      if not rpc.call_blockdev_shutdown(node, top_disk):
1910
        logger.Error("could not shutdown block device %s on node %s" %
1911
                     (disk.iv_name, node))
1912
        if not ignore_primary or node != instance.primary_node:
1913
          result = False
1914
  return result
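

# Illustrative sketch (an addition, not part of the original module): the
# failure accounting used above, isolated as a predicate.  A shutdown error
# makes the operation fail unless it happened on the primary node and
# ignore_primary was set.
def _ExampleShutdownErrorCounts(node, primary_node, ignore_primary):
  """Return True if an error on `node` should mark the shutdown as failed."""
  return not ignore_primary or node != primary_node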
1915

    
1916

    
1917
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1918
  """Checks if a node has enough free memory.
1919

1920
  This function checks if a given node has the needed amount of free
1921
  memory. In case the node has less memory or we cannot get the
1922
  information from the node, this function raises an OpPrereqError
1923
  exception.
1924

1925
  Args:
1926
    - cfg: a ConfigWriter instance
1927
    - node: the node name
1928
    - reason: string to use in the error message
1929
    - requested: the amount of memory in MiB
1930

1931
  """
1932
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1933
  if not nodeinfo or not isinstance(nodeinfo, dict):
1934
    raise errors.OpPrereqError("Could not contact node %s for resource"
1935
                             " information" % (node,))
1936

    
1937
  free_mem = nodeinfo[node].get('memory_free')
1938
  if not isinstance(free_mem, int):
1939
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1940
                             " was '%s'" % (node, free_mem))
1941
  if requested > free_mem:
1942
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1943
                             " needed %s MiB, available %s MiB" %
1944
                             (node, reason, requested, free_mem))
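

# Illustrative sketch (an addition, not part of the original module): the
# essence of the check above, with `free_mem` standing in for the
# 'memory_free' value normally obtained via rpc.call_node_info.
def _ExampleHasEnoughMemory(free_mem, requested):
  """Return None if the requested MiB fit, an error message otherwise."""
  if not isinstance(free_mem, int):
    return "Can't compute free memory, result was '%s'" % (free_mem,)
  if requested > free_mem:
    return ("Not enough memory: needed %s MiB, available %s MiB" %
            (requested, free_mem))
  return None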
1945

    
1946

    
1947
class LUStartupInstance(LogicalUnit):
1948
  """Starts an instance.
1949

1950
  """
1951
  HPATH = "instance-start"
1952
  HTYPE = constants.HTYPE_INSTANCE
1953
  _OP_REQP = ["instance_name", "force"]
1954

    
1955
  def BuildHooksEnv(self):
1956
    """Build hooks env.
1957

1958
    This runs on master, primary and secondary nodes of the instance.
1959

1960
    """
1961
    env = {
1962
      "FORCE": self.op.force,
1963
      }
1964
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1965
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1966
          list(self.instance.secondary_nodes))
1967
    return env, nl, nl
1968

    
1969
  def CheckPrereq(self):
1970
    """Check prerequisites.
1971

1972
    This checks that the instance is in the cluster.
1973

1974
    """
1975
    instance = self.cfg.GetInstanceInfo(
1976
      self.cfg.ExpandInstanceName(self.op.instance_name))
1977
    if instance is None:
1978
      raise errors.OpPrereqError("Instance '%s' not known" %
1979
                                 self.op.instance_name)
1980

    
1981
    # check bridge existence
1982
    _CheckInstanceBridgesExist(instance)
1983

    
1984
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
1985
                         "starting instance %s" % instance.name,
1986
                         instance.memory)
1987

    
1988
    self.instance = instance
1989
    self.op.instance_name = instance.name
1990

    
1991
  def Exec(self, feedback_fn):
1992
    """Start the instance.
1993

1994
    """
1995
    instance = self.instance
1996
    force = self.op.force
1997
    extra_args = getattr(self.op, "extra_args", "")
1998

    
1999
    self.cfg.MarkInstanceUp(instance.name)
2000

    
2001
    node_current = instance.primary_node
2002

    
2003
    _StartInstanceDisks(self.cfg, instance, force)
2004

    
2005
    if not rpc.call_instance_start(node_current, instance, extra_args):
2006
      _ShutdownInstanceDisks(instance, self.cfg)
2007
      raise errors.OpExecError("Could not start instance")
2008

    
2009

    
2010
class LURebootInstance(LogicalUnit):
2011
  """Reboot an instance.
2012

2013
  """
2014
  HPATH = "instance-reboot"
2015
  HTYPE = constants.HTYPE_INSTANCE
2016
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2017

    
2018
  def BuildHooksEnv(self):
2019
    """Build hooks env.
2020

2021
    This runs on master, primary and secondary nodes of the instance.
2022

2023
    """
2024
    env = {
2025
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2026
      }
2027
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2028
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2029
          list(self.instance.secondary_nodes))
2030
    return env, nl, nl
2031

    
2032
  def CheckPrereq(self):
2033
    """Check prerequisites.
2034

2035
    This checks that the instance is in the cluster.
2036

2037
    """
2038
    instance = self.cfg.GetInstanceInfo(
2039
      self.cfg.ExpandInstanceName(self.op.instance_name))
2040
    if instance is None:
2041
      raise errors.OpPrereqError("Instance '%s' not known" %
2042
                                 self.op.instance_name)
2043

    
2044
    # check bridge existence
2045
    _CheckInstanceBridgesExist(instance)
2046

    
2047
    self.instance = instance
2048
    self.op.instance_name = instance.name
2049

    
2050
  def Exec(self, feedback_fn):
2051
    """Reboot the instance.
2052

2053
    """
2054
    instance = self.instance
2055
    ignore_secondaries = self.op.ignore_secondaries
2056
    reboot_type = self.op.reboot_type
2057
    extra_args = getattr(self.op, "extra_args", "")
2058

    
2059
    node_current = instance.primary_node
2060

    
2061
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2062
                           constants.INSTANCE_REBOOT_HARD,
2063
                           constants.INSTANCE_REBOOT_FULL]:
2064
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2065
                                  (constants.INSTANCE_REBOOT_SOFT,
2066
                                   constants.INSTANCE_REBOOT_HARD,
2067
                                   constants.INSTANCE_REBOOT_FULL))
2068

    
2069
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2070
                       constants.INSTANCE_REBOOT_HARD]:
2071
      if not rpc.call_instance_reboot(node_current, instance,
2072
                                      reboot_type, extra_args):
2073
        raise errors.OpExecError("Could not reboot instance")
2074
    else:
2075
      if not rpc.call_instance_shutdown(node_current, instance):
2076
        raise errors.OpExecError("could not shutdown instance for full reboot")
2077
      _ShutdownInstanceDisks(instance, self.cfg)
2078
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2079
      if not rpc.call_instance_start(node_current, instance, extra_args):
2080
        _ShutdownInstanceDisks(instance, self.cfg)
2081
        raise errors.OpExecError("Could not start instance for full reboot")
2082

    
2083
    self.cfg.MarkInstanceUp(instance.name)
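

# Illustrative sketch (an addition, not part of the original module): the
# reboot dispatch above in pure Python.  Soft and hard reboots are a single
# call to the node daemon; a full reboot is emulated as shutdown plus start.
# The three callables are stand-ins for the corresponding rpc/helper calls.
def _ExampleRebootDispatch(reboot_type, reboot_fn, shutdown_fn, start_fn):
  """Return the list of actions performed for the given reboot type."""
  if reboot_type in (constants.INSTANCE_REBOOT_SOFT,
                     constants.INSTANCE_REBOOT_HARD):
    reboot_fn(reboot_type)
    return ["reboot"]
  elif reboot_type == constants.INSTANCE_REBOOT_FULL:
    shutdown_fn()
    start_fn()
    return ["shutdown", "start"]
  raise errors.ParameterError("unknown reboot type '%s'" % reboot_type)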
2084

    
2085

    
2086
class LUShutdownInstance(LogicalUnit):
2087
  """Shutdown an instance.
2088

2089
  """
2090
  HPATH = "instance-stop"
2091
  HTYPE = constants.HTYPE_INSTANCE
2092
  _OP_REQP = ["instance_name"]
2093

    
2094
  def BuildHooksEnv(self):
2095
    """Build hooks env.
2096

2097
    This runs on master, primary and secondary nodes of the instance.
2098

2099
    """
2100
    env = _BuildInstanceHookEnvByObject(self.instance)
2101
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2102
          list(self.instance.secondary_nodes))
2103
    return env, nl, nl
2104

    
2105
  def CheckPrereq(self):
2106
    """Check prerequisites.
2107

2108
    This checks that the instance is in the cluster.
2109

2110
    """
2111
    instance = self.cfg.GetInstanceInfo(
2112
      self.cfg.ExpandInstanceName(self.op.instance_name))
2113
    if instance is None:
2114
      raise errors.OpPrereqError("Instance '%s' not known" %
2115
                                 self.op.instance_name)
2116
    self.instance = instance
2117

    
2118
  def Exec(self, feedback_fn):
2119
    """Shutdown the instance.
2120

2121
    """
2122
    instance = self.instance
2123
    node_current = instance.primary_node
2124
    self.cfg.MarkInstanceDown(instance.name)
2125
    if not rpc.call_instance_shutdown(node_current, instance):
2126
      logger.Error("could not shutdown instance")
2127

    
2128
    _ShutdownInstanceDisks(instance, self.cfg)
2129

    
2130

    
2131
class LUReinstallInstance(LogicalUnit):
2132
  """Reinstall an instance.
2133

2134
  """
2135
  HPATH = "instance-reinstall"
2136
  HTYPE = constants.HTYPE_INSTANCE
2137
  _OP_REQP = ["instance_name"]
2138

    
2139
  def BuildHooksEnv(self):
2140
    """Build hooks env.
2141

2142
    This runs on master, primary and secondary nodes of the instance.
2143

2144
    """
2145
    env = _BuildInstanceHookEnvByObject(self.instance)
2146
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2147
          list(self.instance.secondary_nodes))
2148
    return env, nl, nl
2149

    
2150
  def CheckPrereq(self):
2151
    """Check prerequisites.
2152

2153
    This checks that the instance is in the cluster and is not running.
2154

2155
    """
2156
    instance = self.cfg.GetInstanceInfo(
2157
      self.cfg.ExpandInstanceName(self.op.instance_name))
2158
    if instance is None:
2159
      raise errors.OpPrereqError("Instance '%s' not known" %
2160
                                 self.op.instance_name)
2161
    if instance.disk_template == constants.DT_DISKLESS:
2162
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2163
                                 self.op.instance_name)
2164
    if instance.status != "down":
2165
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2166
                                 self.op.instance_name)
2167
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2168
    if remote_info:
2169
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2170
                                 (self.op.instance_name,
2171
                                  instance.primary_node))
2172

    
2173
    self.op.os_type = getattr(self.op, "os_type", None)
2174
    if self.op.os_type is not None:
2175
      # OS verification
2176
      pnode = self.cfg.GetNodeInfo(
2177
        self.cfg.ExpandNodeName(instance.primary_node))
2178
      if pnode is None:
2179
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2180
                                   self.op.pnode)
2181
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2182
      if not os_obj:
2183
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2184
                                   " primary node"  % self.op.os_type)
2185

    
2186
    self.instance = instance
2187

    
2188
  def Exec(self, feedback_fn):
2189
    """Reinstall the instance.
2190

2191
    """
2192
    inst = self.instance
2193

    
2194
    if self.op.os_type is not None:
2195
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2196
      inst.os = self.op.os_type
2197
      self.cfg.AddInstance(inst)
2198

    
2199
    _StartInstanceDisks(self.cfg, inst, None)
2200
    try:
2201
      feedback_fn("Running the instance OS create scripts...")
2202
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2203
        raise errors.OpExecError("Could not install OS for instance %s"
2204
                                 " on node %s" %
2205
                                 (inst.name, inst.primary_node))
2206
    finally:
2207
      _ShutdownInstanceDisks(inst, self.cfg)
2208

    
2209

    
2210
class LURenameInstance(LogicalUnit):
2211
  """Rename an instance.
2212

2213
  """
2214
  HPATH = "instance-rename"
2215
  HTYPE = constants.HTYPE_INSTANCE
2216
  _OP_REQP = ["instance_name", "new_name"]
2217

    
2218
  def BuildHooksEnv(self):
2219
    """Build hooks env.
2220

2221
    This runs on master, primary and secondary nodes of the instance.
2222

2223
    """
2224
    env = _BuildInstanceHookEnvByObject(self.instance)
2225
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2226
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2227
          list(self.instance.secondary_nodes))
2228
    return env, nl, nl
2229

    
2230
  def CheckPrereq(self):
2231
    """Check prerequisites.
2232

2233
    This checks that the instance is in the cluster and is not running.
2234

2235
    """
2236
    instance = self.cfg.GetInstanceInfo(
2237
      self.cfg.ExpandInstanceName(self.op.instance_name))
2238
    if instance is None:
2239
      raise errors.OpPrereqError("Instance '%s' not known" %
2240
                                 self.op.instance_name)
2241
    if instance.status != "down":
2242
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2243
                                 self.op.instance_name)
2244
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2245
    if remote_info:
2246
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2247
                                 (self.op.instance_name,
2248
                                  instance.primary_node))
2249
    self.instance = instance
2250

    
2251
    # new name verification
2252
    name_info = utils.HostInfo(self.op.new_name)
2253

    
2254
    self.op.new_name = new_name = name_info.name
2255
    instance_list = self.cfg.GetInstanceList()
2256
    if new_name in instance_list:
2257
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2258
                                 new_name)
2259

    
2260
    if not getattr(self.op, "ignore_ip", False):
2261
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2262
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2263
                                   (name_info.ip, new_name))
2264

    
2265

    
2266
  def Exec(self, feedback_fn):
2267
    """Reinstall the instance.
2268

2269
    """
2270
    inst = self.instance
2271
    old_name = inst.name
2272

    
2273
    if inst.disk_template == constants.DT_FILE:
2274
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2275

    
2276
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2277

    
2278
    # re-read the instance from the configuration after rename
2279
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2280

    
2281
    if inst.disk_template == constants.DT_FILE:
2282
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2283
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2284
                                                old_file_storage_dir,
2285
                                                new_file_storage_dir)
2286

    
2287
      if not result:
2288
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2289
                                 " directory '%s' to '%s' (but the instance"
2290
                                 " has been renamed in Ganeti)" % (
2291
                                 inst.primary_node, old_file_storage_dir,
2292
                                 new_file_storage_dir))
2293

    
2294
      if not result[0]:
2295
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2296
                                 " (but the instance has been renamed in"
2297
                                 " Ganeti)" % (old_file_storage_dir,
2298
                                               new_file_storage_dir))
2299

    
2300
    _StartInstanceDisks(self.cfg, inst, None)
2301
    try:
2302
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2303
                                          "sda", "sdb"):
2304
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2305
               " instance has been renamed in Ganeti)" %
2306
               (inst.name, inst.primary_node))
2307
        logger.Error(msg)
2308
    finally:
2309
      _ShutdownInstanceDisks(inst, self.cfg)
2310

    
2311

    
2312
class LURemoveInstance(LogicalUnit):
2313
  """Remove an instance.
2314

2315
  """
2316
  HPATH = "instance-remove"
2317
  HTYPE = constants.HTYPE_INSTANCE
2318
  _OP_REQP = ["instance_name", "ignore_failures"]
2319

    
2320
  def BuildHooksEnv(self):
2321
    """Build hooks env.
2322

2323
    This runs on master, primary and secondary nodes of the instance.
2324

2325
    """
2326
    env = _BuildInstanceHookEnvByObject(self.instance)
2327
    nl = [self.sstore.GetMasterNode()]
2328
    return env, nl, nl
2329

    
2330
  def CheckPrereq(self):
2331
    """Check prerequisites.
2332

2333
    This checks that the instance is in the cluster.
2334

2335
    """
2336
    instance = self.cfg.GetInstanceInfo(
2337
      self.cfg.ExpandInstanceName(self.op.instance_name))
2338
    if instance is None:
2339
      raise errors.OpPrereqError("Instance '%s' not known" %
2340
                                 self.op.instance_name)
2341
    self.instance = instance
2342

    
2343
  def Exec(self, feedback_fn):
2344
    """Remove the instance.
2345

2346
    """
2347
    instance = self.instance
2348
    logger.Info("shutting down instance %s on node %s" %
2349
                (instance.name, instance.primary_node))
2350

    
2351
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2352
      if self.op.ignore_failures:
2353
        feedback_fn("Warning: can't shutdown instance")
2354
      else:
2355
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2356
                                 (instance.name, instance.primary_node))
2357

    
2358
    logger.Info("removing block devices for instance %s" % instance.name)
2359

    
2360
    if not _RemoveDisks(instance, self.cfg):
2361
      if self.op.ignore_failures:
2362
        feedback_fn("Warning: can't remove instance's disks")
2363
      else:
2364
        raise errors.OpExecError("Can't remove instance's disks")
2365

    
2366
    logger.Info("removing instance %s out of cluster config" % instance.name)
2367

    
2368
    self.cfg.RemoveInstance(instance.name)
2369

    
2370

    
2371
class LUQueryInstances(NoHooksLU):
2372
  """Logical unit for querying instances.
2373

2374
  """
2375
  _OP_REQP = ["output_fields", "names"]
2376

    
2377
  def CheckPrereq(self):
2378
    """Check prerequisites.
2379

2380
    This checks that the fields required are valid output fields.
2381

2382
    """
2383
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2384
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2385
                               "admin_state", "admin_ram",
2386
                               "disk_template", "ip", "mac", "bridge",
2387
                               "sda_size", "sdb_size", "vcpus", "tags"],
2388
                       dynamic=self.dynamic_fields,
2389
                       selected=self.op.output_fields)
2390

    
2391
    self.wanted = _GetWantedInstances(self, self.op.names)
2392

    
2393
  def Exec(self, feedback_fn):
2394
    """Computes the list of nodes and their attributes.
2395

2396
    """
2397
    instance_names = self.wanted
2398
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2399
                     in instance_names]
2400

    
2401
    # begin data gathering
2402

    
2403
    nodes = frozenset([inst.primary_node for inst in instance_list])
2404

    
2405
    bad_nodes = []
2406
    if self.dynamic_fields.intersection(self.op.output_fields):
2407
      live_data = {}
2408
      node_data = rpc.call_all_instances_info(nodes)
2409
      for name in nodes:
2410
        result = node_data[name]
2411
        if result:
2412
          live_data.update(result)
2413
        elif result == False:
2414
          bad_nodes.append(name)
2415
        # else no instance is alive
2416
    else:
2417
      live_data = dict([(name, {}) for name in instance_names])
2418

    
2419
    # end data gathering
2420

    
2421
    output = []
2422
    for instance in instance_list:
2423
      iout = []
2424
      for field in self.op.output_fields:
2425
        if field == "name":
2426
          val = instance.name
2427
        elif field == "os":
2428
          val = instance.os
2429
        elif field == "pnode":
2430
          val = instance.primary_node
2431
        elif field == "snodes":
2432
          val = list(instance.secondary_nodes)
2433
        elif field == "admin_state":
2434
          val = (instance.status != "down")
2435
        elif field == "oper_state":
2436
          if instance.primary_node in bad_nodes:
2437
            val = None
2438
          else:
2439
            val = bool(live_data.get(instance.name))
2440
        elif field == "status":
2441
          if instance.primary_node in bad_nodes:
2442
            val = "ERROR_nodedown"
2443
          else:
2444
            running = bool(live_data.get(instance.name))
2445
            if running:
2446
              if instance.status != "down":
2447
                val = "running"
2448
              else:
2449
                val = "ERROR_up"
2450
            else:
2451
              if instance.status != "down":
2452
                val = "ERROR_down"
2453
              else:
2454
                val = "ADMIN_down"
2455
        elif field == "admin_ram":
2456
          val = instance.memory
2457
        elif field == "oper_ram":
2458
          if instance.primary_node in bad_nodes:
2459
            val = None
2460
          elif instance.name in live_data:
2461
            val = live_data[instance.name].get("memory", "?")
2462
          else:
2463
            val = "-"
2464
        elif field == "disk_template":
2465
          val = instance.disk_template
2466
        elif field == "ip":
2467
          val = instance.nics[0].ip
2468
        elif field == "bridge":
2469
          val = instance.nics[0].bridge
2470
        elif field == "mac":
2471
          val = instance.nics[0].mac
2472
        elif field == "sda_size" or field == "sdb_size":
2473
          disk = instance.FindDisk(field[:3])
2474
          if disk is None:
2475
            val = None
2476
          else:
2477
            val = disk.size
2478
        elif field == "vcpus":
2479
          val = instance.vcpus
2480
        elif field == "tags":
2481
          val = list(instance.GetTags())
2482
        else:
2483
          raise errors.ParameterError(field)
2484
        iout.append(val)
2485
      output.append(iout)
2486

    
2487
    return output
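

# Illustrative sketch (an addition, not part of the original module): how the
# "status" output field above is derived from the configured ("admin") state
# and the live data reported by the primary node.
def _ExampleInstanceStatus(node_bad, running, admin_up):
  """Mirror the status strings computed in LUQueryInstances.Exec."""
  if node_bad:
    return "ERROR_nodedown"
  if running:
    if admin_up:
      return "running"
    return "ERROR_up"
  if admin_up:
    return "ERROR_down"
  return "ADMIN_down"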
2488

    
2489

    
2490
class LUFailoverInstance(LogicalUnit):
2491
  """Failover an instance.
2492

2493
  """
2494
  HPATH = "instance-failover"
2495
  HTYPE = constants.HTYPE_INSTANCE
2496
  _OP_REQP = ["instance_name", "ignore_consistency"]
2497

    
2498
  def BuildHooksEnv(self):
2499
    """Build hooks env.
2500

2501
    This runs on master, primary and secondary nodes of the instance.
2502

2503
    """
2504
    env = {
2505
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2506
      }
2507
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2508
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2509
    return env, nl, nl
2510

    
2511
  def CheckPrereq(self):
2512
    """Check prerequisites.
2513

2514
    This checks that the instance is in the cluster.
2515

2516
    """
2517
    instance = self.cfg.GetInstanceInfo(
2518
      self.cfg.ExpandInstanceName(self.op.instance_name))
2519
    if instance is None:
2520
      raise errors.OpPrereqError("Instance '%s' not known" %
2521
                                 self.op.instance_name)
2522

    
2523
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2524
      raise errors.OpPrereqError("Instance's disk layout is not"
2525
                                 " network mirrored, cannot failover.")
2526

    
2527
    secondary_nodes = instance.secondary_nodes
2528
    if not secondary_nodes:
2529
      raise errors.ProgrammerError("no secondary node but using "
2530
                                   "a mirrored disk template")
2531

    
2532
    target_node = secondary_nodes[0]
2533
    # check memory requirements on the secondary node
2534
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2535
                         instance.name, instance.memory)
2536

    
2537
    # check bridge existence
2538
    brlist = [nic.bridge for nic in instance.nics]
2539
    if not rpc.call_bridges_exist(target_node, brlist):
2540
      raise errors.OpPrereqError("One or more target bridges %s does not"
2541
                                 " exist on destination node '%s'" %
2542
                                 (brlist, target_node))
2543

    
2544
    self.instance = instance
2545

    
2546
  def Exec(self, feedback_fn):
2547
    """Failover an instance.
2548

2549
    The failover is done by shutting it down on its present node and
2550
    starting it on the secondary.
2551

2552
    """
2553
    instance = self.instance
2554

    
2555
    source_node = instance.primary_node
2556
    target_node = instance.secondary_nodes[0]
2557

    
2558
    feedback_fn("* checking disk consistency between source and target")
2559
    for dev in instance.disks:
2560
      # for drbd, these are drbd over lvm
2561
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2562
        if instance.status == "up" and not self.op.ignore_consistency:
2563
          raise errors.OpExecError("Disk %s is degraded on target node,"
2564
                                   " aborting failover." % dev.iv_name)
2565

    
2566
    feedback_fn("* shutting down instance on source node")
2567
    logger.Info("Shutting down instance %s on node %s" %
2568
                (instance.name, source_node))
2569

    
2570
    if not rpc.call_instance_shutdown(source_node, instance):
2571
      if self.op.ignore_consistency:
2572
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2573
                     " anyway. Please make sure node %s is down"  %
2574
                     (instance.name, source_node, source_node))
2575
      else:
2576
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2577
                                 (instance.name, source_node))
2578

    
2579
    feedback_fn("* deactivating the instance's disks on source node")
2580
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2581
      raise errors.OpExecError("Can't shut down the instance's disks.")
2582

    
2583
    instance.primary_node = target_node
2584
    # distribute new instance config to the other nodes
2585
    self.cfg.Update(instance)
2586

    
2587
    # Only start the instance if it's marked as up
2588
    if instance.status == "up":
2589
      feedback_fn("* activating the instance's disks on target node")
2590
      logger.Info("Starting instance %s on node %s" %
2591
                  (instance.name, target_node))
2592

    
2593
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2594
                                               ignore_secondaries=True)
2595
      if not disks_ok:
2596
        _ShutdownInstanceDisks(instance, self.cfg)
2597
        raise errors.OpExecError("Can't activate the instance's disks")
2598

    
2599
      feedback_fn("* starting the instance on the target node")
2600
      if not rpc.call_instance_start(target_node, instance, None):
2601
        _ShutdownInstanceDisks(instance, self.cfg)
2602
        raise errors.OpExecError("Could not start instance %s on node %s." %
2603
                                 (instance.name, target_node))
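

# Illustrative sketch (an addition, not part of the original module): the
# failover above as an ordered recipe; the step names are descriptive only
# and do not correspond to real function names.
def _ExampleFailoverSteps(admin_up):
  """Return the steps performed for an instance marked up (or down)."""
  steps = ["check disk consistency on target",
           "shutdown instance on source",
           "deactivate disks on source",
           "switch primary node in the configuration"]
  if admin_up:
    steps.extend(["activate disks on target",
                  "start instance on target"])
  return steps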
2604

    
2605

    
2606
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2607
  """Create a tree of block devices on the primary node.
2608

2609
  This always creates all devices.
2610

2611
  """
2612
  if device.children:
2613
    for child in device.children:
2614
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2615
        return False
2616

    
2617
  cfg.SetDiskID(device, node)
2618
  new_id = rpc.call_blockdev_create(node, device, device.size,
2619
                                    instance.name, True, info)
2620
  if not new_id:
2621
    return False
2622
  if device.physical_id is None:
2623
    device.physical_id = new_id
2624
  return True
2625

    
2626

    
2627
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2628
  """Create a tree of block devices on a secondary node.
2629

2630
  If this device type has to be created on secondaries, create it and
2631
  all its children.
2632

2633
  If not, just recurse to children keeping the same 'force' value.
2634

2635
  """
2636
  if device.CreateOnSecondary():
2637
    force = True
2638
  if device.children:
2639
    for child in device.children:
2640
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2641
                                        child, force, info):
2642
        return False
2643

    
2644
  if not force:
2645
    return True
2646
  cfg.SetDiskID(device, node)
2647
  new_id = rpc.call_blockdev_create(node, device, device.size,
2648
                                    instance.name, False, info)
2649
  if not new_id:
2650
    return False
2651
  if device.physical_id is None:
2652
    device.physical_id = new_id
2653
  return True
2654

    
2655

    
2656
def _GenerateUniqueNames(cfg, exts):
2657
  """Generate a suitable LV name.
2658

2659
  This will generate a unique logical volume name for each given suffix.
2660

2661
  """
2662
  results = []
2663
  for val in exts:
2664
    new_id = cfg.GenerateUniqueID()
2665
    results.append("%s%s" % (new_id, val))
2666
  return results
2667

    
2668

    
2669
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2670
  """Generate a drbd8 device complete with its children.
2671

2672
  """
2673
  port = cfg.AllocatePort()
2674
  vgname = cfg.GetVGName()
2675
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2676
                          logical_id=(vgname, names[0]))
2677
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2678
                          logical_id=(vgname, names[1]))
2679
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2680
                          logical_id = (primary, secondary, port),
2681
                          children = [dev_data, dev_meta],
2682
                          iv_name=iv_name)
2683
  return drbd_dev
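

# Illustrative sketch (an addition, not part of the original module): the
# shape of the device tree built above, using plain dicts instead of
# objects.Disk.  A drbd8 device carries (primary, secondary, port) as its
# logical id and has two LV children: the data volume and a 128 MiB
# metadata volume.
def _ExampleDRBD8Shape(vgname, primary, secondary, port, size, names):
  data = {"dev_type": "lv", "size": size, "logical_id": (vgname, names[0])}
  meta = {"dev_type": "lv", "size": 128, "logical_id": (vgname, names[1])}
  return {"dev_type": "drbd8", "size": size,
          "logical_id": (primary, secondary, port),
          "children": [data, meta]}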
2684

    
2685

    
2686
def _GenerateDiskTemplate(cfg, template_name,
2687
                          instance_name, primary_node,
2688
                          secondary_nodes, disk_sz, swap_sz,
2689
                          file_storage_dir, file_driver):
2690
  """Generate the entire disk layout for a given template type.
2691

2692
  """
2693
  #TODO: compute space requirements
2694

    
2695
  vgname = cfg.GetVGName()
2696
  if template_name == constants.DT_DISKLESS:
2697
    disks = []
2698
  elif template_name == constants.DT_PLAIN:
2699
    if len(secondary_nodes) != 0:
2700
      raise errors.ProgrammerError("Wrong template configuration")
2701

    
2702
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2703
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2704
                           logical_id=(vgname, names[0]),
2705
                           iv_name = "sda")
2706
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2707
                           logical_id=(vgname, names[1]),
2708
                           iv_name = "sdb")
2709
    disks = [sda_dev, sdb_dev]
2710
  elif template_name == constants.DT_DRBD8:
2711
    if len(secondary_nodes) != 1:
2712
      raise errors.ProgrammerError("Wrong template configuration")
2713
    remote_node = secondary_nodes[0]
2714
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2715
                                       ".sdb_data", ".sdb_meta"])
2716
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2717
                                         disk_sz, names[0:2], "sda")
2718
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2719
                                         swap_sz, names[2:4], "sdb")
2720
    disks = [drbd_sda_dev, drbd_sdb_dev]
2721
  elif template_name == constants.DT_FILE:
2722
    if len(secondary_nodes) != 0:
2723
      raise errors.ProgrammerError("Wrong template configuration")
2724

    
2725
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2726
                                iv_name="sda", logical_id=(file_driver,
2727
                                "%s/sda" % file_storage_dir))
2728
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2729
                                iv_name="sdb", logical_id=(file_driver,
2730
                                "%s/sdb" % file_storage_dir))
2731
    disks = [file_sda_dev, file_sdb_dev]
2732
  else:
2733
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2734
  return disks
2735

    
2736

    
2737
def _GetInstanceInfoText(instance):
2738
  """Compute that text that should be added to the disk's metadata.
2739

2740
  """
2741
  return "originstname+%s" % instance.name
2742

    
2743

    
2744
def _CreateDisks(cfg, instance):
2745
  """Create all disks for an instance.
2746

2747
  This abstracts away some work from AddInstance.
2748

2749
  Args:
2750
    instance: the instance object
2751

2752
  Returns:
2753
    True or False showing the success of the creation process
2754

2755
  """
2756
  info = _GetInstanceInfoText(instance)
2757

    
2758
  if instance.disk_template == constants.DT_FILE:
2759
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2760
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2761
                                              file_storage_dir)
2762

    
2763
    if not result:
2764
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2765
      return False
2766

    
2767
    if not result[0]:
2768
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2769
      return False
2770

    
2771
  for device in instance.disks:
2772
    logger.Info("creating volume %s for instance %s" %
2773
                (device.iv_name, instance.name))
2774
    #HARDCODE
2775
    for secondary_node in instance.secondary_nodes:
2776
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2777
                                        device, False, info):
2778
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2779
                     (device.iv_name, device, secondary_node))
2780
        return False
2781
    #HARDCODE
2782
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2783
                                    instance, device, info):
2784
      logger.Error("failed to create volume %s on primary!" %
2785
                   device.iv_name)
2786
      return False
2787

    
2788
  return True
2789

    
2790

    
2791
def _RemoveDisks(instance, cfg):
2792
  """Remove all disks for an instance.
2793

2794
  This abstracts away some work from `AddInstance()` and
2795
  `RemoveInstance()`. Note that in case some of the devices couldn't
2796
  be removed, the removal will continue with the other ones (compare
2797
  with `_CreateDisks()`).
2798

2799
  Args:
2800
    instance: the instance object
2801

2802
  Returns:
2803
    True or False showing the success of the removal process
2804

2805
  """
2806
  logger.Info("removing block devices for instance %s" % instance.name)
2807

    
2808
  result = True
2809
  for device in instance.disks:
2810
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2811
      cfg.SetDiskID(disk, node)
2812
      if not rpc.call_blockdev_remove(node, disk):
2813
        logger.Error("could not remove block device %s on node %s,"
2814
                     " continuing anyway" %
2815
                     (device.iv_name, node))
2816
        result = False
2817

    
2818
  if instance.disk_template == constants.DT_FILE:
2819
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2820
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
2821
                                            file_storage_dir):
2822
      logger.Error("could not remove directory '%s'" % file_storage_dir)
2823
      result = False
2824

    
2825
  return result
2826

    
2827

    
2828
def _ComputeDiskSize(disk_template, disk_size, swap_size):
2829
  """Compute disk size requirements in the volume group
2830

2831
  This is currently hard-coded for the two-drive layout.
2832

2833
  """
2834
  # Required free disk space as a function of disk and swap space
2835
  req_size_dict = {
2836
    constants.DT_DISKLESS: None,
2837
    constants.DT_PLAIN: disk_size + swap_size,
2838
    # 256 MB are added for drbd metadata, 128MB for each drbd device
2839
    constants.DT_DRBD8: disk_size + swap_size + 256,
2840
    constants.DT_FILE: None,
2841
  }
2842

    
2843
  if disk_template not in req_size_dict:
2844
    raise errors.ProgrammerError("Disk template '%s' size requirement"
2845
                                 " is unknown" %  disk_template)
2846

    
2847
  return req_size_dict[disk_template]
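

# Illustrative worked example (an addition, not part of the original module):
# for a drbd instance with a 10240 MiB disk and 4096 MiB of swap, the volume
# group on each node must hold 10240 + 4096 + 256 = 14592 MiB, while
# diskless and file-based instances need no volume group space at all.
_EXAMPLE_DRBD8_REQ_MIB = 10240 + 4096 + 256   # == 14592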
2848

    
2849

    
2850
class LUCreateInstance(LogicalUnit):
2851
  """Create an instance.
2852

2853
  """
2854
  HPATH = "instance-add"
2855
  HTYPE = constants.HTYPE_INSTANCE
2856
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
2857
              "disk_template", "swap_size", "mode", "start", "vcpus",
2858
              "wait_for_sync", "ip_check", "mac"]
2859

    
2860
  def _RunAllocator(self):
2861
    """Run the allocator based on input opcode.
2862

2863
    """
2864
    disks = [{"size": self.op.disk_size, "mode": "w"},
2865
             {"size": self.op.swap_size, "mode": "w"}]
2866
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2867
             "bridge": self.op.bridge}]
2868
    ial = IAllocator(self.cfg, self.sstore,
2869
                     mode=constants.IALLOCATOR_MODE_ALLOC,
2870
                     name=self.op.instance_name,
2871
                     disk_template=self.op.disk_template,
2872
                     tags=[],
2873
                     os=self.op.os_type,
2874
                     vcpus=self.op.vcpus,
2875
                     mem_size=self.op.mem_size,
2876
                     disks=disks,
2877
                     nics=nics,
2878
                     )
2879

    
2880
    ial.Run(self.op.iallocator)
2881

    
2882
    if not ial.success:
2883
      raise errors.OpPrereqError("Can't compute nodes using"
2884
                                 " iallocator '%s': %s" % (self.op.iallocator,
2885
                                                           ial.info))
2886
    if len(ial.nodes) != ial.required_nodes:
2887
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2888
                                 " of nodes (%s), required %s" %
2889
                                 (len(ial.nodes), ial.required_nodes))
2890
    self.op.pnode = ial.nodes[0]
2891
    logger.ToStdout("Selected nodes for the instance: %s" %
2892
                    (", ".join(ial.nodes),))
2893
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2894
                (self.op.instance_name, self.op.iallocator, ial.nodes))
2895
    if ial.required_nodes == 2:
2896
      self.op.snode = ial.nodes[1]
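
  # Illustrative sketch (an addition, kept as a comment so the class body
  # stays intact): the request handed to the IAllocator framework above is
  # essentially a dict of instance properties; every value below is a
  # made-up placeholder, not the output of a real run.
  #
  #   {"mode": constants.IALLOCATOR_MODE_ALLOC,
  #    "name": "instance1.example.com",
  #    "disk_template": "drbd",
  #    "tags": [], "os": "debian-etch", "vcpus": 1, "mem_size": 1024,
  #    "disks": [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}],
  #    "nics": [{"mac": "auto", "ip": None, "bridge": "xen-br0"}]}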
2897

    
2898
  def BuildHooksEnv(self):
2899
    """Build hooks env.
2900

2901
    This runs on master, primary and secondary nodes of the instance.
2902

2903
    """
2904
    env = {
2905
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2906
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2907
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2908
      "INSTANCE_ADD_MODE": self.op.mode,
2909
      }
2910
    if self.op.mode == constants.INSTANCE_IMPORT:
2911
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2912
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2913
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2914

    
2915
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2916
      primary_node=self.op.pnode,
2917
      secondary_nodes=self.secondaries,
2918
      status=self.instance_status,
2919
      os_type=self.op.os_type,
2920
      memory=self.op.mem_size,
2921
      vcpus=self.op.vcpus,
2922
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2923
    ))
2924

    
2925
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2926
          self.secondaries)
2927
    return env, nl, nl
2928

    
2929

    
2930
  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

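  # Illustrative only (not part of the original code): an opcode driving this
  # LU would carry roughly the fields validated above, e.g. something like
  #   opcodes.OpCreateInstance(instance_name="inst1.example.com", mem_size=512,
  #                            vcpus=1, disk_size=10240, swap_size=1024,
  #                            disk_template=constants.DT_DRBD8,
  #                            mode=constants.INSTANCE_CREATE, os_type="debian",
  #                            pnode="node1.example.com",
  #                            snode="node2.example.com", iallocator=None,
  #                            start=True, ip_check=True, ip="none",
  #                            mac="auto", wait_for_sync=True)
  # The opcode class name and the default values shown are assumptions; the
  # field names match the attributes read from self.op in this LU.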
  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

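    # console_cmd is whatever the hypervisor's GetShellCommandForConsole
    # returns (e.g. an "xm console <instance>" invocation under Xen); it is
    # wrapped in an ssh command below because the caller runs the result on
    # the master node, not on the instance's primary node.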
    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      # _RunAllocator sets self.op.remote_node itself and has no return
      # value, so its result must not be assigned back to the field
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
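      # e.g. an old data LV named "<uuid>.sda_data" would be renamed to
      # "<uuid>.sda_data_replaced-1210000000" (the suffix is the unix
      # timestamp above; the exact LV names come from _GenerateUniqueNames)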
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s" % new_lv.logical_id[1],
                    hint="manually cleanup unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

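  # _ExecD8Secondary below is the path taken when a new secondary node was
  # requested (either explicitly or via an iallocator); _ExecD8DiskOnly above
  # handles in-place replacement of the LVs on the current primary/secondary.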
  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount"]
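  # Illustrative only: a grow request for this LU carries the three required
  # fields above, e.g. roughly
  #   opcodes.OpGrowDisk(instance_name="inst1.example.com",
  #                      disk="sda", amount=2048)   # amount is in MiB
  # (the exact opcode class name is an assumption; the field names follow
  # _OP_REQP)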

    
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
      if not result or not isinstance(result, tuple) or len(result) != 2:
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    return


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return

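  # _ComputeDiskStatus below returns a recursive dict: each device reports its
  # own pstatus/sstatus plus a "children" list of the same shape, so a DRBD
  # disk also carries the state of its backing logical volumes.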
  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      htkind = self.sstore.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM31:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path

      if htkind in constants.HTS_REQ_PORT:
        idict["vnc_bind_address"] = instance.vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus",  self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_cdrom_image_path:
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
4220
  """Query the exports list
4221

4222
  """
4223
  _OP_REQP = []
4224

    
4225
  def CheckPrereq(self):
4226
    """Check that the nodelist contains only existing nodes.
4227

4228
    """
4229
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4230

    
4231
  def Exec(self, feedback_fn):
4232
    """Compute the list of all the exported system images.
4233

4234
    Returns:
4235
      a dictionary with the structure node->(export-list)
4236
      where export-list is a list of the instances exported on
4237
      that node.
4238

4239
    """
4240
    return rpc.call_export_list(self.nodes)
4241

    
4242

    
4243
class LUExportInstance(LogicalUnit):
4244
  """Export an instance to an image in the cluster.
4245

4246
  """
4247
  HPATH = "instance-export"
4248
  HTYPE = constants.HTYPE_INSTANCE
4249
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4250

    
4251
  def BuildHooksEnv(self):
4252
    """Build hooks env.
4253

4254
    This will run on the master, primary node and target node.
4255

4256
    """
4257
    env = {
4258
      "EXPORT_NODE": self.op.target_node,
4259
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4260
      }
4261
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4262
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4263
          self.op.target_node]
4264
    return env, nl, nl
4265

    
4266
  def CheckPrereq(self):
4267
    """Check prerequisites.
4268

4269
    This checks that the instance and node names are valid.
4270

4271
    """
4272
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4273
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4274
    if self.instance is None:
4275
      raise errors.OpPrereqError("Instance '%s' not found" %
4276
                                 self.op.instance_name)
4277

    
4278
    # node verification
4279
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4280
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4281

    
4282
    if self.dst_node is None:
4283
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
4284
                                 self.op.target_node)
4285
    self.op.target_node = self.dst_node.name
4286

    
4287
    # instance disk type verification
4288
    for disk in self.instance.disks:
4289
      if disk.dev_type == constants.LD_FILE:
4290
        raise errors.OpPrereqError("Export not supported for instances with"
4291
                                   " file-based disks")
4292

    
4293
  def Exec(self, feedback_fn):
4294
    """Export an instance to an image in the cluster.
4295

4296
    """
4297
    instance = self.instance
4298
    dst_node = self.dst_node
4299
    src_node = instance.primary_node
4300
    if self.op.shutdown:
4301
      # shutdown the instance, but not the disks
4302
      if not rpc.call_instance_shutdown(src_node, instance):
4303
         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4304
                                  (instance.name, src_node))
4305

    
4306
    vgname = self.cfg.GetVGName()
4307

    
4308
    snap_disks = []
4309

    
4310
    try:
4311
      for disk in instance.disks:
4312
        if disk.iv_name == "sda":
4313
          # new_dev_name names the LVM snapshot taken of the volume we passed
4314
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4315

    
4316
          if not new_dev_name:
4317
            logger.Error("could not snapshot block device %s on node %s" %
4318
                         (disk.logical_id[1], src_node))
4319
          else:
4320
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4321
                                   logical_id=(vgname, new_dev_name),
4322
                                   physical_id=(vgname, new_dev_name),
4323
                                   iv_name=disk.iv_name)
4324
            snap_disks.append(new_dev)
4325

    
4326
    finally:
4327
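      # if we shut the instance down above and it is marked as running,
      # restart it even if the snapshot step failed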
      if self.op.shutdown and instance.status == "up":
4328
        if not rpc.call_instance_start(src_node, instance, None):
4329
          _ShutdownInstanceDisks(instance, self.cfg)
4330
          raise errors.OpExecError("Could not start instance")
4331

    
4332
    # TODO: check for size
4333

    
4334
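    # copy each snapshot to the destination node, then drop the local copy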
    for dev in snap_disks:
4335
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4336
        logger.Error("could not export block device %s from node %s to node %s"
4337
                     % (dev.logical_id[1], src_node, dst_node.name))
4338
      if not rpc.call_blockdev_remove(src_node, dev):
4339
        logger.Error("could not remove snapshot block device %s from node %s" %
4340
                     (dev.logical_id[1], src_node))
4341

    
4342
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4343
      logger.Error("could not finalize export for instance %s on node %s" %
4344
                   (instance.name, dst_node.name))
4345

    
4346
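    # remove older exports of this instance from all other nodes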
    nodelist = self.cfg.GetNodeList()
4347
    nodelist.remove(dst_node.name)
4348

    
4349
    # on one-node clusters nodelist will be empty after the removal
4350
    # if we proceed the backup would be removed because OpQueryExports
4351
    # substitutes an empty list with the full cluster node list.
4352
    if nodelist:
4353
      op = opcodes.OpQueryExports(nodes=nodelist)
4354
      exportlist = self.proc.ChainOpCode(op)
4355
      for node in exportlist:
4356
        if instance.name in exportlist[node]:
4357
          if not rpc.call_export_remove(node, instance.name):
4358
            logger.Error("could not remove older export for instance %s"
4359
                         " on node %s" % (instance.name, node))
4360

    
4361

    
4362
class LURemoveExport(NoHooksLU):
4363
  """Remove exports related to the named instance.
4364

4365
  """
4366
  _OP_REQP = ["instance_name"]
4367

    
4368
  def CheckPrereq(self):
4369
    """Check prerequisites.
4370
    """
4371
    pass
4372

    
4373
  def Exec(self, feedback_fn):
4374
    """Remove any export.
4375

4376
    """
4377
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4378
    # If the instance was not found we'll try with the name that was passed in.
4379
    # This will only work if it was an FQDN, though.
4380
    fqdn_warn = False
4381
    if not instance_name:
4382
      fqdn_warn = True
4383
      instance_name = self.op.instance_name
4384

    
4385
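    # an empty node list makes OpQueryExports query all cluster nodes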
    op = opcodes.OpQueryExports(nodes=[])
4386
    exportlist = self.proc.ChainOpCode(op)
4387
    found = False
4388
    for node in exportlist:
4389
      if instance_name in exportlist[node]:
4390
        found = True
4391
        if not rpc.call_export_remove(node, instance_name):
4392
          logger.Error("could not remove export for instance %s"
4393
                       " on node %s" % (instance_name, node))
4394

    
4395
    if fqdn_warn and not found:
4396
      feedback_fn("Export not found. If trying to remove an export belonging"
4397
                  " to a deleted instance please use its Fully Qualified"
4398
                  " Domain Name.")
4399

    
4400

    
4401
class TagsLU(NoHooksLU):
4402
  """Generic tags LU.
4403

4404
  This is an abstract class which is the parent of all the other tags LUs.
4405

4406
  """
4407
  def CheckPrereq(self):
4408
    """Check prerequisites.
4409

4410
    """
4411
    if self.op.kind == constants.TAG_CLUSTER:
4412
      self.target = self.cfg.GetClusterInfo()
4413
    elif self.op.kind == constants.TAG_NODE:
4414
      name = self.cfg.ExpandNodeName(self.op.name)
4415
      if name is None:
4416
        raise errors.OpPrereqError("Invalid node name (%s)" %
4417
                                   (self.op.name,))
4418
      self.op.name = name
4419
      self.target = self.cfg.GetNodeInfo(name)
4420
    elif self.op.kind == constants.TAG_INSTANCE:
4421
      name = self.cfg.ExpandInstanceName(self.op.name)
4422
      if name is None:
4423
        raise errors.OpPrereqError("Invalid instance name (%s)" %
4424
                                   (self.op.name,))
4425
      self.op.name = name
4426
      self.target = self.cfg.GetInstanceInfo(name)
4427
    else:
4428
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4429
                                 str(self.op.kind))
4430

    
4431

    
4432
class LUGetTags(TagsLU):
4433
  """Returns the tags of a given object.
4434

4435
  """
4436
  _OP_REQP = ["kind", "name"]
4437

    
4438
  def Exec(self, feedback_fn):
4439
    """Returns the tag list.
4440

4441
    """
4442
    return self.target.GetTags()
4443

    
4444

    
4445
class LUSearchTags(NoHooksLU):
4446
  """Searches the tags for a given pattern.
4447

4448
  """
4449
  _OP_REQP = ["pattern"]
4450

    
4451
  def CheckPrereq(self):
4452
    """Check prerequisites.
4453

4454
    This checks the pattern passed for validity by compiling it.
4455

4456
    """
4457
    try:
4458
      self.re = re.compile(self.op.pattern)
4459
    except re.error, err:
4460
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4461
                                 (self.op.pattern, err))
4462

    
4463
  def Exec(self, feedback_fn):
4464
    """Returns the tag list.
4465

4466
    """
4467
    cfg = self.cfg
4468
    tgts = [("/cluster", cfg.GetClusterInfo())]
4469
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4470
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4471
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4472
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4473
    results = []
4474
    for path, target in tgts:
4475
      for tag in target.GetTags():
4476
        if self.re.search(tag):
4477
          results.append((path, tag))
4478
    return results
4479

    
4480

    
4481
class LUAddTags(TagsLU):
4482
  """Sets a tag on a given object.
4483

4484
  """
4485
  _OP_REQP = ["kind", "name", "tags"]
4486

    
4487
  def CheckPrereq(self):
4488
    """Check prerequisites.
4489

4490
    This checks the type and length of the tag name and value.
4491

4492
    """
4493
    TagsLU.CheckPrereq(self)
4494
    for tag in self.op.tags:
4495
      objects.TaggableObject.ValidateTag(tag)
4496

    
4497
  def Exec(self, feedback_fn):
4498
    """Sets the tag.
4499

4500
    """
4501
    try:
4502
      for tag in self.op.tags:
4503
        self.target.AddTag(tag)
4504
    except errors.TagError, err:
4505
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
4506
    try:
4507
      self.cfg.Update(self.target)
4508
    except errors.ConfigurationError:
4509
      raise errors.OpRetryError("There has been a modification to the"
4510
                                " config file and the operation has been"
4511
                                " aborted. Please retry.")
4512

    
4513

    
4514
class LUDelTags(TagsLU):
4515
  """Delete a list of tags from a given object.
4516

4517
  """
4518
  _OP_REQP = ["kind", "name", "tags"]
4519

    
4520
  def CheckPrereq(self):
4521
    """Check prerequisites.
4522

4523
    This checks that we have the given tag.
4524

4525
    """
4526
    TagsLU.CheckPrereq(self)
4527
    for tag in self.op.tags:
4528
      objects.TaggableObject.ValidateTag(tag)
4529
    del_tags = frozenset(self.op.tags)
4530
    cur_tags = self.target.GetTags()
4531
    if not del_tags <= cur_tags:
4532
      diff_tags = del_tags - cur_tags
4533
      diff_names = ["'%s'" % tag for tag in diff_tags]
4534
      diff_names.sort()
4535
      raise errors.OpPrereqError("Tag(s) %s not found" %
4536
                                 (",".join(diff_names)))
4537

    
4538
  def Exec(self, feedback_fn):
4539
    """Remove the tag from the object.
4540

4541
    """
4542
    for tag in self.op.tags:
4543
      self.target.RemoveTag(tag)
4544
    try:
4545
      self.cfg.Update(self.target)
4546
    except errors.ConfigurationError:
4547
      raise errors.OpRetryError("There has been a modification to the"
4548
                                " config file and the operation has been"
4549
                                " aborted. Please retry.")
4550

    
4551
class LUTestDelay(NoHooksLU):
4552
  """Sleep for a specified amount of time.
4553

4554
  This LU sleeps on the master and/or nodes for a specified amount of
4555
  time.
4556

4557
  """
4558
  _OP_REQP = ["duration", "on_master", "on_nodes"]
4559

    
4560
  def CheckPrereq(self):
4561
    """Check prerequisites.
4562

4563
    This checks that we have a good list of nodes and/or the duration
4564
    is valid.
4565

4566
    """
4567

    
4568
    if self.op.on_nodes:
4569
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4570

    
4571
  def Exec(self, feedback_fn):
4572
    """Do the actual sleep.
4573

4574
    """
4575
    if self.op.on_master:
4576
      if not utils.TestDelay(self.op.duration):
4577
        raise errors.OpExecError("Error during master delay test")
4578
    if self.op.on_nodes:
4579
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4580
      if not result:
4581
        raise errors.OpExecError("Complete failure from rpc call")
4582
      for node, node_result in result.items():
4583
        if not node_result:
4584
          raise errors.OpExecError("Failure during rpc call to node %s,"
4585
                                   " result: %s" % (node, node_result))
4586

    
4587

    
4588
class IAllocator(object):
4589
  """IAllocator framework.
4590

4591
  An IAllocator instance has four sets of attributes:
4592
    - cfg/sstore that are needed to query the cluster
4593
    - input data (all members of _ALLO_KEYS or _RELO_KEYS are required)
4594
    - four buffer attributes (in|out_data|text), which represent the
4595
      input (to the external script) in text and data structure format,
4596
      and the output from it, again in two formats
4597
    - the result variables from the script (success, info, nodes) for
4598
      easy usage
4599

4600
  """
4601
  _ALLO_KEYS = [
4602
    "mem_size", "disks", "disk_template",
4603
    "os", "tags", "nics", "vcpus",
4604
    ]
4605
  _RELO_KEYS = [
4606
    "relocate_from",
4607
    ]
4608

    
4609
  def __init__(self, cfg, sstore, mode, name, **kwargs):
4610
    self.cfg = cfg
4611
    self.sstore = sstore
4612
    # init buffer variables
4613
    self.in_text = self.out_text = self.in_data = self.out_data = None
4614
    # init all input fields so that pylint is happy
4615
    self.mode = mode
4616
    self.name = name
4617
    self.mem_size = self.disks = self.disk_template = None
4618
    self.os = self.tags = self.nics = self.vcpus = None
4619
    self.relocate_from = None
4620
    # computed fields
4621
    self.required_nodes = None
4622
    # init result fields
4623
    self.success = self.info = self.nodes = None
4624
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4625
      keyset = self._ALLO_KEYS
4626
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4627
      keyset = self._RELO_KEYS
4628
    else:
4629
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4630
                                   " IAllocator" % self.mode)
4631
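    # accept only, and require all of, the keys valid for the selected mode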
    for key in kwargs:
4632
      if key not in keyset:
4633
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
4634
                                     " IAllocator" % key)
4635
      setattr(self, key, kwargs[key])
4636
    for key in keyset:
4637
      if key not in kwargs:
4638
        raise errors.ProgrammerError("Missing input parameter '%s' to"
4639
                                     " IAllocator" % key)
4640
    self._BuildInputData()
4641

    
4642
  def _ComputeClusterData(self):
4643
    """Compute the generic allocator input data.
4644

4645
    This is the data that is independent of the actual operation.
4646

4647
    """
4648
    cfg = self.cfg
4649
    # cluster data
4650
    data = {
4651
      "version": 1,
4652
      "cluster_name": self.sstore.GetClusterName(),
4653
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4654
      "hypervisor_type": self.sstore.GetHypervisorType(),
4655
      # we don't have job IDs
4656
      }
4657

    
4658
    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4659

    
4660
    # node data
4661
    node_results = {}
4662
    node_list = cfg.GetNodeList()
4663
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4664
    for nname in node_list:
4665
      ninfo = cfg.GetNodeInfo(nname)
4666
      if nname not in node_data or not isinstance(node_data[nname], dict):
4667
        raise errors.OpExecError("Can't get data for node %s" % nname)
4668
      remote_info = node_data[nname]
4669
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
4670
                   'vg_size', 'vg_free', 'cpu_total']:
4671
        if attr not in remote_info:
4672
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4673
                                   (nname, attr))
4674
        try:
4675
          remote_info[attr] = int(remote_info[attr])
4676
        except ValueError, err:
4677
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4678
                                   " %s" % (nname, attr, str(err)))
4679
      # compute memory used by primary instances
4680
      i_p_mem = i_p_up_mem = 0
4681
      for iinfo in i_list:
4682
        if iinfo.primary_node == nname:
4683
          i_p_mem += iinfo.memory
4684
          if iinfo.status == "up":
4685
            i_p_up_mem += iinfo.memory
4686

    
4687
      # compute memory used by instances
4688
      pnr = {
4689
        "tags": list(ninfo.GetTags()),
4690
        "total_memory": remote_info['memory_total'],
4691
        "reserved_memory": remote_info['memory_dom0'],
4692
        "free_memory": remote_info['memory_free'],
4693
        "i_pri_memory": i_p_mem,
4694
        "i_pri_up_memory": i_p_up_mem,
4695
        "total_disk": remote_info['vg_size'],
4696
        "free_disk": remote_info['vg_free'],
4697
        "primary_ip": ninfo.primary_ip,
4698
        "secondary_ip": ninfo.secondary_ip,
4699
        "total_cpus": remote_info['cpu_total'],
4700
        }
4701
      node_results[nname] = pnr
4702
    data["nodes"] = node_results
4703

    
4704
    # instance data
4705
    instance_data = {}
4706
    for iinfo in i_list:
4707
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4708
                  for n in iinfo.nics]
4709
      pir = {
4710
        "tags": list(iinfo.GetTags()),
4711
        "should_run": iinfo.status == "up",
4712
        "vcpus": iinfo.vcpus,
4713
        "memory": iinfo.memory,
4714
        "os": iinfo.os,
4715
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4716
        "nics": nic_data,
4717
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4718
        "disk_template": iinfo.disk_template,
4719
        }
4720
      instance_data[iinfo.name] = pir
4721

    
4722
    data["instances"] = instance_data
4723

    
4724
    self.in_data = data
4725

    
4726
  def _AddNewInstance(self):
4727
    """Add new instance data to allocator structure.
4728

4729
    This in combination with _ComputeClusterData will create the
4730
    correct structure needed as input for the allocator.
4731

4732
    The checks for the completeness of the opcode must have already been
4733
    done.
4734

4735
    """
4736
    data = self.in_data
4737
    if len(self.disks) != 2:
4738
      raise errors.OpExecError("Only two-disk configurations supported")
4739

    
4740
    disk_space = _ComputeDiskSize(self.disk_template,
4741
                                  self.disks[0]["size"], self.disks[1]["size"])
4742

    
4743
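    # network-mirrored disk templates need two nodes, all others just one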
    if self.disk_template in constants.DTS_NET_MIRROR:
4744
      self.required_nodes = 2
4745
    else:
4746
      self.required_nodes = 1
4747
    request = {
4748
      "type": "allocate",
4749
      "name": self.name,
4750
      "disk_template": self.disk_template,
4751
      "tags": self.tags,
4752
      "os": self.os,
4753
      "vcpus": self.vcpus,
4754
      "memory": self.mem_size,
4755
      "disks": self.disks,
4756
      "disk_space_total": disk_space,
4757
      "nics": self.nics,
4758
      "required_nodes": self.required_nodes,
4759
      }
4760
    data["request"] = request
4761

    
4762
  def _AddRelocateInstance(self):
4763
    """Add relocate instance data to allocator structure.
4764

4765
    This in combination with _ComputeClusterData will create the
4766
    correct structure needed as input for the allocator.
4767

4768
    The checks for the completeness of the opcode must have already been
4769
    done.
4770

4771
    """
4772
    instance = self.cfg.GetInstanceInfo(self.name)
4773
    if instance is None:
4774
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
4775
                                   " IAllocator" % self.name)
4776

    
4777
    if instance.disk_template not in constants.DTS_NET_MIRROR:
4778
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4779

    
4780
    if len(instance.secondary_nodes) != 1:
4781
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
4782

    
4783
    self.required_nodes = 1
4784

    
4785
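    # like the allocate path, this assumes the standard two-disk layout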
    disk_space = _ComputeDiskSize(instance.disk_template,
4786
                                  instance.disks[0].size,
4787
                                  instance.disks[1].size)
4788

    
4789
    request = {
4790
      "type": "relocate",
4791
      "name": self.name,
4792
      "disk_space_total": disk_space,
4793
      "required_nodes": self.required_nodes,
4794
      "relocate_from": self.relocate_from,
4795
      }
4796
    self.in_data["request"] = request
4797

    
4798
  def _BuildInputData(self):
4799
    """Build input data structures.
4800

4801
    """
4802
    self._ComputeClusterData()
4803

    
4804
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4805
      self._AddNewInstance()
4806
    else:
4807
      self._AddRelocateInstance()
4808

    
4809
    self.in_text = serializer.Dump(self.in_data)
4810

    
4811
  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4812
    """Run an instance allocator and return the results.
4813

4814
    """
4815
    data = self.in_text
4816

    
4817
    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
4818

    
4819
    if not isinstance(result, tuple) or len(result) != 4:
4820
      raise errors.OpExecError("Invalid result from master iallocator runner")
4821

    
4822
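    # the runner returns (status code, stdout, stderr, failure reason)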
    rcode, stdout, stderr, fail = result
4823

    
4824
    if rcode == constants.IARUN_NOTFOUND:
4825
      raise errors.OpExecError("Can't find allocator '%s'" % name)
4826
    elif rcode == constants.IARUN_FAILURE:
4827
        raise errors.OpExecError("Instance allocator call failed: %s,"
4828
                                 " output: %s" %
4829
                                 (fail, stdout+stderr))
4830
    self.out_text = stdout
4831
    if validate:
4832
      self._ValidateResult()
4833

    
4834
  def _ValidateResult(self):
4835
    """Process the allocator results.
4836

4837
    This will process and if successful save the result in
4838
    self.out_data and the other parameters.
4839

4840
    """
4841
    try:
4842
      rdict = serializer.Load(self.out_text)
4843
    except Exception, err:
4844
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
4845

    
4846
    if not isinstance(rdict, dict):
4847
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
4848

    
4849
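    # copy the mandatory result keys to instance attributes for easy access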
    for key in "success", "info", "nodes":
4850
      if key not in rdict:
4851
        raise errors.OpExecError("Can't parse iallocator results:"
4852
                                 " missing key '%s'" % key)
4853
      setattr(self, key, rdict[key])
4854

    
4855
    if not isinstance(rdict["nodes"], list):
4856
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
4857
                               " is not a list")
4858
    self.out_data = rdict
4859

    
4860

    
4861
class LUTestAllocator(NoHooksLU):
4862
  """Run allocator tests.
4863

4864
  This LU runs the allocator tests
4865

4866
  """
4867
  _OP_REQP = ["direction", "mode", "name"]
4868

    
4869
  def CheckPrereq(self):
4870
    """Check prerequisites.
4871

4872
    This checks the opcode parameters depending on the test direction and mode.
4873

4874
    """
4875
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4876
      for attr in ["name", "mem_size", "disks", "disk_template",
4877
                   "os", "tags", "nics", "vcpus"]:
4878
        if not hasattr(self.op, attr):
4879
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4880
                                     attr)
4881
      iname = self.cfg.ExpandInstanceName(self.op.name)
4882
      if iname is not None:
4883
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4884
                                   iname)
4885
      if not isinstance(self.op.nics, list):
4886
        raise errors.OpPrereqError("Invalid parameter 'nics'")
4887
      for row in self.op.nics:
4888
        if (not isinstance(row, dict) or
4889
            "mac" not in row or
4890
            "ip" not in row or
4891
            "bridge" not in row):
4892
          raise errors.OpPrereqError("Invalid contents of the"
4893
                                     " 'nics' parameter")
4894
      if not isinstance(self.op.disks, list):
4895
        raise errors.OpPrereqError("Invalid parameter 'disks'")
4896
      if len(self.op.disks) != 2:
4897
        raise errors.OpPrereqError("Only two-disk configurations supported")
4898
      for row in self.op.disks:
4899
        if (not isinstance(row, dict) or
4900
            "size" not in row or
4901
            not isinstance(row["size"], int) or
4902
            "mode" not in row or
4903
            row["mode"] not in ['r', 'w']):
4904
          raise errors.OpPrereqError("Invalid contents of the"
4905
                                     " 'disks' parameter")
4906
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
4907
      if not hasattr(self.op, "name"):
4908
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
4909
      fname = self.cfg.ExpandInstanceName(self.op.name)
4910
      if fname is None:
4911
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
4912
                                   self.op.name)
4913
      self.op.name = fname
4914
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
4915
    else:
4916
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
4917
                                 self.op.mode)
4918

    
4919
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
4920
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
4921
        raise errors.OpPrereqError("Missing allocator name")
4922
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
4923
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
4924
                                 self.op.direction)
4925

    
4926
  def Exec(self, feedback_fn):
4927
    """Run the allocator test.
4928

4929
    """
4930
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4931
      ial = IAllocator(self.cfg, self.sstore,
4932
                       mode=self.op.mode,
4933
                       name=self.op.name,
4934
                       mem_size=self.op.mem_size,
4935
                       disks=self.op.disks,
4936
                       disk_template=self.op.disk_template,
4937
                       os=self.op.os,
4938
                       tags=self.op.tags,
4939
                       nics=self.op.nics,
4940
                       vcpus=self.op.vcpus,
4941
                       )
4942
    else:
4943
      ial = IAllocator(self.cfg, self.sstore,
4944
                       mode=self.op.mode,
4945
                       name=self.op.name,
4946
                       relocate_from=list(self.relocate_from),
4947
                       )
4948

    
4949
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
4950
      result = ial.in_text
4951
    else:
4952
      ial.Run(self.op.allocator, validate=False)
4953
      result = ial.out_text
4954
    return result