
lib/cmdlib.py @ c7cdfc90


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement CheckPrereq which also fills in the opcode instance
53
      with all the fields (even if as None)
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_MASTER: the LU needs to run on the master node
59
        REQ_WSSTORE: the LU needs a writable SimpleStore
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_WSSTORE = False
69

    
70
  def __init__(self, processor, op, cfg, sstore):
71
    """Constructor for LogicalUnit.
72

73
    This needs to be overridden in derived classes in order to check op
74
    validity.
75

76
    """
77
    self.proc = processor
78
    self.op = op
79
    self.cfg = cfg
80
    self.sstore = sstore
81
    self.__ssh = None
82

    
83
    for attr_name in self._OP_REQP:
84
      attr_val = getattr(op, attr_name, None)
85
      if attr_val is None:
86
        raise errors.OpPrereqError("Required parameter '%s' missing" %
87
                                   attr_name)
88

    
89
    if not cfg.IsCluster():
90
      raise errors.OpPrereqError("Cluster not initialized yet,"
91
                                 " use 'gnt-cluster init' first.")
92
    if self.REQ_MASTER:
93
      master = sstore.GetMasterNode()
94
      if master != utils.HostInfo().name:
95
        raise errors.OpPrereqError("Commands must be run on the master"
96
                                   " node %s" % master)
97

    
98
  def __GetSSH(self):
99
    """Returns the SshRunner object
100

101
    """
102
    if not self.__ssh:
103
      self.__ssh = ssh.SshRunner(self.sstore)
104
    return self.__ssh
105

    
106
  ssh = property(fget=__GetSSH)
107

    
108
  def CheckPrereq(self):
109
    """Check prerequisites for this LU.
110

111
    This method should check that the prerequisites for the execution
112
    of this LU are fulfilled. It can do internode communication, but
113
    it should be idempotent - no cluster or system changes are
114
    allowed.
115

116
    The method should raise errors.OpPrereqError in case something is
117
    not fulfilled. Its return value is ignored.
118

119
    This method should also update all the parameters of the opcode to
120
    their canonical form; e.g. a short node name must be fully
121
    expanded after this method has successfully completed (so that
122
    hooks, logging, etc. work correctly).
123

124
    """
125
    raise NotImplementedError
126

    
127
  def Exec(self, feedback_fn):
128
    """Execute the LU.
129

130
    This method should implement the actual work. It should raise
131
    errors.OpExecError for failures that are somewhat dealt with in
132
    code, or expected.
133

134
    """
135
    raise NotImplementedError
136

    
137
  def BuildHooksEnv(self):
138
    """Build hooks environment for this LU.
139

140
    This method should return a three-element tuple consisting of: a dict
141
    containing the environment that will be used for running the
142
    specific hook for this LU, a list of node names on which the hook
143
    should run before the execution, and a list of node names on which
144
    the hook should run after the execution.
145

146
    The keys of the dict must not be prefixed with 'GANETI_', as this will
147
    be handled in the hooks runner. Also note additional keys will be
148
    added by the hooks runner. If the LU doesn't define any
149
    environment, an empty dict (and not None) should be returned.
150

151
    Empty node lists should be returned as empty lists (and not None).
152

153
    Note that if the HPATH for a LU class is None, this function will
154
    not be called.
155

156
    """
157
    raise NotImplementedError
158

    
159
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
160
    """Notify the LU about the results of its hooks.
161

162
    This method is called every time a hooks phase is executed, and notifies
163
    the Logical Unit about the hooks' result. The LU can then use it to alter
164
    its result based on the hooks.  By default the method does nothing and the
165
    previous result is passed back unchanged, but any LU can override it if it
166
    wants to use the local cluster hook-scripts somehow.
167

168
    Args:
169
      phase: the hooks phase that has just been run
170
      hooks_results: the results of the multi-node hooks rpc call
171
      feedback_fn: function to send feedback back to the caller
172
      lu_result: the previous result this LU had, or None in the PRE phase.
173

174
    """
175
    return lu_result
176
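# Illustrative sketch only (not part of the original module, kept as a
# comment): a minimal LogicalUnit subclass following the rules listed in the
# LogicalUnit docstring above. The class name and the 'node_name' opcode
# field are hypothetical examples chosen for the illustration.
#
# class LUExampleNoop(LogicalUnit):
#   HPATH = "example-noop"
#   HTYPE = constants.HTYPE_CLUSTER
#   _OP_REQP = ["node_name"]
#
#   def CheckPrereq(self):
#     # canonicalize parameters, e.g. expand a short node name
#     node = self.cfg.ExpandNodeName(self.op.node_name)
#     if node is None:
#       raise errors.OpPrereqError("No such node name '%s'" % self.op.node_name)
#     self.op.node_name = node
#
#   def BuildHooksEnv(self):
#     env = {"OP_TARGET": self.op.node_name}
#     return env, [], [self.op.node_name]
#
#   def Exec(self, feedback_fn):
#     feedback_fn("No-op executed for %s" % self.op.node_name)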

    
177

    
178
class NoHooksLU(LogicalUnit):
179
  """Simple LU which runs no hooks.
180

181
  This LU is intended as a parent for other LogicalUnits which will
182
  run no hooks, in order to reduce duplicate code.
183

184
  """
185
  HPATH = None
186
  HTYPE = None
187

    
188

    
189
def _GetWantedNodes(lu, nodes):
190
  """Returns list of checked and expanded node names.
191

192
  Args:
193
    nodes: List of node names (strings), or an empty list for all nodes
194

195
  """
196
  if not isinstance(nodes, list):
197
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
198

    
199
  if nodes:
200
    wanted = []
201

    
202
    for name in nodes:
203
      node = lu.cfg.ExpandNodeName(name)
204
      if node is None:
205
        raise errors.OpPrereqError("No such node name '%s'" % name)
206
      wanted.append(node)
207

    
208
  else:
209
    wanted = lu.cfg.GetNodeList()
210
  return utils.NiceSort(wanted)
211

    
212

    
213
def _GetWantedInstances(lu, instances):
214
  """Returns list of checked and expanded instance names.
215

216
  Args:
217
    instances: List of instance names (strings), or an empty list for all instances
218

219
  """
220
  if not isinstance(instances, list):
221
    raise errors.OpPrereqError("Invalid argument type 'instances'")
222

    
223
  if instances:
224
    wanted = []
225

    
226
    for name in instances:
227
      instance = lu.cfg.ExpandInstanceName(name)
228
      if instance is None:
229
        raise errors.OpPrereqError("No such instance name '%s'" % name)
230
      wanted.append(instance)
231

    
232
  else:
233
    wanted = lu.cfg.GetInstanceList()
234
  return utils.NiceSort(wanted)
235
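# Usage sketch (illustrative comment, mirroring LUQueryNodeVolumes below):
#
#   self.nodes = _GetWantedNodes(self, self.op.nodes)
#
# An empty list selects every node; unknown names raise OpPrereqError.
# _GetWantedInstances behaves the same way for instance names.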

    
236

    
237
def _CheckOutputFields(static, dynamic, selected):
238
  """Checks whether all selected fields are valid.
239

240
  Args:
241
    static: Static fields
242
    dynamic: Dynamic fields
    selected: Output fields selected by the caller
243

244
  """
245
  static_fields = frozenset(static)
246
  dynamic_fields = frozenset(dynamic)
247

    
248
  all_fields = static_fields | dynamic_fields
249

    
250
  if not all_fields.issuperset(selected):
251
    raise errors.OpPrereqError("Unknown output fields selected: %s"
252
                               % ",".join(frozenset(selected).
253
                                          difference(all_fields)))
254
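# Usage sketch (illustrative comment): validating a query opcode's fields,
# as the query LUs below do in CheckPrereq. The field names are examples.
#
#   _CheckOutputFields(static=["name", "pip"],
#                      dynamic=["mfree", "dtotal"],
#                      selected=self.op.output_fields)
#
# Any selected field outside static | dynamic raises OpPrereqError.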

    
255

    
256
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
257
                          memory, vcpus, nics):
258
  """Builds instance related env variables for hooks from single variables.
259

260
  Args:
261
    secondary_nodes: List of secondary nodes as strings
262
  """
263
  env = {
264
    "OP_TARGET": name,
265
    "INSTANCE_NAME": name,
266
    "INSTANCE_PRIMARY": primary_node,
267
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
268
    "INSTANCE_OS_TYPE": os_type,
269
    "INSTANCE_STATUS": status,
270
    "INSTANCE_MEMORY": memory,
271
    "INSTANCE_VCPUS": vcpus,
272
  }
273

    
274
  if nics:
275
    nic_count = len(nics)
276
    for idx, (ip, bridge, mac) in enumerate(nics):
277
      if ip is None:
278
        ip = ""
279
      env["INSTANCE_NIC%d_IP" % idx] = ip
280
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
281
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
282
  else:
283
    nic_count = 0
284

    
285
  env["INSTANCE_NIC_COUNT"] = nic_count
286

    
287
  return env
288

    
289

    
290
def _BuildInstanceHookEnvByObject(instance, override=None):
291
  """Builds instance related env variables for hooks from an object.
292

293
  Args:
294
    instance: objects.Instance object of instance
295
    override: dict of values to override
296
  """
297
  args = {
298
    'name': instance.name,
299
    'primary_node': instance.primary_node,
300
    'secondary_nodes': instance.secondary_nodes,
301
    'os_type': instance.os,
302
    'status': instance.status,
303
    'memory': instance.memory,
304
    'vcpus': instance.vcpus,
305
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
306
  }
307
  if override:
308
    args.update(override)
309
  return _BuildInstanceHookEnv(**args)
310
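# Worked example (illustrative comment): for an instance named "inst1" with
# one NIC and no NIC IP, _BuildInstanceHookEnvByObject produces a dict along
# the lines of:
#
#   {"OP_TARGET": "inst1", "INSTANCE_NAME": "inst1",
#    "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC_COUNT": 1, "INSTANCE_NIC0_IP": "",
#    "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01"}
#
# The node, OS, bridge and MAC values here are made-up sample values.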

    
311

    
312
def _CheckInstanceBridgesExist(instance):
313
  """Check that the brigdes needed by an instance exist.
314

315
  """
316
  # check bridges existence
317
  brlist = [nic.bridge for nic in instance.nics]
318
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
319
    raise errors.OpPrereqError("one or more target bridges %s does not"
320
                               " exist on destination node '%s'" %
321
                               (brlist, instance.primary_node))
322

    
323

    
324
class LUDestroyCluster(NoHooksLU):
325
  """Logical unit for destroying the cluster.
326

327
  """
328
  _OP_REQP = []
329

    
330
  def CheckPrereq(self):
331
    """Check prerequisites.
332

333
    This checks whether the cluster is empty.
334

335
    Any errors are signalled by raising errors.OpPrereqError.
336

337
    """
338
    master = self.sstore.GetMasterNode()
339

    
340
    nodelist = self.cfg.GetNodeList()
341
    if len(nodelist) != 1 or nodelist[0] != master:
342
      raise errors.OpPrereqError("There are still %d node(s) in"
343
                                 " this cluster." % (len(nodelist) - 1))
344
    instancelist = self.cfg.GetInstanceList()
345
    if instancelist:
346
      raise errors.OpPrereqError("There are still %d instance(s) in"
347
                                 " this cluster." % len(instancelist))
348

    
349
  def Exec(self, feedback_fn):
350
    """Destroys the cluster.
351

352
    """
353
    master = self.sstore.GetMasterNode()
354
    if not rpc.call_node_stop_master(master):
355
      raise errors.OpExecError("Could not disable the master role")
356
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
357
    utils.CreateBackup(priv_key)
358
    utils.CreateBackup(pub_key)
359
    rpc.call_node_leave_cluster(master)
360

    
361

    
362
class LUVerifyCluster(LogicalUnit):
363
  """Verifies the cluster status.
364

365
  """
366
  HPATH = "cluster-verify"
367
  HTYPE = constants.HTYPE_CLUSTER
368
  _OP_REQP = ["skip_checks"]
369

    
370
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
371
                  remote_version, feedback_fn):
372
    """Run multiple tests against a node.
373

374
    Test list:
375
      - compares ganeti version
376
      - checks vg existence and size > 20G
377
      - checks config file checksum
378
      - checks ssh to other nodes
379

380
    Args:
381
      node: name of the node to check
382
      file_list: required list of files
383
      local_cksum: dictionary of local files and their checksums
384

385
    """
386
    # compares ganeti version
387
    local_version = constants.PROTOCOL_VERSION
388
    if not remote_version:
389
      feedback_fn("  - ERROR: connection to %s failed" % (node))
390
      return True
391

    
392
    if local_version != remote_version:
393
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
394
                      (local_version, node, remote_version))
395
      return True
396

    
397
    # checks vg existence and size > 20G
398

    
399
    bad = False
400
    if not vglist:
401
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
402
                      (node,))
403
      bad = True
404
    else:
405
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
406
                                            constants.MIN_VG_SIZE)
407
      if vgstatus:
408
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
409
        bad = True
410

    
411
    # checks config file checksum
412
    # checks ssh to other nodes
413

    
414
    if 'filelist' not in node_result:
415
      bad = True
416
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
417
    else:
418
      remote_cksum = node_result['filelist']
419
      for file_name in file_list:
420
        if file_name not in remote_cksum:
421
          bad = True
422
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
423
        elif remote_cksum[file_name] != local_cksum[file_name]:
424
          bad = True
425
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
426

    
427
    if 'nodelist' not in node_result:
428
      bad = True
429
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
430
    else:
431
      if node_result['nodelist']:
432
        bad = True
433
        for node in node_result['nodelist']:
434
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
435
                          (node, node_result['nodelist'][node]))
436
    if 'node-net-test' not in node_result:
437
      bad = True
438
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
439
    else:
440
      if node_result['node-net-test']:
441
        bad = True
442
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
443
        for node in nlist:
444
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
445
                          (node, node_result['node-net-test'][node]))
446

    
447
    hyp_result = node_result.get('hypervisor', None)
448
    if hyp_result is not None:
449
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
450
    return bad
451

    
452
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
453
                      node_instance, feedback_fn):
454
    """Verify an instance.
455

456
    This function checks to see if the required block devices are
457
    available on the instance's node.
458

459
    """
460
    bad = False
461

    
462
    node_current = instanceconfig.primary_node
463

    
464
    node_vol_should = {}
465
    instanceconfig.MapLVsByNode(node_vol_should)
466

    
467
    for node in node_vol_should:
468
      for volume in node_vol_should[node]:
469
        if node not in node_vol_is or volume not in node_vol_is[node]:
470
          feedback_fn("  - ERROR: volume %s missing on node %s" %
471
                          (volume, node))
472
          bad = True
473

    
474
    if instanceconfig.status != 'down':
475
      if (node_current not in node_instance or
476
          not instance in node_instance[node_current]):
477
        feedback_fn("  - ERROR: instance %s not running on node %s" %
478
                        (instance, node_current))
479
        bad = True
480

    
481
    for node in node_instance:
482
      if node != node_current:
483
        if instance in node_instance[node]:
484
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
485
                          (instance, node))
486
          bad = True
487

    
488
    return bad
489

    
490
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
491
    """Verify if there are any unknown volumes in the cluster.
492

493
    The .os, .swap and backup volumes are ignored. All other volumes are
494
    reported as unknown.
495

496
    """
497
    bad = False
498

    
499
    for node in node_vol_is:
500
      for volume in node_vol_is[node]:
501
        if node not in node_vol_should or volume not in node_vol_should[node]:
502
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
503
                      (volume, node))
504
          bad = True
505
    return bad
506

    
507
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
508
    """Verify the list of running instances.
509

510
    This checks what instances are running but unknown to the cluster.
511

512
    """
513
    bad = False
514
    for node in node_instance:
515
      for runninginstance in node_instance[node]:
516
        if runninginstance not in instancelist:
517
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
518
                          (runninginstance, node))
519
          bad = True
520
    return bad
521

    
522
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
523
    """Verify N+1 Memory Resilience.
524

525
    Check that if one single node dies we can still start all the instances it
526
    was primary for.
527

528
    """
529
    bad = False
530

    
531
    for node, nodeinfo in node_info.iteritems():
532
      # This code checks that every node which is now listed as secondary has
533
      # enough memory to host all instances it is supposed to host, should a
534
      # other node in the cluster fail.
535
      # FIXME: not ready for failover to an arbitrary node
536
      # FIXME: does not support file-backed instances
537
      # WARNING: we currently take into account down instances as well as up
538
      # ones, considering that even if they're down someone might want to start
539
      # them even in the event of a node failure.
540
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
541
        needed_mem = 0
542
        for instance in instances:
543
          needed_mem += instance_cfg[instance].memory
544
        if nodeinfo['mfree'] < needed_mem:
545
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
546
                      " failovers should node %s fail" % (node, prinode))
547
          bad = True
548
    return bad
549

    
550
  def CheckPrereq(self):
551
    """Check prerequisites.
552

553
    Transform the list of checks we're going to skip into a set and check that
554
    all its members are valid.
555

556
    """
557
    self.skip_set = frozenset(self.op.skip_checks)
558
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
559
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
560

    
561
  def BuildHooksEnv(self):
562
    """Build hooks env.
563

564
    Cluster-Verify hooks are only run in the post phase; their failure causes
565
    their output to be logged in the verify output and the verification to fail.
566

567
    """
568
    all_nodes = self.cfg.GetNodeList()
569
    # TODO: populate the environment with useful information for verify hooks
570
    env = {}
571
    return env, [], all_nodes
572

    
573
  def Exec(self, feedback_fn):
574
    """Verify integrity of cluster, performing various test on nodes.
575

576
    """
577
    bad = False
578
    feedback_fn("* Verifying global settings")
579
    for msg in self.cfg.VerifyConfig():
580
      feedback_fn("  - ERROR: %s" % msg)
581

    
582
    vg_name = self.cfg.GetVGName()
583
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
584
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
585
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
586
    i_non_redundant = [] # Non redundant instances
587
    node_volume = {}
588
    node_instance = {}
589
    node_info = {}
590
    instance_cfg = {}
591

    
592
    # FIXME: verify OS list
593
    # do local checksums
594
    file_names = list(self.sstore.GetFileList())
595
    file_names.append(constants.SSL_CERT_FILE)
596
    file_names.append(constants.CLUSTER_CONF_FILE)
597
    local_checksums = utils.FingerprintFiles(file_names)
598

    
599
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
600
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
601
    all_instanceinfo = rpc.call_instance_list(nodelist)
602
    all_vglist = rpc.call_vg_list(nodelist)
603
    node_verify_param = {
604
      'filelist': file_names,
605
      'nodelist': nodelist,
606
      'hypervisor': None,
607
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
608
                        for node in nodeinfo]
609
      }
610
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
611
    all_rversion = rpc.call_version(nodelist)
612
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
613

    
614
    for node in nodelist:
615
      feedback_fn("* Verifying node %s" % node)
616
      result = self._VerifyNode(node, file_names, local_checksums,
617
                                all_vglist[node], all_nvinfo[node],
618
                                all_rversion[node], feedback_fn)
619
      bad = bad or result
620

    
621
      # node_volume
622
      volumeinfo = all_volumeinfo[node]
623

    
624
      if isinstance(volumeinfo, basestring):
625
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
626
                    (node, volumeinfo[-400:].encode('string_escape')))
627
        bad = True
628
        node_volume[node] = {}
629
      elif not isinstance(volumeinfo, dict):
630
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
631
        bad = True
632
        continue
633
      else:
634
        node_volume[node] = volumeinfo
635

    
636
      # node_instance
637
      nodeinstance = all_instanceinfo[node]
638
      if type(nodeinstance) != list:
639
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
640
        bad = True
641
        continue
642

    
643
      node_instance[node] = nodeinstance
644

    
645
      # node_info
646
      nodeinfo = all_ninfo[node]
647
      if not isinstance(nodeinfo, dict):
648
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
649
        bad = True
650
        continue
651

    
652
      try:
653
        node_info[node] = {
654
          "mfree": int(nodeinfo['memory_free']),
655
          "dfree": int(nodeinfo['vg_free']),
656
          "pinst": [],
657
          "sinst": [],
658
          # dictionary holding all instances this node is secondary for,
659
          # grouped by their primary node. Each key is a cluster node, and each
660
          # value is a list of instances which have the key as primary and the
661
          # current node as secondary.  this is handy to calculate N+1 memory
662
          # availability if you can only failover from a primary to its
663
          # secondary.
664
          "sinst-by-pnode": {},
665
        }
666
      except ValueError:
667
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
668
        bad = True
669
        continue
670

    
671
    node_vol_should = {}
672

    
673
    for instance in instancelist:
674
      feedback_fn("* Verifying instance %s" % instance)
675
      inst_config = self.cfg.GetInstanceInfo(instance)
676
      result =  self._VerifyInstance(instance, inst_config, node_volume,
677
                                     node_instance, feedback_fn)
678
      bad = bad or result
679

    
680
      inst_config.MapLVsByNode(node_vol_should)
681

    
682
      instance_cfg[instance] = inst_config
683

    
684
      pnode = inst_config.primary_node
685
      if pnode in node_info:
686
        node_info[pnode]['pinst'].append(instance)
687
      else:
688
        feedback_fn("  - ERROR: instance %s, connection to primary node"
689
                    " %s failed" % (instance, pnode))
690
        bad = True
691

    
692
      # If the instance is non-redundant we cannot survive losing its primary
693
      # node, so we are not N+1 compliant. On the other hand we have no disk
694
      # templates with more than one secondary so that situation is not well
695
      # supported either.
696
      # FIXME: does not support file-backed instances
697
      if len(inst_config.secondary_nodes) == 0:
698
        i_non_redundant.append(instance)
699
      elif len(inst_config.secondary_nodes) > 1:
700
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
701
                    % instance)
702

    
703
      for snode in inst_config.secondary_nodes:
704
        if snode in node_info:
705
          node_info[snode]['sinst'].append(instance)
706
          if pnode not in node_info[snode]['sinst-by-pnode']:
707
            node_info[snode]['sinst-by-pnode'][pnode] = []
708
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
709
        else:
710
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
711
                      " %s failed" % (instance, snode))
712

    
713
    feedback_fn("* Verifying orphan volumes")
714
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
715
                                       feedback_fn)
716
    bad = bad or result
717

    
718
    feedback_fn("* Verifying remaining instances")
719
    result = self._VerifyOrphanInstances(instancelist, node_instance,
720
                                         feedback_fn)
721
    bad = bad or result
722

    
723
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
724
      feedback_fn("* Verifying N+1 Memory redundancy")
725
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
726
      bad = bad or result
727

    
728
    feedback_fn("* Other Notes")
729
    if i_non_redundant:
730
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
731
                  % len(i_non_redundant))
732

    
733
    return int(bad)
734

    
735
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
736
    """Analize the post-hooks' result, handle it, and send some
737
    nicely-formatted feedback back to the user.
738

739
    Args:
740
      phase: the hooks phase that has just been run
741
      hooks_results: the results of the multi-node hooks rpc call
742
      feedback_fn: function to send feedback back to the caller
743
      lu_result: previous Exec result
744

745
    """
746
    # We only really run POST phase hooks, and are only interested in their results
747
    if phase == constants.HOOKS_PHASE_POST:
748
      # Used to change hooks' output to proper indentation
749
      indent_re = re.compile('^', re.M)
750
      feedback_fn("* Hooks Results")
751
      if not hooks_results:
752
        feedback_fn("  - ERROR: general communication failure")
753
        lu_result = 1
754
      else:
755
        for node_name in hooks_results:
756
          show_node_header = True
757
          res = hooks_results[node_name]
758
          if res is False or not isinstance(res, list):
759
            feedback_fn("    Communication failure")
760
            lu_result = 1
761
            continue
762
          for script, hkr, output in res:
763
            if hkr == constants.HKR_FAIL:
764
              # The node header is only shown once, if there are
765
              # failing hooks on that node
766
              if show_node_header:
767
                feedback_fn("  Node %s:" % node_name)
768
                show_node_header = False
769
              feedback_fn("    ERROR: Script %s failed, output:" % script)
770
              output = indent_re.sub('      ', output)
771
              feedback_fn("%s" % output)
772
              lu_result = 1
773

    
774
      return lu_result
775

    
776

    
777
class LUVerifyDisks(NoHooksLU):
778
  """Verifies the cluster disks status.
779

780
  """
781
  _OP_REQP = []
782

    
783
  def CheckPrereq(self):
784
    """Check prerequisites.
785

786
    This has no prerequisites.
787

788
    """
789
    pass
790

    
791
  def Exec(self, feedback_fn):
792
    """Verify integrity of cluster disks.
793

794
    """
795
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
796

    
797
    vg_name = self.cfg.GetVGName()
798
    nodes = utils.NiceSort(self.cfg.GetNodeList())
799
    instances = [self.cfg.GetInstanceInfo(name)
800
                 for name in self.cfg.GetInstanceList()]
801

    
802
    nv_dict = {}
803
    for inst in instances:
804
      inst_lvs = {}
805
      if (inst.status != "up" or
806
          inst.disk_template not in constants.DTS_NET_MIRROR):
807
        continue
808
      inst.MapLVsByNode(inst_lvs)
809
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
810
      for node, vol_list in inst_lvs.iteritems():
811
        for vol in vol_list:
812
          nv_dict[(node, vol)] = inst
813

    
814
    if not nv_dict:
815
      return result
816

    
817
    node_lvs = rpc.call_volume_list(nodes, vg_name)
818

    
819
    to_act = set()
820
    for node in nodes:
821
      # node_volume
822
      lvs = node_lvs[node]
823

    
824
      if isinstance(lvs, basestring):
825
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
826
        res_nlvm[node] = lvs
827
      elif not isinstance(lvs, dict):
828
        logger.Info("connection to node %s failed or invalid data returned" %
829
                    (node,))
830
        res_nodes.append(node)
831
        continue
832

    
833
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
834
        inst = nv_dict.pop((node, lv_name), None)
835
        if (not lv_online and inst is not None
836
            and inst.name not in res_instances):
837
          res_instances.append(inst.name)
838

    
839
    # any leftover items in nv_dict are missing LVs, let's arrange the
840
    # data better
841
    for key, inst in nv_dict.iteritems():
842
      if inst.name not in res_missing:
843
        res_missing[inst.name] = []
844
      res_missing[inst.name].append(key)
845

    
846
    return result
847

    
848

    
849
class LURenameCluster(LogicalUnit):
850
  """Rename the cluster.
851

852
  """
853
  HPATH = "cluster-rename"
854
  HTYPE = constants.HTYPE_CLUSTER
855
  _OP_REQP = ["name"]
856
  REQ_WSSTORE = True
857

    
858
  def BuildHooksEnv(self):
859
    """Build hooks env.
860

861
    """
862
    env = {
863
      "OP_TARGET": self.sstore.GetClusterName(),
864
      "NEW_NAME": self.op.name,
865
      }
866
    mn = self.sstore.GetMasterNode()
867
    return env, [mn], [mn]
868

    
869
  def CheckPrereq(self):
870
    """Verify that the passed name is a valid one.
871

872
    """
873
    hostname = utils.HostInfo(self.op.name)
874

    
875
    new_name = hostname.name
876
    self.ip = new_ip = hostname.ip
877
    old_name = self.sstore.GetClusterName()
878
    old_ip = self.sstore.GetMasterIP()
879
    if new_name == old_name and new_ip == old_ip:
880
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
881
                                 " cluster has changed")
882
    if new_ip != old_ip:
883
      result = utils.RunCmd(["fping", "-q", new_ip])
884
      if not result.failed:
885
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
886
                                   " reachable on the network. Aborting." %
887
                                   new_ip)
888

    
889
    self.op.name = new_name
890

    
891
  def Exec(self, feedback_fn):
892
    """Rename the cluster.
893

894
    """
895
    clustername = self.op.name
896
    ip = self.ip
897
    ss = self.sstore
898

    
899
    # shutdown the master IP
900
    master = ss.GetMasterNode()
901
    if not rpc.call_node_stop_master(master):
902
      raise errors.OpExecError("Could not disable the master role")
903

    
904
    try:
905
      # modify the sstore
906
      ss.SetKey(ss.SS_MASTER_IP, ip)
907
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
908

    
909
      # Distribute updated ss config to all nodes
910
      myself = self.cfg.GetNodeInfo(master)
911
      dist_nodes = self.cfg.GetNodeList()
912
      if myself.name in dist_nodes:
913
        dist_nodes.remove(myself.name)
914

    
915
      logger.Debug("Copying updated ssconf data to all nodes")
916
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
917
        fname = ss.KeyToFilename(keyname)
918
        result = rpc.call_upload_file(dist_nodes, fname)
919
        for to_node in dist_nodes:
920
          if not result[to_node]:
921
            logger.Error("copy of file %s to node %s failed" %
922
                         (fname, to_node))
923
    finally:
924
      if not rpc.call_node_start_master(master):
925
        logger.Error("Could not re-enable the master role on the master,"
926
                     " please restart manually.")
927

    
928

    
929
def _RecursiveCheckIfLVMBased(disk):
930
  """Check if the given disk or its children are lvm-based.
931

932
  Args:
933
    disk: ganeti.objects.Disk object
934

935
  Returns:
936
    boolean indicating whether a LD_LV dev_type was found or not
937

938
  """
939
  if disk.children:
940
    for chdisk in disk.children:
941
      if _RecursiveCheckIfLVMBased(chdisk):
942
        return True
943
  return disk.dev_type == constants.LD_LV
944
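# Example (illustrative comment): a mirrored disk whose children are logical
# volumes makes _RecursiveCheckIfLVMBased return True, because the recursion
# reaches a child with dev_type == constants.LD_LV; a disk tree containing no
# LD_LV device anywhere returns False.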

    
945

    
946
class LUSetClusterParams(LogicalUnit):
947
  """Change the parameters of the cluster.
948

949
  """
950
  HPATH = "cluster-modify"
951
  HTYPE = constants.HTYPE_CLUSTER
952
  _OP_REQP = []
953

    
954
  def BuildHooksEnv(self):
955
    """Build hooks env.
956

957
    """
958
    env = {
959
      "OP_TARGET": self.sstore.GetClusterName(),
960
      "NEW_VG_NAME": self.op.vg_name,
961
      }
962
    mn = self.sstore.GetMasterNode()
963
    return env, [mn], [mn]
964

    
965
  def CheckPrereq(self):
966
    """Check prerequisites.
967

968
    This checks whether the given params don't conflict and
969
    if the given volume group is valid.
970

971
    """
972
    if not self.op.vg_name:
973
      instances = [self.cfg.GetInstanceInfo(name)
974
                   for name in self.cfg.GetInstanceList()]
975
      for inst in instances:
976
        for disk in inst.disks:
977
          if _RecursiveCheckIfLVMBased(disk):
978
            raise errors.OpPrereqError("Cannot disable lvm storage while"
979
                                       " lvm-based instances exist")
980

    
981
    # if vg_name not None, checks given volume group on all nodes
982
    if self.op.vg_name:
983
      node_list = self.cfg.GetNodeList()
984
      vglist = rpc.call_vg_list(node_list)
985
      for node in node_list:
986
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
987
                                              constants.MIN_VG_SIZE)
988
        if vgstatus:
989
          raise errors.OpPrereqError("Error on node '%s': %s" %
990
                                     (node, vgstatus))
991

    
992
  def Exec(self, feedback_fn):
993
    """Change the parameters of the cluster.
994

995
    """
996
    if self.op.vg_name != self.cfg.GetVGName():
997
      self.cfg.SetVGName(self.op.vg_name)
998
    else:
999
      feedback_fn("Cluster LVM configuration already in desired"
1000
                  " state, not changing")
1001

    
1002

    
1003
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1004
  """Sleep and poll for an instance's disk to sync.
1005

1006
  """
1007
  if not instance.disks:
1008
    return True
1009

    
1010
  if not oneshot:
1011
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1012

    
1013
  node = instance.primary_node
1014

    
1015
  for dev in instance.disks:
1016
    cfgw.SetDiskID(dev, node)
1017

    
1018
  retries = 0
1019
  while True:
1020
    max_time = 0
1021
    done = True
1022
    cumul_degraded = False
1023
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1024
    if not rstats:
1025
      proc.LogWarning("Can't get any data from node %s" % node)
1026
      retries += 1
1027
      if retries >= 10:
1028
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1029
                                 " aborting." % node)
1030
      time.sleep(6)
1031
      continue
1032
    retries = 0
1033
    for i in range(len(rstats)):
1034
      mstat = rstats[i]
1035
      if mstat is None:
1036
        proc.LogWarning("Can't compute data for node %s/%s" %
1037
                        (node, instance.disks[i].iv_name))
1038
        continue
1039
      # we ignore the ldisk parameter
1040
      perc_done, est_time, is_degraded, _ = mstat
1041
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1042
      if perc_done is not None:
1043
        done = False
1044
        if est_time is not None:
1045
          rem_time = "%d estimated seconds remaining" % est_time
1046
          max_time = est_time
1047
        else:
1048
          rem_time = "no time estimate"
1049
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1050
                     (instance.disks[i].iv_name, perc_done, rem_time))
1051
    if done or oneshot:
1052
      break
1053

    
1054
    if unlock:
1055
      #utils.Unlock('cmd')
1056
      pass
1057
    try:
1058
      time.sleep(min(60, max_time))
1059
    finally:
1060
      if unlock:
1061
        #utils.Lock('cmd')
1062
        pass
1063

    
1064
  if done:
1065
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1066
  return not cumul_degraded
1067
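# Example of the per-disk status handling above (illustrative comment with
# sample values): a mirror status tuple of
#
#   mstat = (perc_done, est_time, is_degraded, ldisk) = (55.5, 120, True, False)
#
# is reported as "- device sda: 55.50% done, 120 estimated seconds remaining"
# and keeps the sync loop polling; a perc_done of None marks that device as
# done, but counts towards cumul_degraded when is_degraded is set.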

    
1068

    
1069
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1070
  """Check that mirrors are not degraded.
1071

1072
  The ldisk parameter, if True, will change the test from the
1073
  is_degraded attribute (which represents overall non-ok status for
1074
  the device(s)) to the ldisk (representing the local storage status).
1075

1076
  """
1077
  cfgw.SetDiskID(dev, node)
1078
  if ldisk:
1079
    idx = 6
1080
  else:
1081
    idx = 5
1082

    
1083
  result = True
1084
  if on_primary or dev.AssembleOnSecondary():
1085
    rstats = rpc.call_blockdev_find(node, dev)
1086
    if not rstats:
1087
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1088
      result = False
1089
    else:
1090
      result = result and (not rstats[idx])
1091
  if dev.children:
1092
    for child in dev.children:
1093
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1094

    
1095
  return result
1096

    
1097

    
1098
class LUDiagnoseOS(NoHooksLU):
1099
  """Logical unit for OS diagnose/query.
1100

1101
  """
1102
  _OP_REQP = ["output_fields", "names"]
1103

    
1104
  def CheckPrereq(self):
1105
    """Check prerequisites.
1106

1107
    This always succeeds, since this is a pure query LU.
1108

1109
    """
1110
    if self.op.names:
1111
      raise errors.OpPrereqError("Selective OS query not supported")
1112

    
1113
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1114
    _CheckOutputFields(static=[],
1115
                       dynamic=self.dynamic_fields,
1116
                       selected=self.op.output_fields)
1117

    
1118
  @staticmethod
1119
  def _DiagnoseByOS(node_list, rlist):
1120
    """Remaps a per-node return list into an a per-os per-node dictionary
1121

1122
      Args:
1123
        node_list: a list with the names of all nodes
1124
        rlist: a map with node names as keys and OS objects as values
1125

1126
      Returns:
1127
        map: a map with osnames as keys and as value another map, with
1128
             nodes as
1129
             keys and list of OS objects as values
1130
             e.g. {"debian-etch": {"node1": [<object>,...],
1131
                                   "node2": [<object>,]}
1132
                  }
1133

1134
    """
1135
    all_os = {}
1136
    for node_name, nr in rlist.iteritems():
1137
      if not nr:
1138
        continue
1139
      for os_obj in nr:
1140
        if os_obj.name not in all_os:
1141
          # build a list of nodes for this os containing empty lists
1142
          # for each node in node_list
1143
          all_os[os_obj.name] = {}
1144
          for nname in node_list:
1145
            all_os[os_obj.name][nname] = []
1146
        all_os[os_obj.name][node_name].append(os_obj)
1147
    return all_os
1148

    
1149
  def Exec(self, feedback_fn):
1150
    """Compute the list of OSes.
1151

1152
    """
1153
    node_list = self.cfg.GetNodeList()
1154
    node_data = rpc.call_os_diagnose(node_list)
1155
    if node_data == False:
1156
      raise errors.OpExecError("Can't gather the list of OSes")
1157
    pol = self._DiagnoseByOS(node_list, node_data)
1158
    output = []
1159
    for os_name, os_data in pol.iteritems():
1160
      row = []
1161
      for field in self.op.output_fields:
1162
        if field == "name":
1163
          val = os_name
1164
        elif field == "valid":
1165
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1166
        elif field == "node_status":
1167
          val = {}
1168
          for node_name, nos_list in os_data.iteritems():
1169
            val[node_name] = [(v.status, v.path) for v in nos_list]
1170
        else:
1171
          raise errors.ParameterError(field)
1172
        row.append(val)
1173
      output.append(row)
1174

    
1175
    return output
1176

    
1177

    
1178
class LURemoveNode(LogicalUnit):
1179
  """Logical unit for removing a node.
1180

1181
  """
1182
  HPATH = "node-remove"
1183
  HTYPE = constants.HTYPE_NODE
1184
  _OP_REQP = ["node_name"]
1185

    
1186
  def BuildHooksEnv(self):
1187
    """Build hooks env.
1188

1189
    This doesn't run on the target node in the pre phase as a failed
1190
    node would not allow itself to run.
1191

1192
    """
1193
    env = {
1194
      "OP_TARGET": self.op.node_name,
1195
      "NODE_NAME": self.op.node_name,
1196
      }
1197
    all_nodes = self.cfg.GetNodeList()
1198
    all_nodes.remove(self.op.node_name)
1199
    return env, all_nodes, all_nodes
1200

    
1201
  def CheckPrereq(self):
1202
    """Check prerequisites.
1203

1204
    This checks:
1205
     - the node exists in the configuration
1206
     - it does not have primary or secondary instances
1207
     - it's not the master
1208

1209
    Any errors are signalled by raising errors.OpPrereqError.
1210

1211
    """
1212
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1213
    if node is None:
1214
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1215

    
1216
    instance_list = self.cfg.GetInstanceList()
1217

    
1218
    masternode = self.sstore.GetMasterNode()
1219
    if node.name == masternode:
1220
      raise errors.OpPrereqError("Node is the master node,"
1221
                                 " you need to failover first.")
1222

    
1223
    for instance_name in instance_list:
1224
      instance = self.cfg.GetInstanceInfo(instance_name)
1225
      if node.name == instance.primary_node:
1226
        raise errors.OpPrereqError("Instance %s still running on the node,"
1227
                                   " please remove first." % instance_name)
1228
      if node.name in instance.secondary_nodes:
1229
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1230
                                   " please remove first." % instance_name)
1231
    self.op.node_name = node.name
1232
    self.node = node
1233

    
1234
  def Exec(self, feedback_fn):
1235
    """Removes the node from the cluster.
1236

1237
    """
1238
    node = self.node
1239
    logger.Info("stopping the node daemon and removing configs from node %s" %
1240
                node.name)
1241

    
1242
    rpc.call_node_leave_cluster(node.name)
1243

    
1244
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1245

    
1246
    logger.Info("Removing node %s from config" % node.name)
1247

    
1248
    self.cfg.RemoveNode(node.name)
1249

    
1250
    utils.RemoveHostFromEtcHosts(node.name)
1251

    
1252

    
1253
class LUQueryNodes(NoHooksLU):
1254
  """Logical unit for querying nodes.
1255

1256
  """
1257
  _OP_REQP = ["output_fields", "names"]
1258

    
1259
  def CheckPrereq(self):
1260
    """Check prerequisites.
1261

1262
    This checks that the fields required are valid output fields.
1263

1264
    """
1265
    self.dynamic_fields = frozenset([
1266
      "dtotal", "dfree",
1267
      "mtotal", "mnode", "mfree",
1268
      "bootid",
1269
      "ctotal",
1270
      ])
1271

    
1272
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1273
                               "pinst_list", "sinst_list",
1274
                               "pip", "sip"],
1275
                       dynamic=self.dynamic_fields,
1276
                       selected=self.op.output_fields)
1277

    
1278
    self.wanted = _GetWantedNodes(self, self.op.names)
1279

    
1280
  def Exec(self, feedback_fn):
1281
    """Computes the list of nodes and their attributes.
1282

1283
    """
1284
    nodenames = self.wanted
1285
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1286

    
1287
    # begin data gathering
1288

    
1289
    if self.dynamic_fields.intersection(self.op.output_fields):
1290
      live_data = {}
1291
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1292
      for name in nodenames:
1293
        nodeinfo = node_data.get(name, None)
1294
        if nodeinfo:
1295
          live_data[name] = {
1296
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1297
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1298
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1299
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1300
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1301
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1302
            "bootid": nodeinfo['bootid'],
1303
            }
1304
        else:
1305
          live_data[name] = {}
1306
    else:
1307
      live_data = dict.fromkeys(nodenames, {})
1308

    
1309
    node_to_primary = dict([(name, set()) for name in nodenames])
1310
    node_to_secondary = dict([(name, set()) for name in nodenames])
1311

    
1312
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1313
                             "sinst_cnt", "sinst_list"))
1314
    if inst_fields & frozenset(self.op.output_fields):
1315
      instancelist = self.cfg.GetInstanceList()
1316

    
1317
      for instance_name in instancelist:
1318
        inst = self.cfg.GetInstanceInfo(instance_name)
1319
        if inst.primary_node in node_to_primary:
1320
          node_to_primary[inst.primary_node].add(inst.name)
1321
        for secnode in inst.secondary_nodes:
1322
          if secnode in node_to_secondary:
1323
            node_to_secondary[secnode].add(inst.name)
1324

    
1325
    # end data gathering
1326

    
1327
    output = []
1328
    for node in nodelist:
1329
      node_output = []
1330
      for field in self.op.output_fields:
1331
        if field == "name":
1332
          val = node.name
1333
        elif field == "pinst_list":
1334
          val = list(node_to_primary[node.name])
1335
        elif field == "sinst_list":
1336
          val = list(node_to_secondary[node.name])
1337
        elif field == "pinst_cnt":
1338
          val = len(node_to_primary[node.name])
1339
        elif field == "sinst_cnt":
1340
          val = len(node_to_secondary[node.name])
1341
        elif field == "pip":
1342
          val = node.primary_ip
1343
        elif field == "sip":
1344
          val = node.secondary_ip
1345
        elif field in self.dynamic_fields:
1346
          val = live_data[node.name].get(field, None)
1347
        else:
1348
          raise errors.ParameterError(field)
1349
        node_output.append(val)
1350
      output.append(node_output)
1351

    
1352
    return output
1353

    
1354

    
1355
class LUQueryNodeVolumes(NoHooksLU):
1356
  """Logical unit for getting volumes on node(s).
1357

1358
  """
1359
  _OP_REQP = ["nodes", "output_fields"]
1360

    
1361
  def CheckPrereq(self):
1362
    """Check prerequisites.
1363

1364
    This checks that the fields required are valid output fields.
1365

1366
    """
1367
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1368

    
1369
    _CheckOutputFields(static=["node"],
1370
                       dynamic=["phys", "vg", "name", "size", "instance"],
1371
                       selected=self.op.output_fields)
1372

    
1373

    
1374
  def Exec(self, feedback_fn):
1375
    """Computes the list of nodes and their attributes.
1376

1377
    """
1378
    nodenames = self.nodes
1379
    volumes = rpc.call_node_volumes(nodenames)
1380

    
1381
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1382
             in self.cfg.GetInstanceList()]
1383

    
1384
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1385

    
1386
    output = []
1387
    for node in nodenames:
1388
      if node not in volumes or not volumes[node]:
1389
        continue
1390

    
1391
      node_vols = volumes[node][:]
1392
      node_vols.sort(key=lambda vol: vol['dev'])
1393

    
1394
      for vol in node_vols:
1395
        node_output = []
1396
        for field in self.op.output_fields:
1397
          if field == "node":
1398
            val = node
1399
          elif field == "phys":
1400
            val = vol['dev']
1401
          elif field == "vg":
1402
            val = vol['vg']
1403
          elif field == "name":
1404
            val = vol['name']
1405
          elif field == "size":
1406
            val = int(float(vol['size']))
1407
          elif field == "instance":
1408
            for inst in ilist:
1409
              if node not in lv_by_node[inst]:
1410
                continue
1411
              if vol['name'] in lv_by_node[inst][node]:
1412
                val = inst.name
1413
                break
1414
            else:
1415
              val = '-'
1416
          else:
1417
            raise errors.ParameterError(field)
1418
          node_output.append(str(val))
1419

    
1420
        output.append(node_output)
1421

    
1422
    return output
1423

    
1424

    
1425
class LUAddNode(LogicalUnit):
1426
  """Logical unit for adding node to the cluster.
1427

1428
  """
1429
  HPATH = "node-add"
1430
  HTYPE = constants.HTYPE_NODE
1431
  _OP_REQP = ["node_name"]
1432

    
1433
  def BuildHooksEnv(self):
1434
    """Build hooks env.
1435

1436
    This will run on all nodes before, and on all nodes + the new node after.
1437

1438
    """
1439
    env = {
1440
      "OP_TARGET": self.op.node_name,
1441
      "NODE_NAME": self.op.node_name,
1442
      "NODE_PIP": self.op.primary_ip,
1443
      "NODE_SIP": self.op.secondary_ip,
1444
      }
1445
    nodes_0 = self.cfg.GetNodeList()
1446
    nodes_1 = nodes_0 + [self.op.node_name, ]
1447
    return env, nodes_0, nodes_1
1448

    
1449
  def CheckPrereq(self):
1450
    """Check prerequisites.
1451

1452
    This checks:
1453
     - the new node is not already in the config
1454
     - it is resolvable
1455
     - its parameters (single/dual homed) matches the cluster
1456

1457
    Any errors are signalled by raising errors.OpPrereqError.
1458

1459
    """
1460
    node_name = self.op.node_name
1461
    cfg = self.cfg
1462

    
1463
    dns_data = utils.HostInfo(node_name)
1464

    
1465
    node = dns_data.name
1466
    primary_ip = self.op.primary_ip = dns_data.ip
1467
    secondary_ip = getattr(self.op, "secondary_ip", None)
1468
    if secondary_ip is None:
1469
      secondary_ip = primary_ip
1470
    if not utils.IsValidIP(secondary_ip):
1471
      raise errors.OpPrereqError("Invalid secondary IP given")
1472
    self.op.secondary_ip = secondary_ip
1473

    
1474
    node_list = cfg.GetNodeList()
1475
    if not self.op.readd and node in node_list:
1476
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1477
                                 node)
1478
    elif self.op.readd and node not in node_list:
1479
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1480

    
1481
    for existing_node_name in node_list:
1482
      existing_node = cfg.GetNodeInfo(existing_node_name)
1483

    
1484
      if self.op.readd and node == existing_node_name:
1485
        if (existing_node.primary_ip != primary_ip or
1486
            existing_node.secondary_ip != secondary_ip):
1487
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1488
                                     " address configuration as before")
1489
        continue
1490

    
1491
      if (existing_node.primary_ip == primary_ip or
1492
          existing_node.secondary_ip == primary_ip or
1493
          existing_node.primary_ip == secondary_ip or
1494
          existing_node.secondary_ip == secondary_ip):
1495
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1496
                                   " existing node %s" % existing_node.name)
1497

    
1498
    # check that the type of the node (single versus dual homed) is the
1499
    # same as for the master
1500
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1501
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1502
    newbie_singlehomed = secondary_ip == primary_ip
1503
    if master_singlehomed != newbie_singlehomed:
1504
      if master_singlehomed:
1505
        raise errors.OpPrereqError("The master has no private ip but the"
1506
                                   " new node has one")
1507
      else:
1508
        raise errors.OpPrereqError("The master has a private ip but the"
1509
                                   " new node doesn't have one")
1510

    
1511
    # checks reachability
1512
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1513
      raise errors.OpPrereqError("Node not reachable by ping")
1514

    
1515
    if not newbie_singlehomed:
1516
      # check reachability from my secondary ip to newbie's secondary ip
1517
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1518
                           source=myself.secondary_ip):
1519
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1520
                                   " based ping to noded port")
1521

    
1522
    self.new_node = objects.Node(name=node,
1523
                                 primary_ip=primary_ip,
1524
                                 secondary_ip=secondary_ip)
1525

    
1526
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1527
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1528
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1529
                                   constants.VNC_PASSWORD_FILE)
1530

    
1531
  def Exec(self, feedback_fn):
1532
    """Adds the new node to the cluster.
1533

1534
    """
1535
    new_node = self.new_node
1536
    node = new_node.name
1537

    
1538
    # set up inter-node password and certificate and restart the node daemon
1539
    gntpass = self.sstore.GetNodeDaemonPassword()
1540
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1541
      raise errors.OpExecError("ganeti password corruption detected")
1542
    f = open(constants.SSL_CERT_FILE)
1543
    try:
1544
      gntpem = f.read(8192)
1545
    finally:
1546
      f.close()
1547
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1548
    # so we use this to detect an invalid certificate; as long as the
1549
    # cert doesn't contain this, the here-document will be correctly
1550
    # parsed by the shell sequence below
1551
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1552
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1553
    if not gntpem.endswith("\n"):
1554
      raise errors.OpExecError("PEM must end with newline")
1555
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1556

    
1557
    # and then connect with ssh to set password and start ganeti-noded
1558
    # note that all the below variables are sanitized at this point,
1559
    # either by being constants or by the checks above
1560
    ss = self.sstore
1561
    mycommand = ("umask 077 && "
1562
                 "echo '%s' > '%s' && "
1563
                 "cat > '%s' << '!EOF.' && \n"
1564
                 "%s!EOF.\n%s restart" %
1565
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1566
                  constants.SSL_CERT_FILE, gntpem,
1567
                  constants.NODE_INITD_SCRIPT))
1568

    
1569
    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1570
    if result.failed:
1571
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1572
                               " output: %s" %
1573
                               (node, result.fail_reason, result.output))
1574

    
1575
    # check connectivity
1576
    time.sleep(4)
1577

    
1578
    result = rpc.call_version([node])[node]
1579
    if result:
1580
      if constants.PROTOCOL_VERSION == result:
1581
        logger.Info("communication to node %s fine, sw version %s match" %
1582
                    (node, result))
1583
      else:
1584
        raise errors.OpExecError("Version mismatch master version %s,"
1585
                                 " node version %s" %
1586
                                 (constants.PROTOCOL_VERSION, result))
1587
    else:
1588
      raise errors.OpExecError("Cannot get version from the new node")
1589

    
1590
    # setup ssh on node
1591
    logger.Info("copy ssh key to node %s" % node)
1592
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1593
    keyarray = []
1594
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1595
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1596
                priv_key, pub_key]
1597

    
1598
    for i in keyfiles:
1599
      f = open(i, 'r')
1600
      try:
1601
        keyarray.append(f.read())
1602
      finally:
1603
        f.close()
1604

    
1605
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1606
                               keyarray[3], keyarray[4], keyarray[5])
1607

    
1608
    if not result:
1609
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1610

    
1611
    # Add node to our /etc/hosts, and add key to known_hosts
1612
    utils.AddHostToEtcHosts(new_node.name)
1613

    
1614
    if new_node.secondary_ip != new_node.primary_ip:
1615
      if not rpc.call_node_tcp_ping(new_node.name,
1616
                                    constants.LOCALHOST_IP_ADDRESS,
1617
                                    new_node.secondary_ip,
1618
                                    constants.DEFAULT_NODED_PORT,
1619
                                    10, False):
1620
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1621
                                 " you gave (%s). Please fix and re-run this"
1622
                                 " command." % new_node.secondary_ip)
1623

    
1624
    success, msg = self.ssh.VerifyNodeHostname(node)
1625
    if not success:
1626
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1627
                               " than the one the resolver gives: %s."
1628
                               " Please fix and re-run this command." %
1629
                               (node, msg))
1630

    
1631
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1632
    # including the node just added
1633
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1634
    dist_nodes = self.cfg.GetNodeList()
1635
    if not self.op.readd:
1636
      dist_nodes.append(node)
1637
    if myself.name in dist_nodes:
1638
      dist_nodes.remove(myself.name)
1639

    
1640
    logger.Debug("Copying hosts and known_hosts to all nodes")
1641
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1642
      result = rpc.call_upload_file(dist_nodes, fname)
1643
      for to_node in dist_nodes:
1644
        if not result[to_node]:
1645
          logger.Error("copy of file %s to node %s failed" %
1646
                       (fname, to_node))
1647

    
1648
    to_copy = ss.GetFileList()
1649
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1650
      to_copy.append(constants.VNC_PASSWORD_FILE)
1651
    for fname in to_copy:
1652
      if not self.ssh.CopyFileToNode(node, fname):
1653
        logger.Error("could not copy file %s to node %s" % (fname, node))
1654

    
1655
    if not self.op.readd:
1656
      logger.Info("adding node %s to cluster.conf" % node)
1657
      self.cfg.AddNode(new_node)
1658

    
1659

    
1660
class LUMasterFailover(LogicalUnit):
1661
  """Failover the master node to the current node.
1662

1663
  This is a special LU in that it must run on a non-master node.
1664

1665
  """
1666
  HPATH = "master-failover"
1667
  HTYPE = constants.HTYPE_CLUSTER
1668
  REQ_MASTER = False
1669
  REQ_WSSTORE = True
1670
  _OP_REQP = []
1671

    
1672
  def BuildHooksEnv(self):
1673
    """Build hooks env.
1674

1675
    This will run on the new master only in the pre phase, and on all
1676
    the nodes in the post phase.
1677

1678
    """
1679
    env = {
1680
      "OP_TARGET": self.new_master,
1681
      "NEW_MASTER": self.new_master,
1682
      "OLD_MASTER": self.old_master,
1683
      }
1684
    return env, [self.new_master], self.cfg.GetNodeList()
1685

    
1686
  def CheckPrereq(self):
1687
    """Check prerequisites.
1688

1689
    This checks that we are not already the master.
1690

1691
    """
1692
    self.new_master = utils.HostInfo().name
1693
    self.old_master = self.sstore.GetMasterNode()
1694

    
1695
    if self.old_master == self.new_master:
1696
      raise errors.OpPrereqError("This commands must be run on the node"
1697
                                 " where you want the new master to be."
1698
                                 " %s is already the master" %
1699
                                 self.old_master)
1700

    
1701
  def Exec(self, feedback_fn):
1702
    """Failover the master node.
1703

1704
    This command, when run on a non-master node, will cause the current
1705
    master to cease being master, and the non-master to become new
1706
    master.
1707

1708
    """
1709
    #TODO: do not rely on gethostname returning the FQDN
1710
    logger.Info("setting master to %s, old master: %s" %
1711
                (self.new_master, self.old_master))
1712

    
1713
    if not rpc.call_node_stop_master(self.old_master):
1714
      logger.Error("could disable the master role on the old master"
1715
                   " %s, please disable manually" % self.old_master)
1716

    
1717
    ss = self.sstore
1718
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1719
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1720
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1721
      logger.Error("could not distribute the new simple store master file"
1722
                   " to the other nodes, please check.")
1723

    
1724
    if not rpc.call_node_start_master(self.new_master):
1725
      logger.Error("could not start the master role on the new master"
1726
                   " %s, please check" % self.new_master)
1727
      feedback_fn("Error in activating the master IP on the new master,"
1728
                  " please fix manually.")
1729

    
1730

    
1731

    
1732
class LUQueryClusterInfo(NoHooksLU):
1733
  """Query cluster configuration.
1734

1735
  """
1736
  _OP_REQP = []
1737
  REQ_MASTER = False
1738

    
1739
  def CheckPrereq(self):
1740
    """No prerequsites needed for this LU.
1741

1742
    """
1743
    pass
1744

    
1745
  def Exec(self, feedback_fn):
1746
    """Return cluster config.
1747

1748
    """
1749
    result = {
1750
      "name": self.sstore.GetClusterName(),
1751
      "software_version": constants.RELEASE_VERSION,
1752
      "protocol_version": constants.PROTOCOL_VERSION,
1753
      "config_version": constants.CONFIG_VERSION,
1754
      "os_api_version": constants.OS_API_VERSION,
1755
      "export_version": constants.EXPORT_VERSION,
1756
      "master": self.sstore.GetMasterNode(),
1757
      "architecture": (platform.architecture()[0], platform.machine()),
1758
      "hypervisor_type": self.sstore.GetHypervisorType(),
1759
      }
1760

    
1761
    return result
1762

    
1763

    
1764
class LUClusterCopyFile(NoHooksLU):
1765
  """Copy file to cluster.
1766

1767
  """
1768
  _OP_REQP = ["nodes", "filename"]
1769

    
1770
  def CheckPrereq(self):
1771
    """Check prerequisites.
1772

1773
    It should check that the named file exists and that the given list
1774
    of nodes is valid.
1775

1776
    """
1777
    if not os.path.exists(self.op.filename):
1778
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1779

    
1780
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1781

    
1782
  def Exec(self, feedback_fn):
1783
    """Copy a file from master to some nodes.
1784

1785
    Args:
1786
      opts - class with options as members
1787
      args - list containing a single element, the file name
1788
    Opts used:
1789
      nodes - list containing the name of target nodes; if empty, all nodes
1790

1791
    """
1792
    filename = self.op.filename
1793

    
1794
    myname = utils.HostInfo().name
1795

    
1796
    for node in self.nodes:
1797
      if node == myname:
1798
        continue
1799
      if not self.ssh.CopyFileToNode(node, filename):
1800
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1801

    
1802

    
1803
class LUDumpClusterConfig(NoHooksLU):
1804
  """Return a text-representation of the cluster-config.
1805

1806
  """
1807
  _OP_REQP = []
1808

    
1809
  def CheckPrereq(self):
1810
    """No prerequisites.
1811

1812
    """
1813
    pass
1814

    
1815
  def Exec(self, feedback_fn):
1816
    """Dump a representation of the cluster config to the standard output.
1817

1818
    """
1819
    return self.cfg.DumpConfig()
1820

    
1821

    
1822
class LURunClusterCommand(NoHooksLU):
1823
  """Run a command on some nodes.
1824

1825
  """
1826
  _OP_REQP = ["command", "nodes"]
1827

    
1828
  def CheckPrereq(self):
1829
    """Check prerequisites.
1830

1831
    It checks that the given list of nodes is valid.
1832

1833
    """
1834
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1835

    
1836
  def Exec(self, feedback_fn):
1837
    """Run a command on some nodes.
1838

1839
    """
1840
    # put the master at the end of the nodes list
1841
    master_node = self.sstore.GetMasterNode()
1842
    if master_node in self.nodes:
1843
      self.nodes.remove(master_node)
1844
      self.nodes.append(master_node)
1845

    
1846
    data = []
1847
    for node in self.nodes:
1848
      result = self.ssh.Run(node, "root", self.op.command)
1849
      data.append((node, result.output, result.exit_code))
1850

    
1851
    return data
1852

    
1853

    
1854
class LUActivateInstanceDisks(NoHooksLU):
1855
  """Bring up an instance's disks.
1856

1857
  """
1858
  _OP_REQP = ["instance_name"]
1859

    
1860
  def CheckPrereq(self):
1861
    """Check prerequisites.
1862

1863
    This checks that the instance is in the cluster.
1864

1865
    """
1866
    instance = self.cfg.GetInstanceInfo(
1867
      self.cfg.ExpandInstanceName(self.op.instance_name))
1868
    if instance is None:
1869
      raise errors.OpPrereqError("Instance '%s' not known" %
1870
                                 self.op.instance_name)
1871
    self.instance = instance
1872

    
1873

    
1874
  def Exec(self, feedback_fn):
1875
    """Activate the disks.
1876

1877
    """
1878
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1879
    if not disks_ok:
1880
      raise errors.OpExecError("Cannot activate block devices")
1881

    
1882
    return disks_info
1883

    
1884

    
1885
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1886
  """Prepare the block devices for an instance.
1887

1888
  This sets up the block devices on all nodes.
1889

1890
  Args:
1891
    instance: a ganeti.objects.Instance object
1892
    ignore_secondaries: if true, errors on secondary nodes won't result
1893
                        in an error return from the function
1894

1895
  Returns:
1896
    false if the operation failed
1897
    list of (host, instance_visible_name, node_visible_name) if the operation
1898
         suceeded with the mapping from node devices to instance devices
1899
  """
1900
  device_info = []
1901
  disks_ok = True
1902
  iname = instance.name
1903
  # With the two passes mechanism we try to reduce the window of
1904
  # opportunity for the race condition of switching DRBD to primary
1905
  # before handshaking occured, but we do not eliminate it
1906

    
1907
  # The proper fix would be to wait (with some limits) until the
1908
  # connection has been made and drbd transitions from WFConnection
1909
  # into any other network-connected state (Connected, SyncTarget,
1910
  # SyncSource, etc.)
1911

    
1912
  # 1st pass, assemble on all nodes in secondary mode
1913
  for inst_disk in instance.disks:
1914
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1915
      cfg.SetDiskID(node_disk, node)
1916
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1917
      if not result:
1918
        logger.Error("could not prepare block device %s on node %s"
1919
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1920
        if not ignore_secondaries:
1921
          disks_ok = False
1922

    
1923
  # FIXME: race condition on drbd migration to primary
1924

    
1925
  # 2nd pass, do only the primary node
1926
  for inst_disk in instance.disks:
1927
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1928
      if node != instance.primary_node:
1929
        continue
1930
      cfg.SetDiskID(node_disk, node)
1931
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1932
      if not result:
1933
        logger.Error("could not prepare block device %s on node %s"
1934
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1935
        disks_ok = False
1936
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1937

    
1938
  # leave the disks configured for the primary node
1939
  # this is a workaround that would be fixed better by
1940
  # improving the logical/physical id handling
1941
  for disk in instance.disks:
1942
    cfg.SetDiskID(disk, instance.primary_node)
1943

    
1944
  return disks_ok, device_info
1945

    
1946

    
1947
def _StartInstanceDisks(cfg, instance, force):
1948
  """Start the disks of an instance.
1949

1950
  """
1951
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1952
                                           ignore_secondaries=force)
1953
  if not disks_ok:
1954
    _ShutdownInstanceDisks(instance, cfg)
1955
    if force is not None and not force:
1956
      logger.Error("If the message above refers to a secondary node,"
1957
                   " you can retry the operation using '--force'.")
1958
    raise errors.OpExecError("Disk consistency error")
1959

    
1960

    
1961
class LUDeactivateInstanceDisks(NoHooksLU):
1962
  """Shutdown an instance's disks.
1963

1964
  """
1965
  _OP_REQP = ["instance_name"]
1966

    
1967
  def CheckPrereq(self):
1968
    """Check prerequisites.
1969

1970
    This checks that the instance is in the cluster.
1971

1972
    """
1973
    instance = self.cfg.GetInstanceInfo(
1974
      self.cfg.ExpandInstanceName(self.op.instance_name))
1975
    if instance is None:
1976
      raise errors.OpPrereqError("Instance '%s' not known" %
1977
                                 self.op.instance_name)
1978
    self.instance = instance
1979

    
1980
  def Exec(self, feedback_fn):
1981
    """Deactivate the disks
1982

1983
    """
1984
    instance = self.instance
1985
    ins_l = rpc.call_instance_list([instance.primary_node])
1986
    ins_l = ins_l[instance.primary_node]
1987
    if not type(ins_l) is list:
1988
      raise errors.OpExecError("Can't contact node '%s'" %
1989
                               instance.primary_node)
1990

    
1991
    if self.instance.name in ins_l:
1992
      raise errors.OpExecError("Instance is running, can't shutdown"
1993
                               " block devices.")
1994

    
1995
    _ShutdownInstanceDisks(instance, self.cfg)
1996

    
1997

    
1998
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1999
  """Shutdown block devices of an instance.
2000

2001
  This does the shutdown on all nodes of the instance.
2002

2003
  If the ignore_primary is false, errors on the primary node are
2004
  ignored.
2005

2006
  """
2007
  result = True
2008
  for disk in instance.disks:
2009
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2010
      cfg.SetDiskID(top_disk, node)
2011
      if not rpc.call_blockdev_shutdown(node, top_disk):
2012
        logger.Error("could not shutdown block device %s on node %s" %
2013
                     (disk.iv_name, node))
2014
        if not ignore_primary or node != instance.primary_node:
2015
          result = False
2016
  return result
2017

    
2018

    
2019
def _CheckNodeFreeMemory(cfg, node, reason, requested):
2020
  """Checks if a node has enough free memory.
2021

2022
  This function check if a given node has the needed amount of free
2023
  memory. In case the node has less memory or we cannot get the
2024
  information from the node, this function raise an OpPrereqError
2025
  exception.
2026

2027
  Args:
2028
    - cfg: a ConfigWriter instance
2029
    - node: the node name
2030
    - reason: string to use in the error message
2031
    - requested: the amount of memory in MiB
2032

2033
  """
2034
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2035
  if not nodeinfo or not isinstance(nodeinfo, dict):
2036
    raise errors.OpPrereqError("Could not contact node %s for resource"
2037
                             " information" % (node,))
2038

    
2039
  free_mem = nodeinfo[node].get('memory_free')
2040
  if not isinstance(free_mem, int):
2041
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2042
                             " was '%s'" % (node, free_mem))
2043
  if requested > free_mem:
2044
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2045
                             " needed %s MiB, available %s MiB" %
2046
                             (node, reason, requested, free_mem))
2047

    
2048

    
2049
class LUStartupInstance(LogicalUnit):
2050
  """Starts an instance.
2051

2052
  """
2053
  HPATH = "instance-start"
2054
  HTYPE = constants.HTYPE_INSTANCE
2055
  _OP_REQP = ["instance_name", "force"]
2056

    
2057
  def BuildHooksEnv(self):
2058
    """Build hooks env.
2059

2060
    This runs on master, primary and secondary nodes of the instance.
2061

2062
    """
2063
    env = {
2064
      "FORCE": self.op.force,
2065
      }
2066
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2067
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2068
          list(self.instance.secondary_nodes))
2069
    return env, nl, nl
2070

    
2071
  def CheckPrereq(self):
2072
    """Check prerequisites.
2073

2074
    This checks that the instance is in the cluster.
2075

2076
    """
2077
    instance = self.cfg.GetInstanceInfo(
2078
      self.cfg.ExpandInstanceName(self.op.instance_name))
2079
    if instance is None:
2080
      raise errors.OpPrereqError("Instance '%s' not known" %
2081
                                 self.op.instance_name)
2082

    
2083
    # check bridges existance
2084
    _CheckInstanceBridgesExist(instance)
2085

    
2086
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2087
                         "starting instance %s" % instance.name,
2088
                         instance.memory)
2089

    
2090
    self.instance = instance
2091
    self.op.instance_name = instance.name
2092

    
2093
  def Exec(self, feedback_fn):
2094
    """Start the instance.
2095

2096
    """
2097
    instance = self.instance
2098
    force = self.op.force
2099
    extra_args = getattr(self.op, "extra_args", "")
2100

    
2101
    self.cfg.MarkInstanceUp(instance.name)
2102

    
2103
    node_current = instance.primary_node
2104

    
2105
    _StartInstanceDisks(self.cfg, instance, force)
2106

    
2107
    if not rpc.call_instance_start(node_current, instance, extra_args):
2108
      _ShutdownInstanceDisks(instance, self.cfg)
2109
      raise errors.OpExecError("Could not start instance")
2110

    
2111

    
2112
class LURebootInstance(LogicalUnit):
2113
  """Reboot an instance.
2114

2115
  """
2116
  HPATH = "instance-reboot"
2117
  HTYPE = constants.HTYPE_INSTANCE
2118
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2119

    
2120
  def BuildHooksEnv(self):
2121
    """Build hooks env.
2122

2123
    This runs on master, primary and secondary nodes of the instance.
2124

2125
    """
2126
    env = {
2127
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2128
      }
2129
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2130
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2131
          list(self.instance.secondary_nodes))
2132
    return env, nl, nl
2133

    
2134
  def CheckPrereq(self):
2135
    """Check prerequisites.
2136

2137
    This checks that the instance is in the cluster.
2138

2139
    """
2140
    instance = self.cfg.GetInstanceInfo(
2141
      self.cfg.ExpandInstanceName(self.op.instance_name))
2142
    if instance is None:
2143
      raise errors.OpPrereqError("Instance '%s' not known" %
2144
                                 self.op.instance_name)
2145

    
2146
    # check bridges existance
2147
    _CheckInstanceBridgesExist(instance)
2148

    
2149
    self.instance = instance
2150
    self.op.instance_name = instance.name
2151

    
2152
  def Exec(self, feedback_fn):
2153
    """Reboot the instance.
2154

2155
    """
2156
    instance = self.instance
2157
    ignore_secondaries = self.op.ignore_secondaries
2158
    reboot_type = self.op.reboot_type
2159
    extra_args = getattr(self.op, "extra_args", "")
2160

    
2161
    node_current = instance.primary_node
2162

    
2163
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2164
                           constants.INSTANCE_REBOOT_HARD,
2165
                           constants.INSTANCE_REBOOT_FULL]:
2166
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2167
                                  (constants.INSTANCE_REBOOT_SOFT,
2168
                                   constants.INSTANCE_REBOOT_HARD,
2169
                                   constants.INSTANCE_REBOOT_FULL))
2170

    
2171
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2172
                       constants.INSTANCE_REBOOT_HARD]:
2173
      if not rpc.call_instance_reboot(node_current, instance,
2174
                                      reboot_type, extra_args):
2175
        raise errors.OpExecError("Could not reboot instance")
2176
    else:
2177
      if not rpc.call_instance_shutdown(node_current, instance):
2178
        raise errors.OpExecError("could not shutdown instance for full reboot")
2179
      _ShutdownInstanceDisks(instance, self.cfg)
2180
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2181
      if not rpc.call_instance_start(node_current, instance, extra_args):
2182
        _ShutdownInstanceDisks(instance, self.cfg)
2183
        raise errors.OpExecError("Could not start instance for full reboot")
2184

    
2185
    self.cfg.MarkInstanceUp(instance.name)
2186

    
2187

    
2188
class LUShutdownInstance(LogicalUnit):
2189
  """Shutdown an instance.
2190

2191
  """
2192
  HPATH = "instance-stop"
2193
  HTYPE = constants.HTYPE_INSTANCE
2194
  _OP_REQP = ["instance_name"]
2195

    
2196
  def BuildHooksEnv(self):
2197
    """Build hooks env.
2198

2199
    This runs on master, primary and secondary nodes of the instance.
2200

2201
    """
2202
    env = _BuildInstanceHookEnvByObject(self.instance)
2203
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2204
          list(self.instance.secondary_nodes))
2205
    return env, nl, nl
2206

    
2207
  def CheckPrereq(self):
2208
    """Check prerequisites.
2209

2210
    This checks that the instance is in the cluster.
2211

2212
    """
2213
    instance = self.cfg.GetInstanceInfo(
2214
      self.cfg.ExpandInstanceName(self.op.instance_name))
2215
    if instance is None:
2216
      raise errors.OpPrereqError("Instance '%s' not known" %
2217
                                 self.op.instance_name)
2218
    self.instance = instance
2219

    
2220
  def Exec(self, feedback_fn):
2221
    """Shutdown the instance.
2222

2223
    """
2224
    instance = self.instance
2225
    node_current = instance.primary_node
2226
    self.cfg.MarkInstanceDown(instance.name)
2227
    if not rpc.call_instance_shutdown(node_current, instance):
2228
      logger.Error("could not shutdown instance")
2229

    
2230
    _ShutdownInstanceDisks(instance, self.cfg)
2231

    
2232

    
2233
class LUReinstallInstance(LogicalUnit):
2234
  """Reinstall an instance.
2235

2236
  """
2237
  HPATH = "instance-reinstall"
2238
  HTYPE = constants.HTYPE_INSTANCE
2239
  _OP_REQP = ["instance_name"]
2240

    
2241
  def BuildHooksEnv(self):
2242
    """Build hooks env.
2243

2244
    This runs on master, primary and secondary nodes of the instance.
2245

2246
    """
2247
    env = _BuildInstanceHookEnvByObject(self.instance)
2248
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2249
          list(self.instance.secondary_nodes))
2250
    return env, nl, nl
2251

    
2252
  def CheckPrereq(self):
2253
    """Check prerequisites.
2254

2255
    This checks that the instance is in the cluster and is not running.
2256

2257
    """
2258
    instance = self.cfg.GetInstanceInfo(
2259
      self.cfg.ExpandInstanceName(self.op.instance_name))
2260
    if instance is None:
2261
      raise errors.OpPrereqError("Instance '%s' not known" %
2262
                                 self.op.instance_name)
2263
    if instance.disk_template == constants.DT_DISKLESS:
2264
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2265
                                 self.op.instance_name)
2266
    if instance.status != "down":
2267
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2268
                                 self.op.instance_name)
2269
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2270
    if remote_info:
2271
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2272
                                 (self.op.instance_name,
2273
                                  instance.primary_node))
2274

    
2275
    self.op.os_type = getattr(self.op, "os_type", None)
2276
    if self.op.os_type is not None:
2277
      # OS verification
2278
      pnode = self.cfg.GetNodeInfo(
2279
        self.cfg.ExpandNodeName(instance.primary_node))
2280
      if pnode is None:
2281
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2282
                                   self.op.pnode)
2283
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2284
      if not os_obj:
2285
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2286
                                   " primary node"  % self.op.os_type)
2287

    
2288
    self.instance = instance
2289

    
2290
  def Exec(self, feedback_fn):
2291
    """Reinstall the instance.
2292

2293
    """
2294
    inst = self.instance
2295

    
2296
    if self.op.os_type is not None:
2297
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2298
      inst.os = self.op.os_type
2299
      self.cfg.AddInstance(inst)
2300

    
2301
    _StartInstanceDisks(self.cfg, inst, None)
2302
    try:
2303
      feedback_fn("Running the instance OS create scripts...")
2304
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2305
        raise errors.OpExecError("Could not install OS for instance %s"
2306
                                 " on node %s" %
2307
                                 (inst.name, inst.primary_node))
2308
    finally:
2309
      _ShutdownInstanceDisks(inst, self.cfg)
2310

    
2311

    
2312
class LURenameInstance(LogicalUnit):
2313
  """Rename an instance.
2314

2315
  """
2316
  HPATH = "instance-rename"
2317
  HTYPE = constants.HTYPE_INSTANCE
2318
  _OP_REQP = ["instance_name", "new_name"]
2319

    
2320
  def BuildHooksEnv(self):
2321
    """Build hooks env.
2322

2323
    This runs on master, primary and secondary nodes of the instance.
2324

2325
    """
2326
    env = _BuildInstanceHookEnvByObject(self.instance)
2327
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2328
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2329
          list(self.instance.secondary_nodes))
2330
    return env, nl, nl
2331

    
2332
  def CheckPrereq(self):
2333
    """Check prerequisites.
2334

2335
    This checks that the instance is in the cluster and is not running.
2336

2337
    """
2338
    instance = self.cfg.GetInstanceInfo(
2339
      self.cfg.ExpandInstanceName(self.op.instance_name))
2340
    if instance is None:
2341
      raise errors.OpPrereqError("Instance '%s' not known" %
2342
                                 self.op.instance_name)
2343
    if instance.status != "down":
2344
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2345
                                 self.op.instance_name)
2346
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2347
    if remote_info:
2348
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2349
                                 (self.op.instance_name,
2350
                                  instance.primary_node))
2351
    self.instance = instance
2352

    
2353
    # new name verification
2354
    name_info = utils.HostInfo(self.op.new_name)
2355

    
2356
    self.op.new_name = new_name = name_info.name
2357
    instance_list = self.cfg.GetInstanceList()
2358
    if new_name in instance_list:
2359
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2360
                                 new_name)
2361

    
2362
    if not getattr(self.op, "ignore_ip", False):
2363
      command = ["fping", "-q", name_info.ip]
2364
      result = utils.RunCmd(command)
2365
      if not result.failed:
2366
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2367
                                   (name_info.ip, new_name))
2368

    
2369

    
2370
  def Exec(self, feedback_fn):
2371
    """Reinstall the instance.
2372

2373
    """
2374
    inst = self.instance
2375
    old_name = inst.name
2376

    
2377
    if inst.disk_template == constants.DT_FILE:
2378
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2379

    
2380
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2381

    
2382
    # re-read the instance from the configuration after rename
2383
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2384

    
2385
    if inst.disk_template == constants.DT_FILE:
2386
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2387
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2388
                                                old_file_storage_dir,
2389
                                                new_file_storage_dir)
2390

    
2391
      if not result:
2392
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2393
                                 " directory '%s' to '%s' (but the instance"
2394
                                 " has been renamed in Ganeti)" % (
2395
                                 inst.primary_node, old_file_storage_dir,
2396
                                 new_file_storage_dir))
2397

    
2398
      if not result[0]:
2399
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2400
                                 " (but the instance has been renamed in"
2401
                                 " Ganeti)" % (old_file_storage_dir,
2402
                                               new_file_storage_dir))
2403

    
2404
    _StartInstanceDisks(self.cfg, inst, None)
2405
    try:
2406
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2407
                                          "sda", "sdb"):
2408
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2409
               " instance has been renamed in Ganeti)" %
2410
               (inst.name, inst.primary_node))
2411
        logger.Error(msg)
2412
    finally:
2413
      _ShutdownInstanceDisks(inst, self.cfg)
2414

    
2415

    
2416
class LURemoveInstance(LogicalUnit):
2417
  """Remove an instance.
2418

2419
  """
2420
  HPATH = "instance-remove"
2421
  HTYPE = constants.HTYPE_INSTANCE
2422
  _OP_REQP = ["instance_name", "ignore_failures"]
2423

    
2424
  def BuildHooksEnv(self):
2425
    """Build hooks env.
2426

2427
    This runs on master, primary and secondary nodes of the instance.
2428

2429
    """
2430
    env = _BuildInstanceHookEnvByObject(self.instance)
2431
    nl = [self.sstore.GetMasterNode()]
2432
    return env, nl, nl
2433

    
2434
  def CheckPrereq(self):
2435
    """Check prerequisites.
2436

2437
    This checks that the instance is in the cluster.
2438

2439
    """
2440
    instance = self.cfg.GetInstanceInfo(
2441
      self.cfg.ExpandInstanceName(self.op.instance_name))
2442
    if instance is None:
2443
      raise errors.OpPrereqError("Instance '%s' not known" %
2444
                                 self.op.instance_name)
2445
    self.instance = instance
2446

    
2447
  def Exec(self, feedback_fn):
2448
    """Remove the instance.
2449

2450
    """
2451
    instance = self.instance
2452
    logger.Info("shutting down instance %s on node %s" %
2453
                (instance.name, instance.primary_node))
2454

    
2455
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2456
      if self.op.ignore_failures:
2457
        feedback_fn("Warning: can't shutdown instance")
2458
      else:
2459
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2460
                                 (instance.name, instance.primary_node))
2461

    
2462
    logger.Info("removing block devices for instance %s" % instance.name)
2463

    
2464
    if not _RemoveDisks(instance, self.cfg):
2465
      if self.op.ignore_failures:
2466
        feedback_fn("Warning: can't remove instance's disks")
2467
      else:
2468
        raise errors.OpExecError("Can't remove instance's disks")
2469

    
2470
    logger.Info("removing instance %s out of cluster config" % instance.name)
2471

    
2472
    self.cfg.RemoveInstance(instance.name)
2473

    
2474

    
2475
class LUQueryInstances(NoHooksLU):
2476
  """Logical unit for querying instances.
2477

2478
  """
2479
  _OP_REQP = ["output_fields", "names"]
2480

    
2481
  def CheckPrereq(self):
2482
    """Check prerequisites.
2483

2484
    This checks that the fields required are valid output fields.
2485

2486
    """
2487
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2488
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2489
                               "admin_state", "admin_ram",
2490
                               "disk_template", "ip", "mac", "bridge",
2491
                               "sda_size", "sdb_size", "vcpus"],
2492
                       dynamic=self.dynamic_fields,
2493
                       selected=self.op.output_fields)
2494

    
2495
    self.wanted = _GetWantedInstances(self, self.op.names)
2496

    
2497
  def Exec(self, feedback_fn):
2498
    """Computes the list of nodes and their attributes.
2499

2500
    """
2501
    instance_names = self.wanted
2502
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2503
                     in instance_names]
2504

    
2505
    # begin data gathering
2506

    
2507
    nodes = frozenset([inst.primary_node for inst in instance_list])
2508

    
2509
    bad_nodes = []
2510
    if self.dynamic_fields.intersection(self.op.output_fields):
2511
      live_data = {}
2512
      node_data = rpc.call_all_instances_info(nodes)
2513
      for name in nodes:
2514
        result = node_data[name]
2515
        if result:
2516
          live_data.update(result)
2517
        elif result == False:
2518
          bad_nodes.append(name)
2519
        # else no instance is alive
2520
    else:
2521
      live_data = dict([(name, {}) for name in instance_names])
2522

    
2523
    # end data gathering
2524

    
2525
    output = []
2526
    for instance in instance_list:
2527
      iout = []
2528
      for field in self.op.output_fields:
2529
        if field == "name":
2530
          val = instance.name
2531
        elif field == "os":
2532
          val = instance.os
2533
        elif field == "pnode":
2534
          val = instance.primary_node
2535
        elif field == "snodes":
2536
          val = list(instance.secondary_nodes)
2537
        elif field == "admin_state":
2538
          val = (instance.status != "down")
2539
        elif field == "oper_state":
2540
          if instance.primary_node in bad_nodes:
2541
            val = None
2542
          else:
2543
            val = bool(live_data.get(instance.name))
2544
        elif field == "status":
2545
          if instance.primary_node in bad_nodes:
2546
            val = "ERROR_nodedown"
2547
          else:
2548
            running = bool(live_data.get(instance.name))
2549
            if running:
2550
              if instance.status != "down":
2551
                val = "running"
2552
              else:
2553
                val = "ERROR_up"
2554
            else:
2555
              if instance.status != "down":
2556
                val = "ERROR_down"
2557
              else:
2558
                val = "ADMIN_down"
2559
        elif field == "admin_ram":
2560
          val = instance.memory
2561
        elif field == "oper_ram":
2562
          if instance.primary_node in bad_nodes:
2563
            val = None
2564
          elif instance.name in live_data:
2565
            val = live_data[instance.name].get("memory", "?")
2566
          else:
2567
            val = "-"
2568
        elif field == "disk_template":
2569
          val = instance.disk_template
2570
        elif field == "ip":
2571
          val = instance.nics[0].ip
2572
        elif field == "bridge":
2573
          val = instance.nics[0].bridge
2574
        elif field == "mac":
2575
          val = instance.nics[0].mac
2576
        elif field == "sda_size" or field == "sdb_size":
2577
          disk = instance.FindDisk(field[:3])
2578
          if disk is None:
2579
            val = None
2580
          else:
2581
            val = disk.size
2582
        elif field == "vcpus":
2583
          val = instance.vcpus
2584
        else:
2585
          raise errors.ParameterError(field)
2586
        iout.append(val)
2587
      output.append(iout)
2588

    
2589
    return output
2590

    
2591

    
2592
class LUFailoverInstance(LogicalUnit):
2593
  """Failover an instance.
2594

2595
  """
2596
  HPATH = "instance-failover"
2597
  HTYPE = constants.HTYPE_INSTANCE
2598
  _OP_REQP = ["instance_name", "ignore_consistency"]
2599

    
2600
  def BuildHooksEnv(self):
2601
    """Build hooks env.
2602

2603
    This runs on master, primary and secondary nodes of the instance.
2604

2605
    """
2606
    env = {
2607
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2608
      }
2609
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2610
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2611
    return env, nl, nl
2612

    
2613
  def CheckPrereq(self):
2614
    """Check prerequisites.
2615

2616
    This checks that the instance is in the cluster.
2617

2618
    """
2619
    instance = self.cfg.GetInstanceInfo(
2620
      self.cfg.ExpandInstanceName(self.op.instance_name))
2621
    if instance is None:
2622
      raise errors.OpPrereqError("Instance '%s' not known" %
2623
                                 self.op.instance_name)
2624

    
2625
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2626
      raise errors.OpPrereqError("Instance's disk layout is not"
2627
                                 " network mirrored, cannot failover.")
2628

    
2629
    secondary_nodes = instance.secondary_nodes
2630
    if not secondary_nodes:
2631
      raise errors.ProgrammerError("no secondary node but using "
2632
                                   "a mirrored disk template")
2633

    
2634
    target_node = secondary_nodes[0]
2635
    # check memory requirements on the secondary node
2636
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2637
                         instance.name, instance.memory)
2638

    
2639
    # check bridge existance
2640
    brlist = [nic.bridge for nic in instance.nics]
2641
    if not rpc.call_bridges_exist(target_node, brlist):
2642
      raise errors.OpPrereqError("One or more target bridges %s does not"
2643
                                 " exist on destination node '%s'" %
2644
                                 (brlist, target_node))
2645

    
2646
    self.instance = instance
2647

    
2648
  def Exec(self, feedback_fn):
2649
    """Failover an instance.
2650

2651
    The failover is done by shutting it down on its present node and
2652
    starting it on the secondary.
2653

2654
    """
2655
    instance = self.instance
2656

    
2657
    source_node = instance.primary_node
2658
    target_node = instance.secondary_nodes[0]
2659

    
2660
    feedback_fn("* checking disk consistency between source and target")
2661
    for dev in instance.disks:
2662
      # for drbd, these are drbd over lvm
2663
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2664
        if instance.status == "up" and not self.op.ignore_consistency:
2665
          raise errors.OpExecError("Disk %s is degraded on target node,"
2666
                                   " aborting failover." % dev.iv_name)
2667

    
2668
    feedback_fn("* shutting down instance on source node")
2669
    logger.Info("Shutting down instance %s on node %s" %
2670
                (instance.name, source_node))
2671

    
2672
    if not rpc.call_instance_shutdown(source_node, instance):
2673
      if self.op.ignore_consistency:
2674
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2675
                     " anyway. Please make sure node %s is down"  %
2676
                     (instance.name, source_node, source_node))
2677
      else:
2678
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2679
                                 (instance.name, source_node))
2680

    
2681
    feedback_fn("* deactivating the instance's disks on source node")
2682
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2683
      raise errors.OpExecError("Can't shut down the instance's disks.")
2684

    
2685
    instance.primary_node = target_node
2686
    # distribute new instance config to the other nodes
2687
    self.cfg.Update(instance)
2688

    
2689
    # Only start the instance if it's marked as up
2690
    if instance.status == "up":
2691
      feedback_fn("* activating the instance's disks on target node")
2692
      logger.Info("Starting instance %s on node %s" %
2693
                  (instance.name, target_node))
2694

    
2695
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2696
                                               ignore_secondaries=True)
2697
      if not disks_ok:
2698
        _ShutdownInstanceDisks(instance, self.cfg)
2699
        raise errors.OpExecError("Can't activate the instance's disks")
2700

    
2701
      feedback_fn("* starting the instance on the target node")
2702
      if not rpc.call_instance_start(target_node, instance, None):
2703
        _ShutdownInstanceDisks(instance, self.cfg)
2704
        raise errors.OpExecError("Could not start instance %s on node %s." %
2705
                                 (instance.name, target_node))
2706

    
2707

    
2708
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2709
  """Create a tree of block devices on the primary node.
2710

2711
  This always creates all devices.
2712

2713
  """
2714
  if device.children:
2715
    for child in device.children:
2716
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2717
        return False
2718

    
2719
  cfg.SetDiskID(device, node)
2720
  new_id = rpc.call_blockdev_create(node, device, device.size,
2721
                                    instance.name, True, info)
2722
  if not new_id:
2723
    return False
2724
  if device.physical_id is None:
2725
    device.physical_id = new_id
2726
  return True
2727

    
2728

    
2729
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2730
  """Create a tree of block devices on a secondary node.
2731

2732
  If this device type has to be created on secondaries, create it and
2733
  all its children.
2734

2735
  If not, just recurse to children keeping the same 'force' value.
2736

2737
  """
2738
  if device.CreateOnSecondary():
2739
    force = True
2740
  if device.children:
2741
    for child in device.children:
2742
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2743
                                        child, force, info):
2744
        return False
2745

    
2746
  if not force:
2747
    return True
2748
  cfg.SetDiskID(device, node)
2749
  new_id = rpc.call_blockdev_create(node, device, device.size,
2750
                                    instance.name, False, info)
2751
  if not new_id:
2752
    return False
2753
  if device.physical_id is None:
2754
    device.physical_id = new_id
2755
  return True
2756

    
2757

    
2758
def _GenerateUniqueNames(cfg, exts):
2759
  """Generate a suitable LV name.
2760

2761
  This will generate a logical volume name for the given instance.
2762

2763
  """
2764
  results = []
2765
  for val in exts:
2766
    new_id = cfg.GenerateUniqueID()
2767
    results.append("%s%s" % (new_id, val))
2768
  return results
2769

    
2770

    
2771
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2772
  """Generate a drbd device complete with its children.
2773

2774
  """
2775
  port = cfg.AllocatePort()
2776
  vgname = cfg.GetVGName()
2777
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2778
                          logical_id=(vgname, names[0]))
2779
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2780
                          logical_id=(vgname, names[1]))
2781
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2782
                          logical_id = (primary, secondary, port),
2783
                          children = [dev_data, dev_meta])
2784
  return drbd_dev
2785

    
2786

    
2787
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2788
  """Generate a drbd8 device complete with its children.
2789

2790
  """
2791
  port = cfg.AllocatePort()
2792
  vgname = cfg.GetVGName()
2793
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2794
                          logical_id=(vgname, names[0]))
2795
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2796
                          logical_id=(vgname, names[1]))
2797
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2798
                          logical_id = (primary, secondary, port),
2799
                          children = [dev_data, dev_meta],
2800
                          iv_name=iv_name)
2801
  return drbd_dev
2802

    
2803

    
2804
def _GenerateDiskTemplate(cfg, template_name,
2805
                          instance_name, primary_node,
2806
                          secondary_nodes, disk_sz, swap_sz,
2807
                          file_storage_dir, file_driver):
2808
  """Generate the entire disk layout for a given template type.
2809

2810
  """
2811
  #TODO: compute space requirements
2812

    
2813
  vgname = cfg.GetVGName()
2814
  if template_name == constants.DT_DISKLESS:
2815
    disks = []
2816
  elif template_name == constants.DT_PLAIN:
2817
    if len(secondary_nodes) != 0:
2818
      raise errors.ProgrammerError("Wrong template configuration")
2819

    
2820
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2821
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2822
                           logical_id=(vgname, names[0]),
2823
                           iv_name = "sda")
2824
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2825
                           logical_id=(vgname, names[1]),
2826
                           iv_name = "sdb")
2827
    disks = [sda_dev, sdb_dev]
2828
  elif template_name == constants.DT_DRBD8:
2829
    if len(secondary_nodes) != 1:
2830
      raise errors.ProgrammerError("Wrong template configuration")
2831
    remote_node = secondary_nodes[0]
2832
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2833
                                       ".sdb_data", ".sdb_meta"])
2834
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2835
                                         disk_sz, names[0:2], "sda")
2836
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2837
                                         swap_sz, names[2:4], "sdb")
2838
    disks = [drbd_sda_dev, drbd_sdb_dev]
2839
  elif template_name == constants.DT_FILE:
2840
    if len(secondary_nodes) != 0:
2841
      raise errors.ProgrammerError("Wrong template configuration")
2842

    
2843
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2844
                                iv_name="sda", logical_id=(file_driver,
2845
                                "%s/sda" % file_storage_dir))
2846
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2847
                                iv_name="sdb", logical_id=(file_driver,
2848
                                "%s/sdb" % file_storage_dir))
2849
    disks = [file_sda_dev, file_sdb_dev]
2850
  else:
2851
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2852
  return disks
2853

    
2854

    
2855
def _GetInstanceInfoText(instance):
2856
  """Compute that text that should be added to the disk's metadata.
2857

2858
  """
2859
  return "originstname+%s" % instance.name
2860

    
2861

    
2862
def _CreateDisks(cfg, instance):
2863
  """Create all disks for an instance.
2864

2865
  This abstracts away some work from AddInstance.
2866

2867
  Args:
2868
    instance: the instance object
2869

2870
  Returns:
2871
    True or False showing the success of the creation process
2872

2873
  """
2874
  info = _GetInstanceInfoText(instance)
2875

    
2876
  if instance.disk_template == constants.DT_FILE:
2877
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2878
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2879
                                              file_storage_dir)
2880

    
2881
    if not result:
2882
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2883
      return False
2884

    
2885
    if not result[0]:
2886
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2887
      return False
2888

    
2889
  for device in instance.disks:
2890
    logger.Info("creating volume %s for instance %s" %
2891
                (device.iv_name, instance.name))
2892
    #HARDCODE
2893
    for secondary_node in instance.secondary_nodes:
2894
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2895
                                        device, False, info):
2896
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2897
                     (device.iv_name, device, secondary_node))
2898
        return False
2899
    #HARDCODE
2900
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2901
                                    instance, device, info):
2902
      logger.Error("failed to create volume %s on primary!" %
2903
                   device.iv_name)
2904
      return False
2905

    
2906
  return True
2907

    
2908

    
2909
def _RemoveDisks(instance, cfg):
2910
  """Remove all disks for an instance.
2911

2912
  This abstracts away some work from `AddInstance()` and
2913
  `RemoveInstance()`. Note that in case some of the devices couldn't
2914
  be removed, the removal will continue with the other ones (compare
2915
  with `_CreateDisks()`).
2916

2917
  Args:
2918
    instance: the instance object
2919

2920
  Returns:
2921
    True or False showing the success of the removal proces
2922

2923
  """
2924
  logger.Info("removing block devices for instance %s" % instance.name)
2925

    
2926
  result = True
2927
  for device in instance.disks:
2928
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2929
      cfg.SetDiskID(disk, node)
2930
      if not rpc.call_blockdev_remove(node, disk):
2931
        logger.Error("could not remove block device %s on node %s,"
2932
                     " continuing anyway" %
2933
                     (device.iv_name, node))
2934
        result = False
2935

    
2936
  if instance.disk_template == constants.DT_FILE:
2937
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2938
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
2939
                                            file_storage_dir):
2940
      logger.Error("could not remove directory '%s'" % file_storage_dir)
2941
      result = False
2942

    
2943
  return result
2944

    
2945

    
2946
def _ComputeDiskSize(disk_template, disk_size, swap_size):
2947
  """Compute disk size requirements in the volume group
2948

2949
  This is currently hard-coded for the two-drive layout.
2950

2951
  """
2952
  # Required free disk space as a function of disk and swap space
2953
  req_size_dict = {
2954
    constants.DT_DISKLESS: None,
2955
    constants.DT_PLAIN: disk_size + swap_size,
2956
    # 256 MB are added for drbd metadata, 128MB for each drbd device
2957
    constants.DT_DRBD8: disk_size + swap_size + 256,
2958
    constants.DT_FILE: None,
2959
  }
2960

    
2961
  if disk_template not in req_size_dict:
2962
    raise errors.ProgrammerError("Disk template '%s' size requirement"
2963
                                 " is unknown" %  disk_template)
2964

    
2965
  return req_size_dict[disk_template]
2966

    
2967

    
2968
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

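    # run the allocator named in the opcode; on success ial.nodes holds the
    # selected primary node (and a secondary when required_nodes == 2)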
    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

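    # exactly one of iallocator and pnode may be given: either the allocator
    # picks the primary node or the user names it explicitly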
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")


    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

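    # if requested, block until the disks are fully in sync; otherwise, for
    # network-mirrored templates, do a single one-shot check that no disk
    # is degraded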
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self._RunAllocator()  # this sets self.op.remote_node

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

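      # tgt_node is the node whose storage gets replaced, oth_node the peer
      # that must stay consistent while we work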
      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
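      # remember the mapping drbd device -> (old LVs, new LVs); it is used
      # again in the sync-check and old-storage removal steps below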
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
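    # grow the physical device on every node (secondaries first, then the
    # primary) and only record the new size in the configuration once all
    # of the grow calls have succeeded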
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
      if not result or not isinstance(result, tuple) or len(result) != 2:
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    return


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
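    # query the device on the primary node; for DRBD devices the peer node
    # is derived from the logical_id, and child devices are reported
    # recursively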
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                                instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      htkind = self.sstore.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM31:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path

      if htkind in constants.HTS_REQ_PORT:
        idict["vnc_bind_address"] = instance.vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                    " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                    " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
4291
    instance = self.instance
4292
    if self.mem:
4293
      instance.memory = self.mem
4294
      result.append(("mem", self.mem))
4295
    if self.vcpus:
4296
      instance.vcpus = self.vcpus
4297
      result.append(("vcpus",  self.vcpus))
4298
    if self.do_ip:
4299
      instance.nics[0].ip = self.ip
4300
      result.append(("ip", self.ip))
4301
    if self.bridge:
4302
      instance.nics[0].bridge = self.bridge
4303
      result.append(("bridge", self.bridge))
4304
    if self.mac:
4305
      instance.nics[0].mac = self.mac
4306
      result.append(("mac", self.mac))
4307
    if self.do_kernel_path:
4308
      instance.kernel_path = self.kernel_path
4309
      result.append(("kernel_path", self.kernel_path))
4310
    if self.do_initrd_path:
4311
      instance.initrd_path = self.initrd_path
4312
      result.append(("initrd_path", self.initrd_path))
4313
    if self.hvm_boot_order:
4314
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
4315
        instance.hvm_boot_order = None
4316
      else:
4317
        instance.hvm_boot_order = self.hvm_boot_order
4318
      result.append(("hvm_boot_order", self.hvm_boot_order))
4319
    if self.hvm_acpi:
4320
      instance.hvm_acpi = self.hvm_acpi
4321
      result.append(("hvm_acpi", self.hvm_acpi))
4322
    if self.hvm_pae:
4323
      instance.hvm_pae = self.hvm_pae
4324
      result.append(("hvm_pae", self.hvm_pae))
4325
    if self.hvm_cdrom_image_path:
4326
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4327
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4328
    if self.vnc_bind_address:
4329
      instance.vnc_bind_address = self.vnc_bind_address
4330
      result.append(("vnc_bind_address", self.vnc_bind_address))
4331

    
4332
    self.cfg.AddInstance(instance)
4333

    
4334
    return result
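    # Illustrative note (not part of the original source): the list returned
    # above contains (parameter, new_value) pairs for every field that was
    # actually changed; a hypothetical memory/bridge change could yield:
    #
    #   [("mem", 512), ("bridge", "xen-br1")]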
4335

    
4336

    
4337
class LUQueryExports(NoHooksLU):
4338
  """Query the exports list
4339

4340
  """
4341
  _OP_REQP = []
4342

    
4343
  def CheckPrereq(self):
4344
    """Check that the nodelist contains only existing nodes.
4345

4346
    """
4347
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4348

    
4349
  def Exec(self, feedback_fn):
4350
    """Compute the list of all the exported system images.
4351

4352
    Returns:
4353
      a dictionary with the structure node->(export-list)
4354
      where export-list is a list of the instances exported on
4355
      that node.
4356

4357
    """
4358
    return rpc.call_export_list(self.nodes)
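    # Illustrative note (not part of the original source): rpc.call_export_list
    # returns a mapping of node name to the list of export names found on that
    # node, e.g. (hypothetical values):
    #
    #   {"node1.example.com": ["instance1.example.com"],
    #    "node2.example.com": []}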
4359

    
4360

    
4361
class LUExportInstance(LogicalUnit):
4362
  """Export an instance to an image in the cluster.
4363

4364
  """
4365
  HPATH = "instance-export"
4366
  HTYPE = constants.HTYPE_INSTANCE
4367
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4368

    
4369
  def BuildHooksEnv(self):
4370
    """Build hooks env.
4371

4372
    This will run on the master, primary node and target node.
4373

4374
    """
4375
    env = {
4376
      "EXPORT_NODE": self.op.target_node,
4377
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4378
      }
4379
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4380
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4381
          self.op.target_node]
4382
    return env, nl, nl
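    # Illustrative note (not part of the original source): for an export of a
    # hypothetical instance to node "node3.example.com" with shutdown enabled,
    # the explicit part of the env built above would be:
    #
    #   {"EXPORT_NODE": "node3.example.com", "EXPORT_DO_SHUTDOWN": True}
    #
    # plus the instance-related keys added by _BuildInstanceHookEnvByObject.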
4383

    
4384
  def CheckPrereq(self):
4385
    """Check prerequisites.
4386

4387
    This checks that the instance and node names are valid.
4388

4389
    """
4390
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4391
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4392
    if self.instance is None:
4393
      raise errors.OpPrereqError("Instance '%s' not found" %
4394
                                 self.op.instance_name)
4395

    
4396
    # node verification
4397
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4398
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4399

    
4400
    if self.dst_node is None:
4401
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
4402
                                 self.op.target_node)
4403
    self.op.target_node = self.dst_node.name
4404

    
4405
    # instance disk type verification
4406
    for disk in self.instance.disks:
4407
      if disk.dev_type == constants.LD_FILE:
4408
        raise errors.OpPrereqError("Export not supported for instances with"
4409
                                   " file-based disks")
4410

    
4411
  def Exec(self, feedback_fn):
4412
    """Export an instance to an image in the cluster.
4413

4414
    """
4415
    instance = self.instance
4416
    dst_node = self.dst_node
4417
    src_node = instance.primary_node
4418
    if self.op.shutdown:
4419
      # shutdown the instance, but not the disks
4420
      if not rpc.call_instance_shutdown(src_node, instance):
4421
         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4422
                                  (instance.name, src_node))
4423

    
4424
    vgname = self.cfg.GetVGName()
4425

    
4426
    snap_disks = []
4427

    
4428
    try:
4429
      for disk in instance.disks:
4430
        if disk.iv_name == "sda":
4431
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4432
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4433

    
4434
          if not new_dev_name:
4435
            logger.Error("could not snapshot block device %s on node %s" %
4436
                         (disk.logical_id[1], src_node))
4437
          else:
4438
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4439
                                      logical_id=(vgname, new_dev_name),
4440
                                      physical_id=(vgname, new_dev_name),
4441
                                      iv_name=disk.iv_name)
4442
            snap_disks.append(new_dev)
4443

    
4444
    finally:
4445
      if self.op.shutdown and instance.status == "up":
4446
        if not rpc.call_instance_start(src_node, instance, None):
4447
          _ShutdownInstanceDisks(instance, self.cfg)
4448
          raise errors.OpExecError("Could not start instance")
4449

    
4450
    # TODO: check for size
4451

    
4452
    for dev in snap_disks:
4453
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4454
        logger.Error("could not export block device %s from node %s to node %s"
4455
                     % (dev.logical_id[1], src_node, dst_node.name))
4456
      if not rpc.call_blockdev_remove(src_node, dev):
4457
        logger.Error("could not remove snapshot block device %s from node %s" %
4458
                     (dev.logical_id[1], src_node))
4459

    
4460
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4461
      logger.Error("could not finalize export for instance %s on node %s" %
4462
                   (instance.name, dst_node.name))
4463

    
4464
    nodelist = self.cfg.GetNodeList()
4465
    nodelist.remove(dst_node.name)
4466

    
4467
    # on one-node clusters nodelist will be empty after the removal
4468
    # if we proceed, the backup would be removed because OpQueryExports
4469
    # substitutes an empty list with the full cluster node list.
4470
    if nodelist:
4471
      op = opcodes.OpQueryExports(nodes=nodelist)
4472
      exportlist = self.proc.ChainOpCode(op)
4473
      for node in exportlist:
4474
        if instance.name in exportlist[node]:
4475
          if not rpc.call_export_remove(node, instance.name):
4476
            logger.Error("could not remove older export for instance %s"
4477
                         " on node %s" % (instance.name, node))
4478

    
4479

    
4480
class LURemoveExport(NoHooksLU):
4481
  """Remove exports related to the named instance.
4482

4483
  """
4484
  _OP_REQP = ["instance_name"]
4485

    
4486
  def CheckPrereq(self):
4487
    """Check prerequisites.
4488
    """
4489
    pass
4490

    
4491
  def Exec(self, feedback_fn):
4492
    """Remove any export.
4493

4494
    """
4495
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4496
    # If the instance was not found we'll try with the name that was passed in.
4497
    # This will only work if it was an FQDN, though.
4498
    fqdn_warn = False
4499
    if not instance_name:
4500
      fqdn_warn = True
4501
      instance_name = self.op.instance_name
4502

    
4503
    op = opcodes.OpQueryExports(nodes=[])
4504
    exportlist = self.proc.ChainOpCode(op)
4505
    found = False
4506
    for node in exportlist:
4507
      if instance_name in exportlist[node]:
4508
        found = True
4509
        if not rpc.call_export_remove(node, instance_name):
4510
          logger.Error("could not remove export for instance %s"
4511
                       " on node %s" % (instance_name, node))
4512

    
4513
    if fqdn_warn and not found:
4514
      feedback_fn("Export not found. If trying to remove an export belonging"
4515
                  " to a deleted instance please use its Fully Qualified"
4516
                  " Domain Name.")
4517

    
4518

    
4519
class TagsLU(NoHooksLU):
4520
  """Generic tags LU.
4521

4522
  This is an abstract class which is the parent of all the other tags LUs.
4523

4524
  """
4525
  def CheckPrereq(self):
4526
    """Check prerequisites.
4527

4528
    """
4529
    if self.op.kind == constants.TAG_CLUSTER:
4530
      self.target = self.cfg.GetClusterInfo()
4531
    elif self.op.kind == constants.TAG_NODE:
4532
      name = self.cfg.ExpandNodeName(self.op.name)
4533
      if name is None:
4534
        raise errors.OpPrereqError("Invalid node name (%s)" %
4535
                                   (self.op.name,))
4536
      self.op.name = name
4537
      self.target = self.cfg.GetNodeInfo(name)
4538
    elif self.op.kind == constants.TAG_INSTANCE:
4539
      name = self.cfg.ExpandInstanceName(self.op.name)
4540
      if name is None:
4541
        raise errors.OpPrereqError("Invalid instance name (%s)" %
4542
                                   (self.op.name,))
4543
      self.op.name = name
4544
      self.target = self.cfg.GetInstanceInfo(name)
4545
    else:
4546
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4547
                                 str(self.op.kind))
4548

    
4549

    
4550
class LUGetTags(TagsLU):
4551
  """Returns the tags of a given object.
4552

4553
  """
4554
  _OP_REQP = ["kind", "name"]
4555

    
4556
  def Exec(self, feedback_fn):
4557
    """Returns the tag list.
4558

4559
    """
4560
    return self.target.GetTags()
4561

    
4562

    
4563
class LUSearchTags(NoHooksLU):
4564
  """Searches the tags for a given pattern.
4565

4566
  """
4567
  _OP_REQP = ["pattern"]
4568

    
4569
  def CheckPrereq(self):
4570
    """Check prerequisites.
4571

4572
    This checks the pattern passed for validity by compiling it.
4573

4574
    """
4575
    try:
4576
      self.re = re.compile(self.op.pattern)
4577
    except re.error, err:
4578
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4579
                                 (self.op.pattern, err))
4580

    
4581
  def Exec(self, feedback_fn):
4582
    """Returns the tag list.
4583

4584
    """
4585
    cfg = self.cfg
4586
    tgts = [("/cluster", cfg.GetClusterInfo())]
4587
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4588
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4589
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4590
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4591
    results = []
4592
    for path, target in tgts:
4593
      for tag in target.GetTags():
4594
        if self.re.search(tag):
4595
          results.append((path, tag))
4596
    return results
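    # Illustrative note (not part of the original source): the result is a
    # list of (path, tag) pairs, e.g. searching for "web" might return
    # something like (hypothetical values):
    #
    #   [("/instances/instance1.example.com", "webserver"),
    #    ("/nodes/node1.example.com", "web-frontend")]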
4597

    
4598

    
4599
class LUAddTags(TagsLU):
4600
  """Sets a tag on a given object.
4601

4602
  """
4603
  _OP_REQP = ["kind", "name", "tags"]
4604

    
4605
  def CheckPrereq(self):
4606
    """Check prerequisites.
4607

4608
    This checks the type and length of the tag name and value.
4609

4610
    """
4611
    TagsLU.CheckPrereq(self)
4612
    for tag in self.op.tags:
4613
      objects.TaggableObject.ValidateTag(tag)
4614

    
4615
  def Exec(self, feedback_fn):
4616
    """Sets the tag.
4617

4618
    """
4619
    try:
4620
      for tag in self.op.tags:
4621
        self.target.AddTag(tag)
4622
    except errors.TagError, err:
4623
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
4624
    try:
4625
      self.cfg.Update(self.target)
4626
    except errors.ConfigurationError:
4627
      raise errors.OpRetryError("There has been a modification to the"
4628
                                " config file and the operation has been"
4629
                                " aborted. Please retry.")
4630

    
4631

    
4632
class LUDelTags(TagsLU):
4633
  """Delete a list of tags from a given object.
4634

4635
  """
4636
  _OP_REQP = ["kind", "name", "tags"]
4637

    
4638
  def CheckPrereq(self):
4639
    """Check prerequisites.
4640

4641
    This checks that we have the given tag.
4642

4643
    """
4644
    TagsLU.CheckPrereq(self)
4645
    for tag in self.op.tags:
4646
      objects.TaggableObject.ValidateTag(tag)
4647
    del_tags = frozenset(self.op.tags)
4648
    cur_tags = self.target.GetTags()
4649
    if not del_tags <= cur_tags:
4650
      diff_tags = del_tags - cur_tags
4651
      diff_names = ["'%s'" % tag for tag in diff_tags]
4652
      diff_names.sort()
4653
      raise errors.OpPrereqError("Tag(s) %s not found" %
4654
                                 (",".join(diff_names)))
4655

    
4656
  def Exec(self, feedback_fn):
4657
    """Remove the tag from the object.
4658

4659
    """
4660
    for tag in self.op.tags:
4661
      self.target.RemoveTag(tag)
4662
    try:
4663
      self.cfg.Update(self.target)
4664
    except errors.ConfigurationError:
4665
      raise errors.OpRetryError("There has been a modification to the"
4666
                                " config file and the operation has been"
4667
                                " aborted. Please retry.")
4668

    
4669
class LUTestDelay(NoHooksLU):
4670
  """Sleep for a specified amount of time.
4671

4672
  This LU sleeps on the master and/or nodes for a specified amount of
4673
  time.
4674

4675
  """
4676
  _OP_REQP = ["duration", "on_master", "on_nodes"]
4677

    
4678
  def CheckPrereq(self):
4679
    """Check prerequisites.
4680

4681
    This checks that we have a good list of nodes and/or the duration
4682
    is valid.
4683

4684
    """
4685

    
4686
    if self.op.on_nodes:
4687
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4688

    
4689
  def Exec(self, feedback_fn):
4690
    """Do the actual sleep.
4691

4692
    """
4693
    if self.op.on_master:
4694
      if not utils.TestDelay(self.op.duration):
4695
        raise errors.OpExecError("Error during master delay test")
4696
    if self.op.on_nodes:
4697
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4698
      if not result:
4699
        raise errors.OpExecError("Complete failure from rpc call")
4700
      for node, node_result in result.items():
4701
        if not node_result:
4702
          raise errors.OpExecError("Failure during rpc call to node %s,"
4703
                                   " result: %s" % (node, node_result))
4704

    
4705

    
4706
class IAllocator(object):
4707
  """IAllocator framework.
4708

4709
  An IAllocator instance has three sets of attributes:
4710
    - cfg/sstore that are needed to query the cluster
4711
    - input data (all members of the _KEYS class attribute are required)
4712
    - four buffer attributes (in|out_data|text), that represent the
4713
      input (to the external script) in text and data structure format,
4714
      and the output from it, again in two formats
4715
    - the result variables from the script (success, info, nodes) for
4716
      easy usage
4717

4718
  """
4719
  _ALLO_KEYS = [
4720
    "mem_size", "disks", "disk_template",
4721
    "os", "tags", "nics", "vcpus",
4722
    ]
4723
  _RELO_KEYS = [
4724
    "relocate_from",
4725
    ]
4726

    
4727
  def __init__(self, cfg, sstore, mode, name, **kwargs):
4728
    self.cfg = cfg
4729
    self.sstore = sstore
4730
    # init buffer variables
4731
    self.in_text = self.out_text = self.in_data = self.out_data = None
4732
    # init all input fields so that pylint is happy
4733
    self.mode = mode
4734
    self.name = name
4735
    self.mem_size = self.disks = self.disk_template = None
4736
    self.os = self.tags = self.nics = self.vcpus = None
4737
    self.relocate_from = None
4738
    # computed fields
4739
    self.required_nodes = None
4740
    # init result fields
4741
    self.success = self.info = self.nodes = None
4742
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4743
      keyset = self._ALLO_KEYS
4744
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4745
      keyset = self._RELO_KEYS
4746
    else:
4747
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4748
                                   " IAllocator" % self.mode)
4749
    for key in kwargs:
4750
      if key not in keyset:
4751
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
4752
                                     " IAllocator" % key)
4753
      setattr(self, key, kwargs[key])
4754
    for key in keyset:
4755
      if key not in kwargs:
4756
        raise errors.ProgrammerError("Missing input parameter '%s' to"
4757
                                     " IAllocator" % key)
4758
    self._BuildInputData()
4759

    
4760
  def _ComputeClusterData(self):
4761
    """Compute the generic allocator input data.
4762

4763
    This is the data that is independent of the actual operation.
4764

4765
    """
4766
    cfg = self.cfg
4767
    # cluster data
4768
    data = {
4769
      "version": 1,
4770
      "cluster_name": self.sstore.GetClusterName(),
4771
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4772
      "hypervisor_type": self.sstore.GetHypervisorType(),
4773
      # we don't have job IDs
4774
      }
4775

    
4776
    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4777

    
4778
    # node data
4779
    node_results = {}
4780
    node_list = cfg.GetNodeList()
4781
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4782
    for nname in node_list:
4783
      ninfo = cfg.GetNodeInfo(nname)
4784
      if nname not in node_data or not isinstance(node_data[nname], dict):
4785
        raise errors.OpExecError("Can't get data for node %s" % nname)
4786
      remote_info = node_data[nname]
4787
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
4788
                   'vg_size', 'vg_free', 'cpu_total']:
4789
        if attr not in remote_info:
4790
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4791
                                   (nname, attr))
4792
        try:
4793
          remote_info[attr] = int(remote_info[attr])
4794
        except ValueError, err:
4795
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4796
                                   " %s" % (nname, attr, str(err)))
4797
      # compute memory used by primary instances
4798
      i_p_mem = i_p_up_mem = 0
4799
      for iinfo in i_list:
4800
        if iinfo.primary_node == nname:
4801
          i_p_mem += iinfo.memory
4802
          if iinfo.status == "up":
4803
            i_p_up_mem += iinfo.memory
4804

    
4805
      # compute memory used by instances
4806
      pnr = {
4807
        "tags": list(ninfo.GetTags()),
4808
        "total_memory": remote_info['memory_total'],
4809
        "reserved_memory": remote_info['memory_dom0'],
4810
        "free_memory": remote_info['memory_free'],
4811
        "i_pri_memory": i_p_mem,
4812
        "i_pri_up_memory": i_p_up_mem,
4813
        "total_disk": remote_info['vg_size'],
4814
        "free_disk": remote_info['vg_free'],
4815
        "primary_ip": ninfo.primary_ip,
4816
        "secondary_ip": ninfo.secondary_ip,
4817
        "total_cpus": remote_info['cpu_total'],
4818
        }
4819
      node_results[nname] = pnr
4820
    data["nodes"] = node_results
4821

    
4822
    # instance data
4823
    instance_data = {}
4824
    for iinfo in i_list:
4825
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4826
                  for n in iinfo.nics]
4827
      pir = {
4828
        "tags": list(iinfo.GetTags()),
4829
        "should_run": iinfo.status == "up",
4830
        "vcpus": iinfo.vcpus,
4831
        "memory": iinfo.memory,
4832
        "os": iinfo.os,
4833
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4834
        "nics": nic_data,
4835
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4836
        "disk_template": iinfo.disk_template,
4837
        }
4838
      instance_data[iinfo.name] = pir
4839

    
4840
    data["instances"] = instance_data
4841

    
4842
    self.in_data = data
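    # Illustrative note (not part of the original source): each entry of
    # data["nodes"] mirrors the pnr dict built above, e.g. with hypothetical
    # values:
    #
    #   {"tags": [], "total_memory": 4096, "reserved_memory": 512,
    #    "free_memory": 2048, "i_pri_memory": 1536, "i_pri_up_memory": 1024,
    #    "total_disk": 102400, "free_disk": 51200, "total_cpus": 4,
    #    "primary_ip": "192.0.2.10", "secondary_ip": "198.51.100.10"}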
4843

    
4844
  def _AddNewInstance(self):
4845
    """Add new instance data to allocator structure.
4846

4847
    This in combination with _ComputeClusterData will create the
4848
    correct structure needed as input for the allocator.
4849

4850
    The checks for the completeness of the opcode must have already been
4851
    done.
4852

4853
    """
4854
    data = self.in_data
4855
    if len(self.disks) != 2:
4856
      raise errors.OpExecError("Only two-disk configurations supported")
4857

    
4858
    disk_space = _ComputeDiskSize(self.disk_template,
4859
                                  self.disks[0]["size"], self.disks[1]["size"])
4860

    
4861
    if self.disk_template in constants.DTS_NET_MIRROR:
4862
      self.required_nodes = 2
4863
    else:
4864
      self.required_nodes = 1
4865
    request = {
4866
      "type": "allocate",
4867
      "name": self.name,
4868
      "disk_template": self.disk_template,
4869
      "tags": self.tags,
4870
      "os": self.os,
4871
      "vcpus": self.vcpus,
4872
      "memory": self.mem_size,
4873
      "disks": self.disks,
4874
      "disk_space_total": disk_space,
4875
      "nics": self.nics,
4876
      "required_nodes": self.required_nodes,
4877
      }
4878
    data["request"] = request
4879

    
4880
  def _AddRelocateInstance(self):
4881
    """Add relocate instance data to allocator structure.
4882

4883
    This in combination with _IAllocatorGetClusterData will create the
4884
    correct structure needed as input for the allocator.
4885

4886
    The checks for the completeness of the opcode must have already been
4887
    done.
4888

4889
    """
4890
    instance = self.cfg.GetInstanceInfo(self.name)
4891
    if instance is None:
4892
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
4893
                                   " IAllocator" % self.name)
4894

    
4895
    if instance.disk_template not in constants.DTS_NET_MIRROR:
4896
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4897

    
4898
    if len(instance.secondary_nodes) != 1:
4899
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
4900

    
4901
    self.required_nodes = 1
4902

    
4903
    disk_space = _ComputeDiskSize(instance.disk_template,
4904
                                  instance.disks[0].size,
4905
                                  instance.disks[1].size)
4906

    
4907
    request = {
4908
      "type": "relocate",
4909
      "name": self.name,
4910
      "disk_space_total": disk_space,
4911
      "required_nodes": self.required_nodes,
4912
      "relocate_from": self.relocate_from,
4913
      }
4914
    self.in_data["request"] = request
4915

    
4916
  def _BuildInputData(self):
4917
    """Build input data structures.
4918

4919
    """
4920
    self._ComputeClusterData()
4921

    
4922
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4923
      self._AddNewInstance()
4924
    else:
4925
      self._AddRelocateInstance()
4926

    
4927
    self.in_text = serializer.Dump(self.in_data)
4928

    
4929
  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4930
    """Run an instance allocator and return the results.
4931

4932
    """
4933
    data = self.in_text
4934

    
4935
    result = call_fn(self.sstore.GetMasterNode(), name, data)
4936

    
4937
    if not isinstance(result, tuple) or len(result) != 4:
4938
      raise errors.OpExecError("Invalid result from master iallocator runner")
4939

    
4940
    rcode, stdout, stderr, fail = result
4941

    
4942
    if rcode == constants.IARUN_NOTFOUND:
4943
      raise errors.OpExecError("Can't find allocator '%s'" % name)
4944
    elif rcode == constants.IARUN_FAILURE:
4945
        raise errors.OpExecError("Instance allocator call failed: %s,"
4946
                                 " output: %s" %
4947
                                 (fail, stdout+stderr))
4948
    self.out_text = stdout
4949
    if validate:
4950
      self._ValidateResult()
4951

    
4952
  def _ValidateResult(self):
4953
    """Process the allocator results.
4954

4955
    This will process and if successful save the result in
4956
    self.out_data and the other parameters.
4957

4958
    """
4959
    try:
4960
      rdict = serializer.Load(self.out_text)
4961
    except Exception, err:
4962
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
4963

    
4964
    if not isinstance(rdict, dict):
4965
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
4966

    
4967
    for key in "success", "info", "nodes":
4968
      if key not in rdict:
4969
        raise errors.OpExecError("Can't parse iallocator results:"
4970
                                 " missing key '%s'" % key)
4971
      setattr(self, key, rdict[key])
4972

    
4973
    if not isinstance(rdict["nodes"], list):
4974
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
4975
                               " is not a list")
4976
    self.out_data = rdict
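    # Illustrative note (not part of the original source): a well-formed
    # allocator reply, once parsed above, would look like (hypothetical
    # values):
    #
    #   {"success": True, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node2.example.com"]}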
4977

    
4978

    
4979
class LUTestAllocator(NoHooksLU):
4980
  """Run allocator tests.
4981

4982
  This LU runs the allocator tests
4983

4984
  """
4985
  _OP_REQP = ["direction", "mode", "name"]
4986

    
4987
  def CheckPrereq(self):
4988
    """Check prerequisites.
4989

4990
    This checks the opcode parameters depending on the direction and mode.
4991

4992
    """
4993
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4994
      for attr in ["name", "mem_size", "disks", "disk_template",
4995
                   "os", "tags", "nics", "vcpus"]:
4996
        if not hasattr(self.op, attr):
4997
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4998
                                     attr)
4999
      iname = self.cfg.ExpandInstanceName(self.op.name)
5000
      if iname is not None:
5001
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5002
                                   iname)
5003
      if not isinstance(self.op.nics, list):
5004
        raise errors.OpPrereqError("Invalid parameter 'nics'")
5005
      for row in self.op.nics:
5006
        if (not isinstance(row, dict) or
5007
            "mac" not in row or
5008
            "ip" not in row or
5009
            "bridge" not in row):
5010
          raise errors.OpPrereqError("Invalid contents of the"
5011
                                     " 'nics' parameter")
5012
      if not isinstance(self.op.disks, list):
5013
        raise errors.OpPrereqError("Invalid parameter 'disks'")
5014
      if len(self.op.disks) != 2:
5015
        raise errors.OpPrereqError("Only two-disk configurations supported")
5016
      for row in self.op.disks:
5017
        if (not isinstance(row, dict) or
5018
            "size" not in row or
5019
            not isinstance(row["size"], int) or
5020
            "mode" not in row or
5021
            row["mode"] not in ['r', 'w']):
5022
          raise errors.OpPrereqError("Invalid contents of the"
5023
                                     " 'disks' parameter")
5024
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5025
      if not hasattr(self.op, "name"):
5026
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5027
      fname = self.cfg.ExpandInstanceName(self.op.name)
5028
      if fname is None:
5029
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5030
                                   self.op.name)
5031
      self.op.name = fname
5032
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5033
    else:
5034
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5035
                                 self.op.mode)
5036

    
5037
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5038
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
5039
        raise errors.OpPrereqError("Missing allocator name")
5040
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5041
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
5042
                                 self.op.direction)
5043

    
5044
  def Exec(self, feedback_fn):
5045
    """Run the allocator test.
5046

5047
    """
5048
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5049
      ial = IAllocator(self.cfg, self.sstore,
5050
                       mode=self.op.mode,
5051
                       name=self.op.name,
5052
                       mem_size=self.op.mem_size,
5053
                       disks=self.op.disks,
5054
                       disk_template=self.op.disk_template,
5055
                       os=self.op.os,
5056
                       tags=self.op.tags,
5057
                       nics=self.op.nics,
5058
                       vcpus=self.op.vcpus,
5059
                       )
5060
    else:
5061
      ial = IAllocator(self.cfg, self.sstore,
5062
                       mode=self.op.mode,
5063
                       name=self.op.name,
5064
                       relocate_from=list(self.relocate_from),
5065
                       )
5066

    
5067
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
5068
      result = ial.in_text
5069
    else:
5070
      ial.Run(self.op.allocator, validate=False)
5071
      result = ial.out_text
5072
    return result